English 中文(简体)
生产者-消费者收集中的脉冲/等待和实施超时执行
原标题:Deterministic Monitor Pulse/ Wait and implementing timeout in producer-consumer collection

我试图同时收集生产者-消费者(多个生产者和消费者),支持消费者超时。

现在,实际的收藏相当复杂( 系统. collections. Concurrent that job down orfully) 。 但是,我这里有一个微小的样本来证明我的问题( 看起来有点像 Blocking Collection<T> ) 。

public sealed class ProducerConsumerQueueDraft<T>
{
    private readonly Queue<T> queue = new Queue<T>();
    private readonly object locker = new object();

    public void Enqueue(T item)
    {
        lock (locker)
        {
            queue.Enqueue(item);

            /* This "optimization" is broken, as Nicholas Butler points out.
            if(queue.Count == 1) // Optimization
            */
                Monitor.Pulse(locker); // Notify any waiting consumer threads.
        }
    }

    public T Dequeue(T item)
    {
        lock (locker)
        {
            // Surprisingly, this needs to be a *while* and not an *if*
            // which is the core of my problem.
            while (queue.Count == 0)
                Monitor.Wait(locker);

            return queue.Dequeue();
        }
    }

    // This isn t thread-safe, but is how I want TryDequeue to look.
    public bool TryDequeueDesired(out T item, TimeSpan timeout)
    {
        lock (locker)
        {
            if (queue.Count == 0 && !Monitor.Wait(locker, timeout))
            {
                item = default(T);
                return false;
            }

            // This is wrong! The queue may be empty even though we were pulsed!
            item = queue.Dequeue();
            return true;
        }
    }

    // Has nasty timing-gymnastics I want to avoid.
    public bool TryDequeueThatWorks(out T item, TimeSpan timeout)
    {
        lock (locker)
        {
            var watch = Stopwatch.StartNew();
            while (queue.Count == 0)
            {
                var remaining = timeout - watch.Elapsed;

                if (!Monitor.Wait(locker, remaining < TimeSpan.Zero ? TimeSpan.Zero : remaining))
                {
                    item = default(T);
                    return false;
                }
            }
            item = queue.Dequeue();
            return true;
        }
    }
}

想法是直截了当的:发现等待信号的排队空位的消费者,以及制作人 Pulse (注意:不是 Pulseall ,否则效率不高) 通知等待项目。

我的问题是"的属性 http://msdn.microsoft.com/en-us/library/system.threading.monitor.pulse" rel="nofollow"\\ code>monitor.Pulse :

When the thread that invoked Pulse releases the lock, the next thread in the ready queue (which is not necessarily the thread that was pulsed) acquires the lock.

这意味着,消费者-油道 C1 可能被生产者-油道唤醒来消费一个物品,但“它们”另一个消费者-油道 C2 可以在C1有机会重新获取该物品之前获得锁,然后消费该物品,在获得控制后,C1 留下一个空队列。

这意味着,如果排队确实不是空的,我就必须在每一脉冲的消费者代码中进行防守检查,如果不是这种情况,我必须回去空手等待。

我面临的主要问题是,它效率低下 -- -- 线线可能会被唤醒,以便工作,然后被迅速送回,再等待。 与此相关的一个后果是,执行有超时功能的 < code> TryDequeue 和超时功能是不必要的困难和无效的(见 < code> TryDequeeUeWorks ),当它应该优雅时(见 < code> TryDequeuedesired )。

< 强势>, 我如何扭曲 < code> Monitor. Pulse 来做我想做的事? 或者, 是否有另一个同步原始方法可以做? 是否有比我做的更有效和(或)优雅的方法来实施 < code> TryDequeue 超时?

以我所希望的解决方案来证明问题:

var queue = new ProducerConsumerQueueDraft<int>();

for (int consumer = 0; consumer < 3; consumer++)
    new Thread(() =>
    {
        while (true)
        {
            int item;

            // This call should occasionally throw an exception.
            // Switching to queue.TryDequeueThatWorks should make
            // the problem go away.
            if (queue.TryDequeueDesired(out item, TimeSpan.FromSeconds(1)))
            {
                // Do nothing.
            }
        }

    }).Start();

Thread.Sleep(1000); // Let consumers get up and running

for (int itemIndex = 0; itemIndex < 50000000; itemIndex++)
{
    queue.Enqueue(0);
}
最佳回答

我面临的首要问题是,它效率低下

不是。 您假设这是常见现象, 但这种种族很少发生 。 充其量在蓝月亮中出现一次。 同时循环是必要的, 以确保在“ 蓝月亮” 发生时不会出问题。 它会发生。 不要搞乱它 。

恰恰相反, 锁定设计是“ 有效 < / em > ”, 因为它允许竞赛发生。 并且处理它。 锁定设计修补非常危险, 因为比赛不会经常发生。 它们非常随机, 无法进行充分测试, 证明修改不会导致失败。 添加任何仪器代码也行不通, 改变时间 。

问题回答

以下是一个简单的关键组合式生产者-消费者队列:

public class ConflatingConcurrentQueue<TKey, TValue>
{
    private readonly ConcurrentDictionary<TKey, Entry> entries;
    private readonly BlockingCollection<Entry> queue;

    public ConflatingConcurrentQueue()
    {
        this.entries = new ConcurrentDictionary<TKey, Entry>();
        this.queue = new BlockingCollection<Entry>();
    }

    public void Enqueue(TValue value, Func<TValue, TKey> keySelector)
    {
        // Get the entry for the key. Create a new one if necessary.
        Entry entry = entries.GetOrAdd(keySelector(value), k => new Entry());

        // Get exclusive access to the entry.
        lock (entry)
        {
            // Replace any old value with the new one.
            entry.Value = value;

            // Add the entry to the queue if it s not enqueued yet.
            if (!entry.Enqueued)
            {
                entry.Enqueued = true;
                queue.Add(entry);
            }
        }
    }

    public bool TryDequeue(out TValue value, TimeSpan timeout)
    {
        Entry entry;

        // Try to dequeue an entry (with timeout).
        if (!queue.TryTake(out entry, timeout))
        {
            value = default(TValue);
            return false;
        }

        // Get exclusive access to the entry.
        lock (entry)
        {
            // Return the value.
            value = entry.Value;

            // Mark the entry as dequeued.
            entry.Enqueued = false;
            entry.Value = default(TValue);
        }

        return true;
    }

    private class Entry
    {
        public TValue Value { get; set; }
        public bool Enqueued { get; set; }
    }
}

(这可能需要一次或两次代码审查, 但我一般认为是正常的。 )





相关问题
Anyone feel like passing it forward?

I m the only developer in my company, and am getting along well as an autodidact, but I know I m missing out on the education one gets from working with and having code reviewed by more senior devs. ...

NSArray s, Primitive types and Boxing Oh My!

I m pretty new to the Objective-C world and I have a long history with .net/C# so naturally I m inclined to use my C# wits. Now here s the question: I feel really inclined to create some type of ...

C# Marshal / Pinvoke CBitmap?

I cannot figure out how to marshal a C++ CBitmap to a C# Bitmap or Image class. My import looks like this: [DllImport(@"test.dll", CharSet = CharSet.Unicode)] public static extern IntPtr ...

How to Use Ghostscript DLL to convert PDF to PDF/A

How to user GhostScript DLL to convert PDF to PDF/A. I know I kind of have to call the exported function of gsdll32.dll whose name is gsapi_init_with_args, but how do i pass the right arguments? BTW, ...

Linqy no matchy

Maybe it s something I m doing wrong. I m just learning Linq because I m bored. And so far so good. I made a little program and it basically just outputs all matches (foreach) into a label control. ...