English 中文(简体)
这是无锁的。 NET queue thread safe?
原标题:Is this lock-free .NET queue thread safe?
  • 时间:2009-10-09 02:16:08
  •  标签:

我的问题是,下面的班级是否包含在一门读物单撰写人栏目中? 这种格言被称为无锁定,即使它会阻碍填满。 https://stackoverflow.com/questions/530211/creating-a-blocking-queuet-in-net/5228#530228" 在StackOver的座标上安装一个阻挡装置。

结构的点是,允许一只透镜把数据写到缓冲地带,再读数据。 所有这些都需要尽快实现。

http://www.ddj.com/hpc-high- Performance-computing/210604448?pgno=3”rel=“nofollow noreferer”>article at DDJ by Herb Sutter ,但实施工作在C++中除外。 另一个不同之处是,我使用一个与面包车有联系的清单,我使用一个相关的阵列清单。

我不只包括一刀,而是在任何人认为其有用的情况下,用允许的公开来源许可证(MIT License 1.0)作评论的全文,并且希望加以使用(根据需要或修改)。

这个问题与在Stack Overflow询问的其他问题有关(见,在NET)。 Thread-safe/2005/4ing queue implementation in .NET.

该守则是:

using System;
using System.Collections.Generic;
using System.Threading;
using System.Diagnostics;

namespace CollectionSandbox
{
    /// This is a single reader / singler writer buffered queue implemented
    /// with (almost) no locks. This implementation will block only if filled 
    /// up. The implementation is a linked-list of arrays.
    /// It was inspired by the desire to create a non-blocking version 
    /// of the blocking queue implementation in C# by Marc Gravell
    /// https://stackoverflow.com/questions/530211/creating-a-blocking-queuet-in-net/530228#530228
    class SimpleSharedQueue<T> : IStreamBuffer<T>
    {
        /// Used to signal things are no longer full
        ManualResetEvent canWrite = new ManualResetEvent(true);

        /// This is the size of a buffer 
        const int BUFFER_SIZE = 512;

        /// This is the maximum number of nodes. 
        const int MAX_NODE_COUNT = 100;

        /// This marks the location to write new data to.
        Cursor adder;

        /// This marks the location to read new data from.
        Cursor remover;

        /// Indicates that no more data is going to be written to the node.
        public bool completed = false;

        /// A node is an array of data items, a pointer to the next item,
        /// and in index of the number of occupied items 
        class Node
        {
            /// Where the data is stored.
            public T[] data = new T[BUFFER_SIZE];

            /// The number of data items currently stored in the node.
            public Node next;

            /// The number of data items currently stored in the node.
            public int count;

            /// Default constructor, only used for first node.
            public Node()
            {
                count = 0;
            }

            /// Only ever called by the writer to add new Nodes to the scene
            public Node(T x, Node prev)
            {
                data[0] = x;
                count = 1;

                // The previous node has to be safely updated to point to this node.
                // A reader could looking at the point, while we set it, so this should be 
                // atomic.
                Interlocked.Exchange(ref prev.next, this);
            }
        }

        /// This is used to point to a location within a single node, and can perform 
        /// reads or writers. One cursor will only ever read, and another cursor will only
        /// ever write.
        class Cursor
        {
            /// Points to the parent Queue
            public SimpleSharedQueue<T> q;

            /// The current node
            public Node node;

            /// For a writer, this points to the position that the next item will be written to.
            /// For a reader, this points to the position that the next item will be read from.
            public int current = 0;

            /// Creates a new cursor, pointing to the node
            public Cursor(SimpleSharedQueue<T> q, Node node)
            {
                this.q = q;
                this.node = node;
            }

            /// Used to push more data onto the queue
            public void Write(T x)
            {
                Trace.Assert(current == node.count);

                // Check whether we are at the node limit, and are going to need to allocate a new buffer.
                if (current == BUFFER_SIZE)
                {
                    // Check if the queue is full
                    if (q.IsFull())
                    {
                        // Signal the canWrite event to false
                        q.canWrite.Reset();

                        // Wait until the canWrite event is signaled 
                        q.canWrite.WaitOne();
                    }

                    // create a new node
                    node = new Node(x, node);
                    current = 1;
                }
                else
                {
                    // If the implementation is correct then the reader will never try to access this 
                    // array location while we set it. This is because of the invariant that 
                    // if reader and writer are at the same node: 
                    //    reader.current < node.count 
                    // and 
                    //    writer.current = node.count 
                    node.data[current++] = x;

                    // We have to use interlocked, to assure that we incremeent the count 
                    // atomicalluy, because the reader could be reading it.
                    Interlocked.Increment(ref node.count);
                }
            }

            /// Pulls data from the queue, returns false only if 
            /// there 
            public bool Read(ref T x)
            {
                while (true)
                {
                    if (current < node.count)
                    {
                        x = node.data[current++];
                        return true;
                    }
                    else if ((current == BUFFER_SIZE) && (node.next != null))
                    {
                        // Move the current node to the next one.
                        // We know it is safe to do so.
                        // The old node will have no more references to it it 
                        // and will be deleted by the garbage collector.
                        node = node.next;

                        // If there is a writer thread waiting on the Queue,
                        // then release it.
                        // Conceptually there is a "if (q.IsFull)", but we can t place it 
                        // because that would lead to a Race condition.
                        q.canWrite.Set();

                        // point to the first spot                
                        current = 0;

                        // One of the invariants is that every node created after the first,
                        // will have at least one item. So the following call is safe
                        x = node.data[current++];
                        return true;
                    }

                    // If we get here, we have read the most recently added data.
                    // We then check to see if the writer has finished producing data.
                    if (q.completed)
                        return false;

                    // If we get here there is no data waiting, and no flagging of the completed thread.
                    // Wait a millisecond. The system will also context switch. 
                    // This will allow the writing thread some additional resources to pump out 
                    // more data (especially if it iself is multithreaded)
                    Thread.Sleep(1);
                }
            }
        }

        /// Returns the number of nodes currently used.
        private int NodeCount
        {
            get
            {
                int result = 0;
                Node cur = null;
                Interlocked.Exchange<Node>(ref cur, remover.node);

                // Counts all nodes from the remover to the adder
                // Not efficient, but this is not called often. 
                while (cur != null)
                {
                    ++result;
                    Interlocked.Exchange<Node>(ref cur, cur.next);
                }
                return result;
            }
        }

        /// Construct the queue.
        public SimpleSharedQueue()
        {
            Node root = new Node();
            adder = new Cursor(this, root);
            remover = new Cursor(this, root);
        }

        /// Indicate to the reader that no more data is going to be written.
        public void MarkCompleted()
        {
            completed = true;
        }

        /// Read the next piece of data. Returns false if there is no more data. 
        public bool Read(ref T x)
        {
            return remover.Read(ref x);
        }

        /// Writes more data.
        public void Write(T x)
        {
            adder.Write(x);
        }

        /// Tells us if there are too many nodes, and can t add anymore.
        private bool IsFull()
        {
            return NodeCount == MAX_NODE_COUNT;  
        }
    }
}
最佳回答

微软研究CHESS应当证明是检验其实施情况的良好工具。

问题回答

<代码>Sleep()的出现使无锁做法完全无用。 应对无锁设计复杂性的唯一原因是,必须绝对速度并避免Semaphores的费用。 使用睡觉(1)完全违背了这一目的。

鉴于这一点,我找不到任何联系人。 交换的确是读写的,我不会说。 我也质疑,为什么你想要无所作为,因为很少给你带来足够的好处来应对其复杂性。

微软公司在2009年全球钻石交易委员会上做了出色的介绍,你可以取得滑坡here

我怀疑这并非安全——设想以下情景:

2版面进入cursor.Write。 第1条以行文为准:node = 新的Node(x, node);在<代码>的真正半处(目前=BUFFER_SIZE) 说明(但请也假设 现 = BUFFER_SIZE,这样,在<代码>上添加了1条。 接着,如果说的话,另一条路将走到另一条路。 现在可以想象,第1版已经失去时间,第2版已经消失,如果有人错误地认为目前的状况仍然存在,就开始发言。 它本来应该走进另一条道路。

我拿不出这部法律,因此我不敢肯定,如果我可以在这个法典中作出假设,但如果这些假设(即进入曲线)。 <代码>当值=BUFFER_SIZE时,从多个读物书写,很有可能出现不一致的错误。

首先,我想知道这两种顺序代码的假设:

                node.data[current++] = x;

                // We have to use interlocked, to assure that we incremeent the count 
                // atomicalluy, because the reader could be reading it.
                Interlocked.Increment(ref node.count);

究竟是怎样说,数据的新价值是对这一记忆地点的承诺? 它没有储存在挥发的记忆中,因此,如果我正确理解,可以安排时间? 这是否会导致过错? 可能还有其他一些地方也属实,但情况看上去。

第二,包含以下内容的多读法:

Thread.Sleep(int);

......永远不是好的征兆。 如果需要的话,该法典将注定要失败,如果它不要求它产生废物。 我真心希望他们会完全删除这一案文。 实现这一点是至少等待这一时间的要求。 随着环境的中断,你几乎肯定会等待更长时间,时间会更长。

第三,我完全不理解此处使用24小时预报器。 Maybe I m tir tir tir tir and point point point point point point point point point point point point point point point point point point point point point point 看来,我可以找到的唯一用途是修改数据节点的内容,以便确定上面第1号。

最后,执行似乎有些过于重复。 我没有全程/小点,也根本不做同这一类一样的事情? (说明:我没有尝试过,我认为这要么已经安全,要么只是试图挫败我认为你做的事情。)

class ReaderWriterQueue<T>
{
    readonly AutoResetEvent _readComplete;
    readonly T[] _buffer;
    readonly int _maxBuffer;
    int _readerPos, _writerPos;

    public ReaderWriterQueue(int maxBuffer)
    {
        _readComplete = new AutoResetEvent(true);
        _maxBuffer = maxBuffer;
        _buffer = new T[_maxBuffer];
        _readerPos = _writerPos = 0;
    }

    public int Next(int current) { return ++current == _maxBuffer ? 0 : current; }

    public bool Read(ref T item)
    {
        if (_readerPos != _writerPos)
        {
            item = _buffer[_readerPos];
            _readerPos = Next(_readerPos);
            return true;
        }
        else
            return false;
    }

    public void Write(T item)
    {
        int next = Next(_writerPos);

        while (next == _readerPos)
            _readComplete.WaitOne();

        _buffer[next] = item;
        _writerPos = next;
    }
}

因此,我在这里完全没有根据,没有看到原类别中的魔鬼吗?

I must admit one thing, I despise Threading. I ve seen the best developers fail at it. This article gives a great example on how hard it is to get threading right: http://www.yoda.arachsys.com/csharp/singleton.html





相关问题
热门标签