一. ConcurrentHashMap
ConcurrentHashMap是线程安全且高效的HashMap。
- HashMap线程不安全;
- HashTable使用synchronized保证线程安全,但是效率非常低下;
- ConcurrentHashMap使用锁分段技术提升并发访问率。
注意:
ConcurrentHashMap的线程安全指的是其内部的方法线程安全(如get、put等),即单独调用某个方法线程安全的。
当代码块包含多个操作时,此代码块并不是线程安全的,因为不能保证整个代码块是互斥执行的,因此仍然需要同步处理(加锁、double check等)。
1. ConcurrentHashMap的结构
JDK1.7版本
-
HashMap由HashEntry数组组成,HashEntry是链表结构。
-
ConcurrentHashMap由Segment数组组成,Segment由HashEntry数组组成,HashEntry是链表结构。
2. ConcurrentHashMap的初始化
ConcurrentHashMap初始化方法是通过initialCapacity、loadFactor和concurrencyLevel等几个参数来初始化segment数组、段偏移量segmentShift、段掩码segmentMask和每个segment里的HashEntry数组来实现的。
3. 定位Segment
ConcurrentHashMap使用分段锁Segment来保护不同段的数据,在插入和获取元素的时候,必须先通过散列算法定位到Segment。
散列算法的目的:是元素尽可能均匀的分布在不同的Segment上,从而提高容器的存取效率。
4. ConcurrentHashMap的操作
(1) get操作
- 对key的hashCode进行一次再散列得到一个新的散列值;
- 根据新的散列值,利用散列运算定位到Segment;
- 再通过散列算法定位到元素;
整个get过程不需要加锁,除非读到的值为空才会加锁重读。
(2) put操作
- 定位到Segment;
- 判断是否需要对Segment
(3) size操作
尝试两次不锁住Segment直接统计各Segment大小的方法,如果两次结果不相同,则采用加锁的方式来统计所有Segment的大小。
二. ConcurrentLinkedQueue
ConcurrentLinkedQueue是一个基于链接节点的无界线程安全队列,采用FIFO的规则对节点进行排序。
队列前后分别用head节点和tail节点来定位,中间的节点通过next关联起来。
1. 入队列
入队列就是将入队节点添加到队列的尾部。
- 将入队节点设置成当前队列尾节点的下一个节点;
- 更新tail节点,如果tail节点的next节点不为空,则将入队节点设置为tail节点,如果tail节点的next节点为空,则将入队节点设置为tail节点的next节点。
因此:tail节点并不一定就是尾节点,尾节点可能是tail节点,也可能是tail节点的next节点。
2. 出队列
出队列就是从队列中返回一个节点元素,并清空该节点对元素的引用。
- 获取head节点的元素,判断是否为空
- 如果为空,表示另一个线程已经进行了一次出队列操作将该节点的元素取走,如果不为空,则使用CAS将head节点设置为null并返回head节点的元素。
三. 阻塞队列
阻塞队列的主要使用场景:生产者和消费者。
四种处理方式:
处理方式 | 抛出异常 | 返回特殊值 | 一直阻塞 | 超时退出 |
---|---|---|---|---|
插入方法 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除方法 | remove() | poll() | take() | poll(time,unit) |
检查方法 | element() | peek() | 不可用 | 不可用 |
-
抛出异常:队列满时,继续插入数据则抛出IllegalStateException;队列空时,从队列取数据则抛出NoSuchElementException。
-
返回特殊值:插入元素时,成功返回true;移除元素时,如果没有则返回null。
-
一直阻塞:队列满时,继续插入数据会一直阻塞生产者线程直到队列可用或响应中断退出;队列空时,从队列取数据会阻塞消费者线程直到队列不空。
-
超时退出:队列满时,继续插入数据会阻塞生产者线程一段时间,超过指定时间则退出。
1. Java里的阻塞队列
(1) ArrayBlockingQueue
用数组实现的有界阻塞队列,FIFO。
(2) LinkedBlockingQueue
用链表实现的有界阻塞队列,默认和最大长度为Integer.MAX_VALUE,FIFO。
(3) PriorityBlockingQueue
支持优先级的无界阻塞队列。
(4) DelayQueue
支持延时获取元素的无界阻塞队列,在创建元素时可指定延时,只有延迟期满才能从队列中提取元素。
(5) SynchronousQueue
不存储元素的阻塞队列,每一个put操作必须等待一个take操作,否则不能继续添加元素。
(6) LinkedTransferQueue
由链表结构组成的无界阻塞TransferQueue队列,主要多了tryTransfer和transfer方法。
-
transfer方法:如果当前有消费者正在等待接收元素,transfer方法可以把生产者传入的元素立刻传输给消费者;如果没有消费者在等待接收元素,transfer方法将元素存放在队列tail节点,并等到该元素被消费者消费了才返回。
-
tryTransfer方法:如果当前有消费者正在等待接收元素,tryTransfer方法可以把生产者传入的元素立刻传输给消费者;如果没有消费者在等待接收元素,tryTransfer方法将元素存放在队列tail节点,并返回false。
(7) LinkedBlockingDeque
由链表结构组成的双向阻塞队列,可以从队列的两端插入和移除元素。
2. 阻塞队列的实现原理
使用等待 / 通知模式实现。
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
private final Condition notEmpty;
private final Condition notFull;
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
private void enqueue(E x) {
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
}
}
-
当生产者往满的队列中添加元素时会阻塞生产者(await),当消费者消费了一个队列中的元素后,会通知生产者当前队列可用(signal)。
-
当消费者从空的队列中取元素时会阻塞消费者(await),当生产者添加了一个元素后,会通知消费者当前队列可用(signal)。
四. Fork/Join 框架
Fork/Join 框架是Java 7 提供的一个用于并行执行任务的框架,把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果。
- 分割任务;
- 执行任务并合并结果。
网友评论