BlockingCollection是一个线程安全的生产者-消费者集合。
为线程安全集合提供阻塞和限界(容量上限)功能,允许多个线程安全地添加和移除元素;
一 属性介绍
1.获取此 BlockingCollection<T> 实例的限定容量,CheckDisposed方法检查该集合对象是否被释放,被释放会抛出ObjectDisposedException异常
/// <summary>
/// Gets the bounded capacity of this collection instance.
/// </summary>
/// <remarks>
/// CheckDisposed() runs first, so a disposed instance is rejected before the
/// backing field is read.
/// </remarks>
public int BoundedCapacity
{
    get
    {
        CheckDisposed();
        int capacity = m_boundedCapacity;
        return capacity;
    }
}
2.Count 返回集合对象内具体的数量 先检查对象是否被释放,然后返回对象总数,m_occupiedNodes是一个SemaphoreSlim对象。
/// <summary>
/// Gets the number of items currently held by the collection.
/// </summary>
/// <remarks>
/// The value is read from the current count of the m_occupiedNodes semaphore,
/// after CheckDisposed() has validated the instance.
/// </remarks>
public int Count
{
    get
    {
        CheckDisposed();
        int occupied = m_occupiedNodes.CurrentCount;
        return occupied;
    }
}
3.IsAddingCompleted 获取此 BlockingCollection<T> 是否已标记为已完成添加,m_currentAdders是一个不会被编译器优化(volatile)的int类型字段
/// <summary>
/// Gets whether this collection has been marked as complete for adding.
/// </summary>
/// <remarks>
/// True only when m_currentAdders equals COMPLETE_ADDING_ON_MASK exactly,
/// i.e. the mask is set and no other bits (in-flight adders) remain.
/// </remarks>
public bool IsAddingCompleted
{
    get
    {
        CheckDisposed();
        int adders = m_currentAdders;
        return adders == COMPLETE_ADDING_ON_MASK;
    }
}
4.IsCompleted获取一个值,该值指示此 BlockingCollection<T> 是否已标记为已完成添加并且为空。
/// <summary>
/// Gets whether this collection has been marked as complete for adding
/// and currently holds no items.
/// </summary>
public bool IsCompleted
{
    get
    {
        CheckDisposed();

        // Still accepting items: cannot be completed regardless of the count.
        if (!IsAddingCompleted)
        {
            return false;
        }

        return m_occupiedNodes.CurrentCount == 0;
    }
}
二 方法
1.构造函数BlockingCollection 如果传入的boundedCapacity参数小于1,会抛出ArgumentOutOfRangeException(参数超出范围)异常,错误信息字符串从资源文件中获取;默认使用ConcurrentQueue作为底层存储集合,ConcurrentQueue是线程安全的先进先出 (FIFO) 集合。
构造函数还允许传入一个实现了IProducerConsumerCollection(生产者-消费者集合)接口的对象,如果该对象已有的元素数量大于BlockingCollection的boundedCapacity,会抛出ArgumentException异常,因此要注意传入集合的元素数量与boundedCapacity是否匹配。
/// <summary>
/// Creates a collection backed by a new, empty ConcurrentQueue&lt;T&gt;.
/// </summary>
/// <remarks>
/// Delegates to the single-argument overload that takes an
/// IProducerConsumerCollection&lt;T&gt;; no capacity is passed here.
/// </remarks>
public BlockingCollection()
: this(new ConcurrentQueue<T>())
{
}
/// <summary>
/// Creates a collection with the given capacity, backed by a new, empty
/// ConcurrentQueue&lt;T&gt;.
/// </summary>
/// <param name="boundedCapacity">Capacity forwarded to the (collection, boundedCapacity) overload, which performs the validation.</param>
public BlockingCollection(int boundedCapacity)
: this(new ConcurrentQueue<T>(), boundedCapacity)
{
}
/// <summary>
/// Creates a bounded collection that uses <paramref name="collection"/> as its
/// underlying store.
/// </summary>
/// <param name="collection">The store to wrap; items already in it count against the capacity.</param>
/// <param name="boundedCapacity">Maximum number of items; must be at least 1.</param>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="boundedCapacity"/> is less than 1.</exception>
/// <exception cref="ArgumentNullException"><paramref name="collection"/> is null.</exception>
/// <exception cref="ArgumentException"><paramref name="collection"/> already holds more items than <paramref name="boundedCapacity"/>.</exception>
public BlockingCollection(IProducerConsumerCollection<T> collection, int boundedCapacity)
{
    // The capacity is validated before the null check, so an invalid capacity
    // is reported even when the collection argument is also null.
    bool capacityIsValid = boundedCapacity >= 1;
    if (!capacityIsValid)
    {
        string rangeMessage = SR.GetString(SR.BlockingCollection_ctor_BoundedCapacityRange);
        throw new ArgumentOutOfRangeException("boundedCapacity", boundedCapacity, rangeMessage);
    }

    if (null == collection)
    {
        throw new ArgumentNullException("collection");
    }

    // Read Count once so the validated value is the same one handed to Initialize.
    int initialCount = collection.Count;
    if (boundedCapacity < initialCount)
    {
        string overflowMessage = SR.GetString(SR.BlockingCollection_ctor_CountMoreThanCapacity);
        throw new ArgumentException(overflowMessage);
    }

    Initialize(collection, boundedCapacity, initialCount);
}
/// <summary>
/// Creates an unbounded collection that uses <paramref name="collection"/> as
/// its underlying store.
/// </summary>
/// <param name="collection">The store to wrap; its current item count seeds the occupied-slot count.</param>
/// <exception cref="ArgumentNullException"><paramref name="collection"/> is null.</exception>
public BlockingCollection(IProducerConsumerCollection<T> collection)
{
    if (null == collection)
    {
        throw new ArgumentNullException("collection");
    }

    int initialCount = collection.Count;
    Initialize(collection, NON_BOUNDED, initialCount);
}
/// <summary>
/// Sets up the instance state shared by the constructors.
/// </summary>
/// <param name="collection">The underlying store.</param>
/// <param name="boundedCapacity">A positive capacity, or NON_BOUNDED for no limit.</param>
/// <param name="collectionCount">Number of items already present in <paramref name="collection"/>.</param>
private void Initialize(IProducerConsumerCollection<T> collection, int boundedCapacity, int collectionCount)
{
    Debug.Assert(boundedCapacity > 0 || boundedCapacity == NON_BOUNDED);

    m_collection = collection;
    m_boundedCapacity = boundedCapacity;
    m_isDisposed = false;
    m_ConsumersCancellationTokenSource = new CancellationTokenSource();
    m_ProducersCancellationTokenSource = new CancellationTokenSource();

    if (boundedCapacity != NON_BOUNDED)
    {
        Debug.Assert(boundedCapacity > 0);

        // Free slots = capacity minus whatever the store already contains.
        int freeSlots = boundedCapacity - collectionCount;
        m_freeNodes = new SemaphoreSlim(freeSlots);
    }
    else
    {
        // Unbounded: there is no free-slot semaphore to wait on.
        m_freeNodes = null;
    }

    // One permit per item already available to consumers.
    m_occupiedNodes = new SemaphoreSlim(collectionCount);
}
- Add 往集合添加元素方法
添加的值可以是 null。如果初始化此实例时指定了有限的容量,则在有空间存储所提供的项之前,对 Add 的调用可能会被阻塞。TryAddWithNoTimeValidation 从字面上看是“不做超时参数校验的尝试添加”,它返回一个表示添加是否成功的值:第一个参数是要添加的项,第二个参数是超时时间(单位为毫秒),第三个参数是用于取消操作的 CancellationToken 对象。
/// <summary>
/// Adds <paramref name="item"/> to the collection, waiting without a timeout
/// and without a cancellation token.
/// </summary>
/// <param name="item">The item to add.</param>
public void Add(T item)
{
    // With an infinite timeout the helper is expected to return true or throw.
    bool added = TryAddWithNoTimeValidation(item, Timeout.Infinite, CancellationToken.None);

    // Debug.Assert is compiled out of non-DEBUG builds, so the check is debug-only.
    Debug.Assert(added, "TryAdd() was expected to return true.");
}
/// <summary>
/// Tries to add <paramref name="item"/> to the underlying collection, waiting up to
/// <paramref name="millisecondsTimeout"/> for a free slot when the collection is bounded.
/// The timeout value itself is not validated here.
/// </summary>
/// <param name="item">The item to add.</param>
/// <param name="millisecondsTimeout">Wait time in milliseconds; 0 means do not block, Timeout.Infinite is passed by Add.</param>
/// <param name="cancellationToken">Caller-supplied token observed before, during and after the wait.</param>
/// <returns>
/// True if the item was added; false if no free slot was obtained from m_freeNodes within the timeout.
/// </returns>
/// <exception cref="OperationCanceledException"><paramref name="cancellationToken"/> was canceled.</exception>
/// <exception cref="InvalidOperationException">
/// Adding has been completed, completion was requested while this add was in progress,
/// or the underlying collection's TryAdd returned false.
/// </exception>
private bool TryAddWithNoTimeValidation(T item, int millisecondsTimeout, CancellationToken cancellationToken)
{
// Reject the call if this collection has been disposed.
CheckDisposed();
// Reject the call if the caller's token has already been canceled.
if (cancellationToken.IsCancellationRequested)
throw new OperationCanceledException(SR.GetString(SR.Common_OperationCanceled), cancellationToken);
// Reject the call if the collection has already been marked as complete for adding.
if (IsAddingCompleted)
{
throw new InvalidOperationException(SR.GetString(SR.BlockingCollection_Completed));
}
bool waitForSemaphoreWasSuccessful = true;
// m_freeNodes is non-null only for a bounded collection (see Initialize); an unbounded one skips the wait.
if (m_freeNodes != null)
{
//If the m_freeNodes semaphore threw OperationCanceledException then this means that CompleteAdding()
//was called concurrently with Adding which is not supported by BlockingCollection.
CancellationTokenSource linkedTokenSource = null;
try
{
// Try to take a free slot without blocking first, so the linked token source
// is only allocated when a real wait is needed.
waitForSemaphoreWasSuccessful = m_freeNodes.Wait(0);
if (waitForSemaphoreWasSuccessful == false && millisecondsTimeout != 0)
{
// Wake up on either the caller's token or the internal producers token.
linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(
cancellationToken, m_ProducersCancellationTokenSource.Token);
waitForSemaphoreWasSuccessful = m_freeNodes.Wait(millisecondsTimeout, linkedTokenSource.Token);
}
}
catch (OperationCanceledException)
{
//if cancellation was via external token, throw an OCE
if (cancellationToken.IsCancellationRequested)
throw new OperationCanceledException(SR.GetString(SR.Common_OperationCanceled), cancellationToken);
//if cancellation was via internal token, this indicates invalid use, hence InvalidOpEx.
//Contract.Assert(m_ProducersCancellationTokenSource.Token.IsCancellationRequested);
throw new InvalidOperationException
(SR.GetString(SR.BlockingCollection_Add_ConcurrentCompleteAdd));
}
finally
{
// Always dispose the linked source if one was created for the blocking wait.
if (linkedTokenSource != null)
{
linkedTokenSource.Dispose();
}
}
}
if (waitForSemaphoreWasSuccessful)
{
// Update the adders count if the complete adding was not requested, otherwise
// spins until all adders finish then throw IOE
// The idea behind spinning until all adders finish is to avoid returning to the caller with IOE while there are still some adders
// that have not finished yet
SpinWait spinner = new SpinWait();
while (true)
{
int observedAdders = m_currentAdders;
if ((observedAdders & COMPLETE_ADDING_ON_MASK) != 0)
{
spinner.Reset();
// CompleteAdding is requested, spin then throw
while (m_currentAdders != COMPLETE_ADDING_ON_MASK) spinner.SpinOnce();
throw new InvalidOperationException(SR.GetString(SR.BlockingCollection_Completed));
}
// Register this thread as an active adder; retry if another thread changed the counter in between.
if (Interlocked.CompareExchange(ref m_currentAdders, observedAdders + 1, observedAdders) == observedAdders)
{
Debug.Assert((observedAdders + 1) <= (~COMPLETE_ADDING_ON_MASK), "The number of concurrent adders thread excceeded the maximum limit.");
break;
}
spinner.SpinOnce();
}
// This outer try/finally to workaround of repeating the decrement adders code 3 times, because we should decrement the adders if:
// 1- m_collection.TryAdd threw an exception
// 2- m_collection.TryAdd succeeded
// 3- m_collection.TryAdd returned false
// so we put the decrement code in the finally block
try
{
//TryAdd is guaranteed to find a place to add the element. Its return value depends
//on the semantics of the underlying store. Some underlying stores will not add an already
//existing item and thus TryAdd returns false indicating that the size of the underlying
//store did not increase.
bool addingSucceeded = false;
try
{
//The token may have been canceled before the collection had space available, so we need a check after the wait has completed.
//This fixes bug #702328, case 2 of 2.
cancellationToken.ThrowIfCancellationRequested();
addingSucceeded = m_collection.TryAdd(item);
}
catch
{
//TryAdd did not result in increasing the size of the underlying store and hence we need
//to increment back the count of the m_freeNodes semaphore.
if (m_freeNodes != null)
{
m_freeNodes.Release();
}
throw;
}
if (addingSucceeded)
{
//After adding an element to the underlying storage, signal to the consumers
//waiting on m_occupiedNodes that there is a new item added ready to be consumed.
m_occupiedNodes.Release();
}
else
{
// NOTE(review): unlike the catch block above, this path does not release the
// m_freeNodes slot taken earlier - confirm that this is intentional.
throw new InvalidOperationException(SR.GetString(SR.BlockingCollection_Add_Failed));
}
}
finally
{
// decrement the adders count
Debug.Assert((m_currentAdders & ~COMPLETE_ADDING_ON_MASK) > 0);
Interlocked.Decrement(ref m_currentAdders);
}
}
// False only when the wait for a free slot timed out; every other failure path throws.
return waitForSemaphoreWasSuccessful;
}
未完待续
网友评论