美文网首页
BlockingCollection 源码分析(一)

BlockingCollection 源码分析(一)

作者: TakumiWu | 来源:发表于2017-12-12 16:27 被阅读0次

    BlockingCollection是一个线程安全的生产者-消费者集合。

    为线程安全集合提供阻塞和绑定功能,允许多线程安全的添加和删除元素;

    一 属性介绍

    1.获取此 BlockingCollection<T> 实例的限定容量,CheckDisposed方法检查该集合对象是否被释放,被释放会抛出ObjectDisposedException异常

     public int BoundedCapacity
    {
        get
         {
          CheckDisposed();
          return m_boundedCapacity;
         }
    } 
    

    2.Count 返回集合对象内具体的数量 先检查对象是否被释放,然后返回对象总数,m_occupiedNodes是一个SemaphoreSlim对象。

    public int Count
            {
                get
                {
                    CheckDisposed();
                    return m_occupiedNodes.CurrentCount;
                }
            }
    

    3.IsAddingCompleted 获取此 BlockingCollection<T> 是否已标记为已完成添加,m_currentAdder是一个不被编译器优化的int类型参数

     public bool IsAddingCompleted
      {
                get
                {
                    CheckDisposed();
                    return (m_currentAdders == COMPLETE_ADDING_ON_MASK); }}
    

    4.IsCompleted获取一个值,该值指示此 BlockingCollection<T> 是否已标记为已完成添加并且为空。

    public bool IsCompleted
            {
                get
                {
                    CheckDisposed();
                    return (IsAddingCompleted && (m_occupiedNodes.CurrentCount == 0));
                }
            }
    

    二 方法
    1.构造函数BlockingCollection 如果初始化的boundedCapacity参数小于1是会抛出参数范围溢出的异常,从资源文件获得错误信息字符串,默认是用ConcurrentQueue做为集合的存储数据的集合,ConcurrentQueue是安全的先进先出 (FIFO) 集合。
    构造函数还允许传进一个实现了IProducerConsumerCollection生产消费者的接口的对象,如果改对象的数值大于BlockingCollection的boundedCapacity会抛出异常,这就要注意下集合的对象和传入的boundedCapacity是否一致。

     public BlockingCollection()
                : this(new ConcurrentQueue<T>())
            {
            }
     public BlockingCollection(int boundedCapacity)
                : this(new ConcurrentQueue<T>(), boundedCapacity)
            {
            }
     public BlockingCollection(IProducerConsumerCollection<T> collection, int boundedCapacity)
            {
                if (boundedCapacity < 1)
                {
                    throw new ArgumentOutOfRangeException(
                        "boundedCapacity", boundedCapacity,
                        SR.GetString(SR.BlockingCollection_ctor_BoundedCapacityRange));
                }
                if (collection == null)
                {
                    throw new ArgumentNullException("collection");
                }
                int count = collection.Count;
                if (count > boundedCapacity)
                {
                    throw new ArgumentException(SR.GetString(SR.BlockingCollection_ctor_CountMoreThanCapacity));
                }
                Initialize(collection, boundedCapacity, count);
            }
    public BlockingCollection(IProducerConsumerCollection<T> collection)
            {
                if (collection == null)
                {
                    throw new ArgumentNullException("collection");
                }
                Initialize(collection, NON_BOUNDED, collection.Count);
            }
    private void Initialize(IProducerConsumerCollection<T> collection, int boundedCapacity, int collectionCount)
            {
                Debug.Assert(boundedCapacity > 0 || boundedCapacity == NON_BOUNDED);
                m_collection = collection;
                m_boundedCapacity = boundedCapacity; ;
                m_isDisposed = false;
                m_ConsumersCancellationTokenSource = new CancellationTokenSource();
                m_ProducersCancellationTokenSource = new CancellationTokenSource();
                if (boundedCapacity == NON_BOUNDED)
                {
                    m_freeNodes = null;
                }
                else
                {
                    Debug.Assert(boundedCapacity > 0);
                    m_freeNodes = new SemaphoreSlim(boundedCapacity - collectionCount);
                }
                m_occupiedNodes = new SemaphoreSlim(collectionCount);
            }
    
    1. Add 往集合添加元素方法
      值可以是Null值,如果初始化的此实例时,指定了一个有限的容量,则在有空间可以存储提供的项之前,可能会阻止调用 Add。TryAddWithNoTimeValidation看字面意思是尝试增加没有时间验证的意思,会返回一个增加是否成功的值,第一个参数是项,第二个参数设置超时时间单位是毫秒,取消时间的token对象
     public void Add(T item)
            {
    #if DEBUG
                bool tryAddReturnValue =
    #endif
                TryAddWithNoTimeValidation(item, Timeout.Infinite, new CancellationToken());
    #if DEBUG
                Debug.Assert(tryAddReturnValue, "TryAdd() was expected to return true.");
    #endif
            }
    
     private bool TryAddWithNoTimeValidation(T item, int millisecondsTimeout, CancellationToken cancellationToken)
            {
               /// 检查集合对象是否被销毁了
                CheckDisposed();
               /// 检查线程取消是不是被取消了
                if (cancellationToken.IsCancellationRequested)
                    throw new OperationCanceledException(SR.GetString(SR.Common_OperationCanceled), cancellationToken);
                /// 检查是不是集合正在添加
                if (IsAddingCompleted)
                {
                    throw new InvalidOperationException(SR.GetString(SR.BlockingCollection_Completed));
                }
    
                bool waitForSemaphoreWasSuccessful = true;
                /// 有剩余的位置的节点对象不为空
                if (m_freeNodes != null)
                {
                    //If the m_freeNodes semaphore threw OperationCanceledException then this means that CompleteAdding()
                    //was called concurrently with Adding which is not supported by BlockingCollection.
                    CancellationTokenSource linkedTokenSource = null;
                    try
                    {
                        waitForSemaphoreWasSuccessful = m_freeNodes.Wait(0);
                        if (waitForSemaphoreWasSuccessful == false && millisecondsTimeout != 0)
                        {
                            linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(
                                cancellationToken, m_ProducersCancellationTokenSource.Token);
                            waitForSemaphoreWasSuccessful = m_freeNodes.Wait(millisecondsTimeout, linkedTokenSource.Token);
                        }
                    }
                    catch (OperationCanceledException)
                    {
                        //if cancellation was via external token, throw an OCE
                        if (cancellationToken.IsCancellationRequested)
                            throw new OperationCanceledException(SR.GetString(SR.Common_OperationCanceled), cancellationToken);
    
                        //if cancellation was via internal token, this indicates invalid use, hence InvalidOpEx.
                        //Contract.Assert(m_ProducersCancellationTokenSource.Token.IsCancellationRequested);
    
                        throw new InvalidOperationException
                            (SR.GetString(SR.BlockingCollection_Add_ConcurrentCompleteAdd));
                    }
                    finally
                    {
                        if (linkedTokenSource != null)
                        {
                            linkedTokenSource.Dispose();
                        }
                    }
                }
                if (waitForSemaphoreWasSuccessful)
                {
                    // Update the adders count if the complete adding was not requested, otherwise
                    // spins until all adders finish then throw IOE
                    // The idea behind to spin untill all adders finish, is to avoid to return to the caller with IOE while there are still some adders have
                    // not been finished yet
                    SpinWait spinner = new SpinWait();
                    while (true)
                    {
                        int observedAdders = m_currentAdders;
                        if ((observedAdders & COMPLETE_ADDING_ON_MASK) != 0)
                        {
                            spinner.Reset();
                            // CompleteAdding is requested, spin then throw
                            while (m_currentAdders != COMPLETE_ADDING_ON_MASK) spinner.SpinOnce();
                            throw new InvalidOperationException(SR.GetString(SR.BlockingCollection_Completed));
                        }
                        if (Interlocked.CompareExchange(ref m_currentAdders, observedAdders + 1, observedAdders) == observedAdders)
                        {
                            Debug.Assert((observedAdders + 1) <= (~COMPLETE_ADDING_ON_MASK), "The number of concurrent adders thread excceeded the maximum limit.");
                            break;
                        }
                        spinner.SpinOnce();
                    }
    
                    // This outer try/finally to workaround of repeating the decrement adders code 3 times, because we should decrement the adders if:
                    // 1- m_collection.TryAdd threw an exception
                    // 2- m_collection.TryAdd succeeded
                    // 3- m_collection.TryAdd returned false
                    // so we put the decrement code in the finally block
                    try
                    {
    
                        //TryAdd is guaranteed to find a place to add the element. Its return value depends
                        //on the semantics of the underlying store. Some underlying stores will not add an already
                        //existing item and thus TryAdd returns false indicating that the size of the underlying
                        //store did not increase.
    
    
                        bool addingSucceeded = false;
                        try
                        {
                            //The token may have been canceled before the collection had space available, so we need a check after the wait has completed.
                            //This fixes bug #702328, case 2 of 2.
                            cancellationToken.ThrowIfCancellationRequested();
                            addingSucceeded = m_collection.TryAdd(item);
                        }
                        catch
                        {
                            //TryAdd did not result in increasing the size of the underlying store and hence we need
                            //to increment back the count of the m_freeNodes semaphore.
                            if (m_freeNodes != null)
                            {
                                m_freeNodes.Release();
                            }
                            throw;
                        }
                        if (addingSucceeded)
                        {
                            //After adding an element to the underlying storage, signal to the consumers 
                            //waiting on m_occupiedNodes that there is a new item added ready to be consumed.
                            m_occupiedNodes.Release();
                        }
                        else
                        {
                            throw new InvalidOperationException(SR.GetString(SR.BlockingCollection_Add_Failed));
                        }
                    }
                    finally
                    {
                        // decrement the adders count
                        Debug.Assert((m_currentAdders & ~COMPLETE_ADDING_ON_MASK) > 0);
                        Interlocked.Decrement(ref m_currentAdders);
                    }
    
    
                }
                return waitForSemaphoreWasSuccessful;
            }
    

    未完待续

    相关文章

      网友评论

          本文标题:BlockingCollection 源码分析(一)

          本文链接:https://www.haomeiwen.com/subject/dbpgixtx.html