古老而又笨拙的并发容器:hashtable和vector
hashtable和vector是两个比较古老的线程安全的容器,hashtable是Map类型的容器,vector是list类型的容器,由于两个类锁的粒度太大,多个方法之间共用一把锁,所以并发性能差,现在已经逐渐遭到了弃用,所以建议兄弟萌不要在使用他们两个了
ArrayList和HashMap也能线程安全:
在大家的认知中ArrayList和HashMap都线程不安全的两个类,但是使了Collections.synchronizedList(list)和Collections.synchronizedMap(map)就立马线程安全了
点到synchronizedList看到里面的源码,返回的是SynchronizedRandomAccessList,SynchronizedRandomAccessList的父类是SynchronizedList
public static <T> List<T> synchronizedList(List<T> list) {
//集合list是属于RandomAccess的话就返回 new SynchronizedRandomAccessList<>(list)
return (list instanceof RandomAccess ?
new SynchronizedRandomAccessList<>(list) :
new SynchronizedList<>(list));
}
点开 SynchronizedRandomAccessList<>(list) ,然后点开真正执行方法的父类,其实只是把list的每一个操作都加了synchronized对象锁,锁的对象是一样的,锁的粒度没hashtable和vector那么大,但是都是差不多的
static class SynchronizedList<E>
extends SynchronizedCollection<E>
implements List<E> {
private static final long serialVersionUID = -7754090372962971524L;
final List<E> list;
SynchronizedList(List<E> list) {
super(list);
this.list = list;
}
SynchronizedList(List<E> list, Object mutex) {
super(list, mutex);
this.list = list;
}
public boolean equals(Object o) {
if (this == o)
return true;
synchronized (mutex) {return list.equals(o);}
}
public int hashCode() {
synchronized (mutex) {return list.hashCode();}
}
public E get(int index) {
synchronized (mutex) {return list.get(index);}
}
//............................省略
public List<E> subList(int fromIndex, int toIndex) {
synchronized (mutex) {
return new SynchronizedList<>(list.subList(fromIndex, toIndex),
mutex);
}
}
//............................省略
Map和list一样,只是对其内部的方法被调用时,使用synchronized进行同步
private static class SynchronizedMap<K,V>
implements Map<K,V>, Serializable {
private static final long serialVersionUID = 1978198479659022715L;
private final Map<K,V> m; // Backing Map
final Object mutex; // Object on which to synchronize
SynchronizedMap(Map<K,V> m) {
this.m = Objects.requireNonNull(m);
mutex = this;
}
SynchronizedMap(Map<K,V> m, Object mutex) {
this.m = m;
this.mutex = mutex;
}
public int size() {
synchronized (mutex) {return m.size();}
}
public boolean isEmpty() {
synchronized (mutex) {return m.isEmpty();}
}
public boolean containsKey(Object key) {
synchronized (mutex) {return m.containsKey(key);}
}
//............................省略
由此我们知道Collections.synchronized包装的集合只是在调用方法时使用synchronized进行同步,Collections.synchronized性能依旧和hashtable和vector一样差,一个用synchronized 方法锁,一个用synchronized 对象锁,其实几乎是一样的,没有什么区别,只是Collections.synchronized锁的粒度稍微稍微的好了那么一点点点点
大名鼎鼎的并发容器ConcurrentHashMap
Collections.synchronized和Hashtable效率低下,不适合高并发的生产环境使用,hashMap线程不安全,在高并发下造成数据不一致以及由于并发场景造成的容器同时扩容导致内部链表指向错乱发生链表内相互指向而形成了循环链表造成cup100%的问题,所以使用ConcurrentHashMap可以很好的解决高并发场景下的需求
要提升锁的使用效率有两种方法:1、锁的粒度应当尽可能小 2、将整锁拆分成多个锁进行优化
jdk1.7的ConcurrentHashMap
在jdk1.7中,ConcurrentHashMap是以一个个名为segMent的类将一个个hasMap的结构链住,每一segMent都持有一个不同的锁,这里逻辑上将数组拆分成了多个子数组,每个子数组分配一个segMent,也就是说子数组之间是互不影响的,默认最多16个segMent也就是最多拥有16把锁,并且这个segMent确定后是无法进行扩容的,我们可以设置segMent的值,每个segMent都有一个独立的锁,所以每一segMent可以同时进行,多线程访问容器里不同数据段的数据时,线程间就不会存在锁竞争,从而可以有效的提高并发访问效率,这就是ConcurrentHashMap所使用的锁分段技术![](https://img.haomeiwen.com/i22844171/3da182d8f5bd2bf5.png)
jdk1.8的ConcurrentHashMap
jdk1.8中摈弃了锁分段,采用了Node数组+链表+红黑树+CAS+synchronized,Synchronized只锁链表和红黑树的首节点,只要不发生hash冲突就不进行加锁,而是采用无锁的CAS进行尝试操作,进一步提升性能
这个集合的扩容机制和hasmap差不多是一样的,但是key和value不允许为空,先是判断数组的同一个位置的链表是否达到了8,达到了8还会继续判断哈希桶是否大于64,只有这些条件都满足了才转化为红黑树,如果哈希桶小于64的话就会进行扩容,每次给哈希桶扩容至2倍,由于计算哈希桶的索引值是 (n - 1) &hash,所以每次扩容都会调用并触发transfer方法,重新调整节点的位置,在table中删除元素时,如果元素所在的红黑树节点个数小于6,则会触发红黑树向链表结构转换
https://www.cnblogs.com/zerotomax/p/8687425.html
先看几个重要的参数
static final int MOVED = -1; // hash for forwarding nodes
static final int TREEBIN = -2; // hash for roots of trees
static final int RESERVED = -3; // hash for transient reservations
/**
* 用来控制表初始化和扩容的,默认值为0,当在初始化的时候指定了大小,这会将这个大小保存在sizeCtl中,大小为数组的0.75
* 当为负的时候,说明表正在初始化或扩张,
* -1表示初始化
* -(1+n) n:表示活动的扩张线程
*/
private transient volatile int sizeCtl;
/*
* 当添加一对键值对的时候,首先会去判断保存这些键值对的数组是不是初始化了,
* 如果没有的话就初始化数组
* 然后通过计算hash值来确定放在数组的哪个位置
* 如果这个位置为空则直接添加,如果不为空的话,则取出这个节点来
* 如果取出来的节点的hash值是MOVED(-1)的话,则表示当前正在对这个数组进行扩容,复制到新的数组,则当前线程也去帮助复制
* 最后一种情况就是,如果这个节点,不为空,也不在扩容,则通过synchronized来加锁,进行添加操作
* 然后判断当前取出的节点位置存放的是链表还是树
* 如果是链表的话,则遍历整个链表,直到取出来的节点的key来个要放的key进行比较,如果key相等,并且key的hash值也相等的话,
* 则说明是同一个key,则覆盖掉value,否则的话则添加到链表的末尾
* 如果是树的话,则调用putTreeVal方法把这个元素添加到树中去
* 最后在添加完成之后,会判断在该节点处共有多少个节点(注意是添加前的个数),如果达到8个以上了的话,
* 则调用treeifyBin方法来尝试将处的链表转为树,或者扩容数组
*/
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();//K,V都不能为空,否则的话跑出异常
int hash = spread(key.hashCode()); //取得key的hash值
int binCount = 0; //用来计算在这个节点总共有多少个元素,用来控制扩容或者转移为树
for (Node<K,V>[] tab = table;;) { //使用不断的循环,调用CAS进行更新
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable(); //第一次put的时候table没有初始化,则初始化table
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { //通过哈希计算出一个表中的位置因为n是数组的长度,所以(n-1)&hash肯定不会出现数组越界
if (casTabAt(tab, i, null, //如果这个位置没有元素的话,则通过cas的方式尝试添加,注意这个时候是没有加锁的
new Node<K,V>(hash, key, value, null))) //创建一个Node添加到数组中区,null表示的是下一个节点为空
break; //这里如果cas添加成功了就会跳出循环
}
/*
* 如果检测到某个节点的hash值是MOVED,则表示正在进行数组扩张的数据复制阶段,
* 则当前线程也会参与去复制,通过允许多线程复制的功能,一次来减少数组的复制所带来的性能损失
*/
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
/*
* 如果在这个位置有元素的话,就采用synchronized的方式加锁,
* 如果是链表的话(hash大于0),就对这个链表的所有元素进行遍历,
* 如果找到了key和key的hash值都一样的节点,则把它的值替换到
* 如果没找到的话,则添加在链表的最后面
* 否则,是树的话,则调用putTreeVal方法添加到树中去
*
* 在添加完之后,会对该节点上关联的的数目进行判断,
* 如果在8个以上的话,则会调用treeifyBin方法,来尝试转化为树,或者是扩容
*/
V oldVal = null;
synchronized (f) {
if (tabAt(tab, i) == f) { //再次取出要存储的位置的元素,跟前面取出来的比较
if (fh >= 0) { //取出来的元素的hash值大于0,当转换为树之后,hash值为-2
binCount = 1;
for (Node<K,V> e = f;; ++binCount) { //遍历这个链表
K ek;
if (e.hash == hash && //要存的元素的hash,key跟要存储的位置的节点的相同的时候,替换掉该节点的value即可
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent) //当使用putIfAbsent的时候,只有在这个key没有设置值得时候才设置
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) { //如果不是同样的hash,同样的key的时候,则判断该节点的下一个节点是否为空,
pred.next = new Node<K,V>(hash, key, //为空的话把这个要加入的节点设置为当前节点的下一个节点
value, null);
break;
}
}
}
else if (f instanceof TreeBin) { //表示已经转化成红黑树类型了
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, //调用putTreeVal方法,将该元素添加到树中去
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD) //当在同一个节点的数目达到8个的时候,则扩张数组或将给节点的数据转为tree
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount); //计数
return null;
}
在ConcurrentHashMap中使用了unSafe方法,通过直接操作内存的方式来保证并发处理的安全性,使用的是硬件的安全机制。
/*
* 用来返回节点数组的指定位置的节点的原子操作
*/
@SuppressWarnings("unchecked")
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
/*
* cas原子操作,在指定位置设定值
*/
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
/*
* 原子操作,在指定位置设定值
*/
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}
/**
* 初始化数组table,
* 如果sizeCtl小于0,说明别的数组正在进行初始化,则让出执行权
* 如果sizeCtl大于0的话,则初始化一个大小为sizeCtl的数组
* 否则的话初始化一个默认大小(16)的数组
* 然后设置sizeCtl的值为数组长度的3/4
*/
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) { //第一次put的时候,table还没被初始化,进入while
if ((sc = sizeCtl) < 0) //sizeCtl初始值为0,当小于0的时候表示在别的线程在初始化表或扩展表
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { //SIZECTL:表示当前对象的内存偏移量,sc表示期望值,-1表示要替换的值,设定为-1表示要初始化表了
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY; //指定了大小的时候就创建指定大小的Node数组,否则创建指定大小(16)的Node数组
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc; //初始化后,sizeCtl长度为数组长度的3/4
}
break;
}
}
return tab;
}
在put方法的详解中,我们可以看到,在同一个节点的个数超过8个的时候,会调用treeifyBin方法来看看是扩容还是转化为一棵树
同时在每次添加完元素的addCount方法中,也会判断当前数组中的元素是否达到了sizeCtl的量,如果达到了的话,则会进入transfer方法去扩容
/**
* Replaces all linked nodes in bin at given index unless table is
* too small, in which case resizes instead.
* 当数组长度小于64的时候,扩张数组长度一倍,否则的话把链表转为树
*/
private final void treeifyBin(Node<K,V>[] tab, int index) {
Node<K,V> b; int n, sc;
if (tab != null) {
System.out.println("treeifyBin方\t==>数组长:"+tab.length);
if ((n = tab.length) < MIN_TREEIFY_CAPACITY) //MIN_TREEIFY_CAPACITY 64
tryPresize(n << 1); // 数组扩容
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
synchronized (b) { //使用synchronized同步器,将该节点出的链表转为树
if (tabAt(tab, index) == b) {
TreeNode<K,V> hd = null, tl = null; //hd:树的头(head)
for (Node<K,V> e = b; e != null; e = e.next) {
TreeNode<K,V> p =
new TreeNode<K,V>(e.hash, e.key, e.val,
null, null);
if ((p.prev = tl) == null) //把Node组成的链表,转化为TreeNode的链表,头结点任然放在相同的位置
hd = p; //设置head
else
tl.next = p;
tl = p;
}
setTabAt(tab, index, new TreeBin<K,V>(hd));//把TreeNode的链表放入容器TreeBin中
}
}
}
}
}
/**
* 扩容表为指可以容纳指定个数的大小(总是2的N次方)
* 假设原来的数组长度为16,则在调用tryPresize的时候,size参数的值为16<<1(32),此时sizeCtl的值为12
* 计算出来c的值为64,则要扩容到sizeCtl≥为止
* 第一次扩容之后 数组长:32 sizeCtl:24
* 第二次扩容之后 数组长:64 sizeCtl:48
* 第二次扩容之后 数组长:128 sizeCtl:94 --> 这个时候才会退出扩容
*/
private final void tryPresize(int size) {
/*
* MAXIMUM_CAPACITY = 1 << 30
* 如果给定的大小大于等于数组容量的一半,则直接使用最大容量,
* 否则使用tableSizeFor算出来
* 后面table一直要扩容到这个值小于等于sizeCtrl(数组长度的3/4)才退出扩容
*/
int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
tableSizeFor(size + (size >>> 1) + 1);
int sc;
while ((sc = sizeCtl) >= 0) {
Node<K,V>[] tab = table; int n;
// printTable(tab); 调试用的
/*
* 如果数组table还没有被初始化,则初始化一个大小为sizeCtrl和刚刚算出来的c中较大的一个大小的数组
* 初始化的时候,设置sizeCtrl为-1,初始化完成之后把sizeCtrl设置为数组长度的3/4
* 为什么要在扩张的地方来初始化数组呢?这是因为如果第一次put的时候不是put单个元素,
* 而是调用putAll方法直接put一个map的话,在putALl方法中没有调用initTable方法去初始化table,
* 而是直接调用了tryPresize方法,所以这里需要做一个是不是需要初始化table的判断
*/
if (tab == null || (n = tab.length) == 0) {
n = (sc > c) ? sc : c;
if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { //初始化tab的时候,把sizeCtl设为-1
try {
if (table == tab) {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
}
}
/*
* 一直扩容到的c小于等于sizeCtl或者数组长度大于最大长度的时候,则退出
* 所以在一次扩容之后,不是原来长度的两倍,而是2的n次方倍
*/
else if (c <= sc || n >= MAXIMUM_CAPACITY) {
break; //退出扩张
}
else if (tab == table) {
int rs = resizeStamp(n);
/*
* 如果正在扩容Table的话,则帮助扩容
* 否则的话,开始新的扩容
* 在transfer操作,将第一个参数的table中的元素,移动到第二个元素的table中去,
* 虽然此时第二个参数设置的是null,但是,在transfer方法中,当第二个参数为null的时候,
* 会创建一个两倍大小的table
*/
if (sc < 0) {
Node<K,V>[] nt;
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
/*
* transfer的线程数加一,该线程将进行transfer的帮忙
* 在transfer的时候,sc表示在transfer工作的线程数
*/
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
/*
* 没有在初始化或扩容,则开始扩容
*/
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2)) {
transfer(tab, null);
}
}
}
}
![](https://img.haomeiwen.com/i22844171/e0f680ac6cf4ea01.png)
![](https://img.haomeiwen.com/i22844171/5667c0af72c9134b.png)
并发的list容器CopyOnWriteArrayList
在一般的读写锁中是不予许共存的,写锁是互斥锁不与任何锁共享,所以在普通的集合进行迭代遍历时是不允许删除或者添加元素的,而CopyOnWriteArrayList不同,它可以做到读写共享(但是写与写还是互斥的),主要由于CopyOnWriteArrayList 类的所有可变操作(add,set等等)都是通过创建底层数组的新副本来实现的。当 List 需要被修改的时候,并不直接修改原有数组对象,而是对原有数据进行一次拷贝,将修改的内容写入副本中。写完之后,再将修改完的副本替换成原来的数据,这样就可以保证写操作不会影响读操作了,这个过程就是对一块内存进行修改时,不直接在原有内存块中进行写操作,而是将内存拷贝一份,在新的内存中进行写操作,写完之后,再将原来指向的内存指针指到新的内存,原来的内存就可以被回收。但是由于CopyOnWriteArrayList修改时是进行的新副本的操作,所以数据的一致性差,由于建立了新的副本所以CopyOnWriteArrayList的会占用比较多内存
源码解读
public boolean add(E e) {
final ReentrantLock lock = this.lock;
//增加操作是加锁的
lock.lock();
try {
Object[] elements = getArray();
//获取当前数组的长度
int len = elements.length;
//复制一个与原来数组相同的新数组
Object[] newElements = Arrays.copyOf(elements, len + 1);
//将元素插入数组中
newElements[len] = e;
//将新数组的内存位置,赋值给久数组引用对象的指向地址
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
删除操作
public E remove(int index) {
final ReentrantLock lock = this.lock;
//加锁
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
E oldValue = get(elements, index);
int numMoved = len - index - 1;
if (numMoved == 0)
setArray(Arrays.copyOf(elements, len - 1));
else {
//开辟新副本
Object[] newElements = new Object[len - 1];
System.arraycopy(elements, 0, newElements, 0, index);
System.arraycopy(elements, index + 1, newElements, index,
numMoved);
//新的数组覆盖旧的数组
setArray(newElements);
}
return oldValue;
} finally {
lock.unlock();
}
}
我们知道Vector是增删改查方法都加了synchronized,保证同步,但是每个方法执行的时候都要去获得锁,性能就会大大下降,而CopyOnWriteArrayList 只是在增删改上加锁,但是读不加锁,在读方面的性能就好于Vector,CopyOnWriteArrayList支持读多写少的并发情况。
CopyOnWriteArraySet
CopyOnWriteArraySet是基于CopyOnWriteArrayList 的,其装载数据的对象就是CopyOnWriteArrayList ,但问题是CopyOnWriteArrayList允许有重复元素,但CopyOnWriteArraySet作为一个HashSet却不能有重复元素。
为了解决这一问题,CopyOnWriteArrayList专门提供了addIfAbsent和addAllAbsent,以防止添加元素时会添加重复元素到里面去。
CopyOnWriteArraySet的空参构造函数
public class CopyOnWriteArraySet<E> extends AbstractSet<E>
implements java.io.Serializable {
private static final long serialVersionUID = 5457747651344034263L;
private final CopyOnWriteArrayList<E> al;
public CopyOnWriteArraySet() {
al = new CopyOnWriteArrayList<E>();
}
add方法是调用CopyOnWriteArrayList 的addIfAbsent方法,adsent是缺席的意思,顾名思义这个类就是只有这个元素不存在于set中时,才会加入该元素e,从而防止重复元素加入。
public boolean add(E e) {
return al.addIfAbsent(e);
}
public boolean addIfAbsent(E e) {
Object[] snapshot = getArray();
return indexOf(e, snapshot, 0, snapshot.length) >= 0 ? false :
addIfAbsent(e, snapshot);
}
private boolean addIfAbsent(E e, Object[] snapshot) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] current = getArray();
int len = current.length;
if (snapshot != current) {
// Optimize for lost race to another addXXX operation
int common = Math.min(snapshot.length, len);
for (int i = 0; i < common; i++)
if (current[i] != snapshot[i] && eq(e, current[i]))
return false;
if (indexOf(e, current, common, len) >= 0)
return false;
}
Object[] newElements = Arrays.copyOf(current, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
阻塞队列BlockingQueue
BlockingQueue是个接口,主要定义了当一个线程从一个空的阻塞队列中取元素,此时线程会被阻塞直到阻塞队列中有了元素。当队列中有元素后,被阻塞的线程会自动被唤醒,它的主要的几个子类如下:
1、SynchronousQuene
不缓存任务的阻塞队列,SynchronousQuene内部没有容量,当任务被提交进来后就会要求线程马上处理,不会去缓存新进来的任务,如果没有足够的核心线程处理,则会创建新线程去处理,当到达maxPooolSize时就执行拒绝策略,SynchronousQuene适用于任务数不多的场景,如果使用了SynchronousQuene应当把maxPooolSize设置得很大,这样才会避免队列执行拒绝策略
2、LinkedBlockingQuene
无界阻塞队列,这个队列是基于链表的队列,按照FIFO排序,使用这个队列时由于队列是无界的,所以无论有多少任务都不会排满队列容量,因此线程池不管核心线程是否空闲,maxPooolSize有多大都不会创建任何非核心线程,此队列适用于任务数量非常大的场景,但是风险就在于如果线程的处理速度跟不上任务的生产速度,任务缓存数量过多造成OOM,所以使用此队列尽量将核心线程数设置的大些
3、ArrayBlockingQueue
有界阻塞队列,这个队列是基于数组的队列,按照FIFO排序,这个队列的容量是可以设置的,当任务数排满队列容量时线程池就会创建新的线程去处理任务,当队列排满,线程池的线程数量也到达maxPooolSize后则会执行拒绝策略
4、PriorityBlockingQueue
具有优先级的无界阻塞队列,队列可以按照优先级进行内部元素排序,队列的任务需要实现Comparable 接口,才能通过使用compareTo()方法进行排序。
5、DelayQueue
是一个具有延迟时效的无界队列,只有在延迟期满后才能从队列中取出任务
![](https://img.haomeiwen.com/i22844171/b43b39e4983bdb9d.png)
常用API:
put(E e):向队尾存入元素,如果队列满,则等待;
take():从队首取元素,如果队列为空,则等待;
offer(E e,long timeout, TimeUnit unit):向队尾存入元素,如果队列满,则等待一定的时间,当时间期限达到时,如果还没有插入成功,则返回false;否则返回true;
poll(long timeout, TimeUnit unit):从队首取元素,如果队列空,则等待一定的时间,当时间期限达到时,如果取不到,则返回null;否则返回取得的元素;
阻塞队列是一个具有阻塞功能的队列,阻塞队列是线程安全的,take方法取数据时当队列中无数据时会发生阻塞直到队列中有数据,put方法存数据时队列中数据满了就无法再插入数据而发生阻塞直到队列中有空闲。BlockingQueue常用作于一端给生产者生产(put)数据,一端给消费者消费(take)数据
-
1、LinkedBlockingQueue
LinkedBlockingQueue是一个基于链表实现的可选容量的阻塞队列。队头的元素是插入时间最长的,队尾的元素是最新插入的。新的元素将会被插入到队列的尾部。 LinkedBlockingQueue的容量限制是可选的,如果在初始化时没有指定容量,那么默认使用int的最大值作为队列容量。 -
2、ArrayBlockingQueue
ArrayBlockingQueue底层是使用一个数组实现队列的,并且在构造ArrayBlockingQueue时需要指定容量,也就意味着底层数组一旦创建了,容量就不能改变了,因此ArrayBlockingQueue是一个容量限制的阻塞队列。因此,在队列全满时执行入队将会阻塞,在队列为空时出队同样将会阻塞。
代码演示:
public class BlockingQueueTest {
private int i;
LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(3);
class Producer implements Runnable {
@Override
public void run() {
while (true) {
try {
System.out.println("-----------------------------------------------------");
Thread.sleep(100);
i++;
queue.put(i);
System.out.println("生产的数据:"+i);
System.out.println("队列内的数据:" + queue);
System.out.println("生产后剩余容量:" + queue.remainingCapacity());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class Consumer implements Runnable {
@Override
public void run() {
while (true) {
try {
System.out.println("-----------------------------------------------------");
Thread.sleep(200);
Integer take = queue.take();
System.out.println("消费的数据:" + take);
System.out.println("消费后队列内的数据:" + queue);
System.out.println("消费后队列剩余容量:" + queue.remainingCapacity());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) {
BlockingQueueTest linkedBlockingQueueTest = new BlockingQueueTest();
new Thread(linkedBlockingQueueTest.new Producer()).start();
new Thread(linkedBlockingQueueTest.new Consumer()).start();
}
}
![](https://img.haomeiwen.com/i22844171/4c64a3b472b088e2.png)
由成员变量,我们可以看到阻塞队列使用的ReentrantLock,阻塞是使用的Condition
//容量,如果没有指定,该值为Integer.MAX_VALUE;
private final int capacity;
//当前队列中的元素
private final AtomicInteger count =new AtomicInteger();
//队列头节点,始终满足head.item==null
transient Node head;
//队列的尾节点,始终满足last.next==null
private transient Node last;
//用于出队的锁
private final ReentrantLock takeLock =new ReentrantLock();
//当队列为空时,保存执行出队的线程
private final Condition notEmpty = takeLock.newCondition();
//用于入队的锁
private final ReentrantLock putLock =new ReentrantLock();
//当队列满时,保存执行入队的线程
private final Condition notFull = putLock.newCondition();
简单看一个LinkedBlockingQueue的put方法
public void put(E e)throws InterruptedException {
//不允许元素为null
if (e ==null)
throw new NullPointerException();
int c = -1;
//以当前元素新建一个节点
Node node =new Node(e);
final ReentrantLock putLock =this.putLock;
final AtomicInteger count =this.count;
//获得入队的锁
putLock.lockInterruptibly();
try {
//如果队列已满,那么将该线程加入到Condition的等待队列中
while (count.get() == capacity) {
notFull.await();
}
//将节点入队
enqueue(node);
//得到插入之前队列的元素个数
c = count.getAndIncrement();
//如果还可以插入元素,那么释放等待的入队线程
if (c +1 < capacity){
notFull.signal();
}
}finally {
//解锁
putLock.unlock();
}
//通知出队线程队列非空
if (c ==0)
signalNotEmpty();
}
take()方法
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
//获取takeLock锁
takeLock.lockInterruptibly();
try {
//如果队列为空,那么加入到notEmpty条件的等待队列中
while (count.get() == 0) {
notEmpty.await();
}
//得到队头元素
x = dequeue();
//得到取走一个元素之前队列的元素个数
c = count.getAndDecrement();
//如果队列中还有数据可取,释放notEmpty条件等待队列中的第一个线程
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
//如果队列中的元素从满到非满,通知put线程
if (c == capacity)
signalNotFull();
return x;
}
网友评论