美文网首页
Java并发(六):并发容器和框架

Java并发(六):并发容器和框架

作者: Jorvi | 来源:发表于2019-02-20 10:42 被阅读0次

一. ConcurrentHashMap

ConcurrentHashMap是线程安全且高效的HashMap。

  • HashMap线程不安全;
  • HashTable使用synchronized保证线程安全,但是效率非常低下;
  • ConcurrentHashMap使用锁分段技术提升并发访问率。

注意
ConcurrentHashMap的线程安全指的是其内部的方法线程安全(如get、put等),即单独调用某个方法线程安全的。
当代码块包含多个操作时,此代码块并不是线程安全的,因为不能保证整个代码块是互斥执行的,因此仍然需要同步处理(加锁、double check等)。

1. ConcurrentHashMap的结构

JDK1.7版本

  • HashMap由HashEntry数组组成,HashEntry是链表结构。

  • ConcurrentHashMap由Segment数组组成,Segment由HashEntry数组组成,HashEntry是链表结构。

2. ConcurrentHashMap的初始化

ConcurrentHashMap初始化方法是通过initialCapacity、loadFactor和concurrencyLevel等几个参数来初始化segment数组、段偏移量segmentShift、段掩码segmentMask和每个segment里的HashEntry数组来实现的。

3. 定位Segment

ConcurrentHashMap使用分段锁Segment来保护不同段的数据,在插入和获取元素的时候,必须先通过散列算法定位到Segment。

散列算法的目的:是元素尽可能均匀的分布在不同的Segment上,从而提高容器的存取效率。

4. ConcurrentHashMap的操作

(1) get操作
  1. 对key的hashCode进行一次再散列得到一个新的散列值;
  2. 根据新的散列值,利用散列运算定位到Segment;
  3. 再通过散列算法定位到元素;

整个get过程不需要加锁,除非读到的值为空才会加锁重读。

(2) put操作
  1. 定位到Segment;
  2. 判断是否需要对Segment
(3) size操作

尝试两次不锁住Segment直接统计各Segment大小的方法,如果两次结果不相同,则采用加锁的方式来统计所有Segment的大小。

二. ConcurrentLinkedQueue

ConcurrentLinkedQueue是一个基于链接节点的无界线程安全队列,采用FIFO的规则对节点进行排序。

队列前后分别用head节点和tail节点来定位,中间的节点通过next关联起来。

1. 入队列

入队列就是将入队节点添加到队列的尾部。

  1. 将入队节点设置成当前队列尾节点的下一个节点;
  2. 更新tail节点,如果tail节点的next节点不为空,则将入队节点设置为tail节点,如果tail节点的next节点为空,则将入队节点设置为tail节点的next节点。

因此:tail节点并不一定就是尾节点,尾节点可能是tail节点,也可能是tail节点的next节点。

2. 出队列

出队列就是从队列中返回一个节点元素,并清空该节点对元素的引用。

  1. 获取head节点的元素,判断是否为空
  2. 如果为空,表示另一个线程已经进行了一次出队列操作将该节点的元素取走,如果不为空,则使用CAS将head节点设置为null并返回head节点的元素。

三. 阻塞队列

阻塞队列的主要使用场景:生产者和消费者。

四种处理方式:

处理方式 抛出异常 返回特殊值 一直阻塞 超时退出
插入方法 add(e) offer(e) put(e) offer(e,time,unit)
移除方法 remove() poll() take() poll(time,unit)
检查方法 element() peek() 不可用 不可用
  • 抛出异常:队列满时,继续插入数据则抛出IllegalStateException;队列空时,从队列取数据则抛出NoSuchElementException。

  • 返回特殊值:插入元素时,成功返回true;移除元素时,如果没有则返回null。

  • 一直阻塞:队列满时,继续插入数据会一直阻塞生产者线程直到队列可用或响应中断退出;队列空时,从队列取数据会阻塞消费者线程直到队列不空。

  • 超时退出:队列满时,继续插入数据会阻塞生产者线程一段时间,超过指定时间则退出。

1. Java里的阻塞队列

(1) ArrayBlockingQueue

用数组实现的有界阻塞队列,FIFO。

(2) LinkedBlockingQueue

用链表实现的有界阻塞队列,默认和最大长度为Integer.MAX_VALUE,FIFO。

(3) PriorityBlockingQueue

支持优先级的无界阻塞队列。

(4) DelayQueue

支持延时获取元素的无界阻塞队列,在创建元素时可指定延时,只有延迟期满才能从队列中提取元素。

(5) SynchronousQueue

不存储元素的阻塞队列,每一个put操作必须等待一个take操作,否则不能继续添加元素。

(6) LinkedTransferQueue

由链表结构组成的无界阻塞TransferQueue队列,主要多了tryTransfer和transfer方法。

  • transfer方法:如果当前有消费者正在等待接收元素,transfer方法可以把生产者传入的元素立刻传输给消费者;如果没有消费者在等待接收元素,transfer方法将元素存放在队列tail节点,并等到该元素被消费者消费了才返回。

  • tryTransfer方法:如果当前有消费者正在等待接收元素,tryTransfer方法可以把生产者传入的元素立刻传输给消费者;如果没有消费者在等待接收元素,tryTransfer方法将元素存放在队列tail节点,并返回false。

(7) LinkedBlockingDeque

由链表结构组成的双向阻塞队列,可以从队列的两端插入和移除元素。

2. 阻塞队列的实现原理

使用等待 / 通知模式实现。

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    private final Condition notEmpty;
    private final Condition notFull;

    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

    private void enqueue(E x) {
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

    private E dequeue() {
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal();
        return x;
    }
}
  • 当生产者往满的队列中添加元素时会阻塞生产者(await),当消费者消费了一个队列中的元素后,会通知生产者当前队列可用(signal)。

  • 当消费者从空的队列中取元素时会阻塞消费者(await),当生产者添加了一个元素后,会通知消费者当前队列可用(signal)。

四. Fork/Join 框架

Fork/Join 框架是Java 7 提供的一个用于并行执行任务的框架,把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果。

  1. 分割任务;
  2. 执行任务并合并结果。

相关文章

网友评论

      本文标题:Java并发(六):并发容器和框架

      本文链接:https://www.haomeiwen.com/subject/btrhjqtx.html