我试图同时收集生产者-消费者(多个生产者和消费者),支持消费者超时。
现在,实际的收藏相当复杂( 系统. collections. Concurrent that job down orfully) 。 但是,我这里有一个微小的样本来证明我的问题( 看起来有点像 Blocking Collection<T>
) 。
public sealed class ProducerConsumerQueueDraft<T>
{
private readonly Queue<T> queue = new Queue<T>();
private readonly object locker = new object();
public void Enqueue(T item)
{
lock (locker)
{
queue.Enqueue(item);
/* This "optimization" is broken, as Nicholas Butler points out.
if(queue.Count == 1) // Optimization
*/
Monitor.Pulse(locker); // Notify any waiting consumer threads.
}
}
public T Dequeue(T item)
{
lock (locker)
{
// Surprisingly, this needs to be a *while* and not an *if*
// which is the core of my problem.
while (queue.Count == 0)
Monitor.Wait(locker);
return queue.Dequeue();
}
}
// This isn t thread-safe, but is how I want TryDequeue to look.
public bool TryDequeueDesired(out T item, TimeSpan timeout)
{
lock (locker)
{
if (queue.Count == 0 && !Monitor.Wait(locker, timeout))
{
item = default(T);
return false;
}
// This is wrong! The queue may be empty even though we were pulsed!
item = queue.Dequeue();
return true;
}
}
// Has nasty timing-gymnastics I want to avoid.
public bool TryDequeueThatWorks(out T item, TimeSpan timeout)
{
lock (locker)
{
var watch = Stopwatch.StartNew();
while (queue.Count == 0)
{
var remaining = timeout - watch.Elapsed;
if (!Monitor.Wait(locker, remaining < TimeSpan.Zero ? TimeSpan.Zero : remaining))
{
item = default(T);
return false;
}
}
item = queue.Dequeue();
return true;
}
}
}
想法是直截了当的:发现等待信号的排队空位的消费者,以及制作人 Pulse
(注意:不是 Pulseall
,否则效率不高) 通知等待项目。
When the thread that invoked Pulse releases the lock, the next thread in the ready queue (which is not necessarily the thread that was pulsed) acquires the lock.
这意味着,消费者-油道 C1 可能被生产者-油道唤醒来消费一个物品,但“它们”另一个消费者-油道 C2 可以在C1有机会重新获取该物品之前获得锁,然后消费该物品,在获得控制后,C1 留下一个空队列。
这意味着,如果排队确实不是空的,我就必须在每一脉冲的消费者代码中进行防守检查,如果不是这种情况,我必须回去空手等待。
我面临的主要问题是,它效率低下 -- -- 线线可能会被唤醒,以便工作,然后被迅速送回,再等待。 与此相关的一个后果是,执行有超时功能的 < code> TryDequeue 和超时功能是不必要的困难和无效的(见 < code> TryDequeeUeWorks ),当它应该优雅时(见 < code> TryDequeuedesired )。
< 强势>, 我如何扭曲 < code> Monitor. Pulse code > 来做我想做的事? 或者, 是否有另一个同步原始方法可以做? 是否有比我做的更有效和(或)优雅的方法来实施 < code> TryDequeue 超时? 强
以我所希望的解决方案来证明问题:
var queue = new ProducerConsumerQueueDraft<int>();
for (int consumer = 0; consumer < 3; consumer++)
new Thread(() =>
{
while (true)
{
int item;
// This call should occasionally throw an exception.
// Switching to queue.TryDequeueThatWorks should make
// the problem go away.
if (queue.TryDequeueDesired(out item, TimeSpan.FromSeconds(1)))
{
// Do nothing.
}
}
}).Start();
Thread.Sleep(1000); // Let consumers get up and running
for (int itemIndex = 0; itemIndex < 50000000; itemIndex++)
{
queue.Enqueue(0);
}