Java并发容器

作者: 小智pikapika | 来源:发表于2019-03-03 18:05 被阅读0次

    接续上篇Java线程安全,这次来撸一撸Java中并发容器的源码。

    ConcurrentHashMap&HashTable&HashMap

    这个应该是面试中比较经典的一个问题了,三者的区别主要如下:
    HashMap:非线程安全,在多线程环境下可能出现数据丢失的情况
    HashTable:线程安全,但是实现方法只是在方法上加synchronized关键字,结合HashTable的数据结构来看,底层是由Node链表数组来实现的,当两个key hash值不一样时,会放在数组的不同位置,而简单的加synchronized关键字会阻塞其他所有的put操作,性能较差
    ConcurrentHashMap:很好的避免了HashTable的缺点,put操作只会对数组特定位置的Node链表加锁,不会影响其他位置的操作,性能大大提高

    class ConcurrentHashMap:
    /** Implementation for put and putIfAbsent */
        final V putVal(K key, V value, boolean onlyIfAbsent) {
            if (key == null || value == null) throw new NullPointerException();
            int hash = spread(key.hashCode());
            int binCount = 0;
            for (Node<K,V>[] tab = table;;) {
                Node<K,V> f; int n, i, fh;
                if (tab == null || (n = tab.length) == 0)
                    tab = initTable();
                else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                    //如果当前位置Node为空,通过cas操作设置
                    if (casTabAt(tab, i, null,
                                 new Node<K,V>(hash, key, value, null)))
                        break;                   // no lock when adding to empty bin
                }
                else if ((fh = f.hash) == MOVED)
                    tab = helpTransfer(tab, f);
                else {
                    V oldVal = null;
                    //锁定当前位置Node链表
                    synchronized (f) {
                        if (tabAt(tab, i) == f) {
                            if (fh >= 0) {
                                binCount = 1;
                                for (Node<K,V> e = f;; ++binCount) {
                                    K ek;
                                    if (e.hash == hash &&
                                        ((ek = e.key) == key ||
                                         (ek != null && key.equals(ek)))) {
                                        oldVal = e.val;
                                        if (!onlyIfAbsent)
                                            e.val = value;
                                        break;
                                    }
                                    Node<K,V> pred = e;
                                    if ((e = e.next) == null) {
                                        pred.next = new Node<K,V>(hash, key,
                                                                  value, null);
                                        break;
                                    }
                                }
                            }
                            else if (f instanceof TreeBin) {
                                Node<K,V> p;
                                binCount = 2;
                                if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                               value)) != null) {
                                    oldVal = p.val;
                                    if (!onlyIfAbsent)
                                        p.val = value;
                                }
                            }
                        }
                    }
                    if (binCount != 0) {
                        if (binCount >= TREEIFY_THRESHOLD)
                            treeifyBin(tab, i);
                        if (oldVal != null)
                            return oldVal;
                        break;
                    }
                }
            }
            addCount(1L, binCount);
            return null;
        }
    

    原子操作类

    核心原理都是通过volatile+CAS保证操作的线程安全,主要还是得理解上篇文章中线程安全问题的原因,volatile保证了代码里面的value(线程工作内存中的值)与主内存值的一致性,通过CAS的原子性操作比较并修改主存中的值。这句话可能有点绕,举个例子:
    主存中value值为1
    线程A、B同时读取到各自的工作内存value值为1
    线程A、B同时通过CAS(1,2)指令想要设为2,由于CAS指令的原子性,假设A线程成功,则线程A和主存的值均变为2,这时才开始执行B的CAS(1,2)指令发现值已经变为2,线程2失败
    然后线程B想要再次进行CAS操作时,由于volatile的可见性,必定会从主存重新读取value值为2,再次通过CAS(2,2)指令去修改就能够成功了

    public class AtomicInteger:
        // setup to use Unsafe.compareAndSwapInt for updates
        private static final Unsafe unsafe = Unsafe.getUnsafe();
        private static final long valueOffset;
    
        static {
            try {
                valueOffset = unsafe.objectFieldOffset
                    (AtomicInteger.class.getDeclaredField("value"));
            } catch (Exception ex) { throw new Error(ex); }
        }
    
        private volatile int value;
        /**
         * Atomically sets the value to the given updated value
         * if the current value {@code ==} the expected value.
         *
         * @param expect the expected value
         * @param update the new value
         * @return {@code true} if successful. False return indicates that
         * the actual value was not equal to the expected value.
         */
        public final boolean compareAndSet(int expect, int update) {
            return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
        }
    

    BlockingQueue家族

    Java中的阻塞队列实现原理都是通过上篇文章中提到的ReentrantLock来实现的,所有操作方法都必须先获取内部的ReentrantLock才能继续,否则返回false/阻塞/抛出异常,常用的阻塞队列有以下几个:
    ArrayBlockingQueue:由数组实现的有界阻塞队列
    LinkedBlockingQueue:由链表实现的有界阻塞队列
    PriorityBlockingQueue:由数组实现的支持优先级的无界阻塞队列
    SynchronousBlockingQueue:不储存元素的阻塞队列,所有的入队列操作都将阻塞,直到被出队列唤醒,反之亦然,newCachedThreadPool中的阻塞队列就是这个
    DelayBlockingQueue:基于PriorityQueue实现的延时阻塞队列
    下面就以ArrayBlockingQueue为例摸一遍源码吧,其他也都差不多的套路:

        /** Number of elements in the queue */
        int count;
    
        /*
         * Concurrency control uses the classic two-condition algorithm
         * found in any textbook.
         */
    
        /** Main lock guarding all access */
        final ReentrantLock lock;
    
        /** Condition for waiting takes */
        private final Condition notEmpty;
    
        /** Condition for waiting puts */
        private final Condition notFull;
    
        /**
         * Inserts the specified element at the tail of this queue, waiting
         * for space to become available if the queue is full.
         *
         * @throws InterruptedException {@inheritDoc}
         * @throws NullPointerException {@inheritDoc}
         */
        public void put(E e) throws InterruptedException {
            checkNotNull(e);
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                while (count == items.length)
                    notFull.await();
                enqueue(e);
            } finally {
                lock.unlock();
            }
        }
    
        public E take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                while (count == 0)
                    notEmpty.await();
                return dequeue();
            } finally {
                lock.unlock();
            }
        }
        private void enqueue(E x) {
            // assert lock.getHoldCount() == 1;
            // assert items[putIndex] == null;
            final Object[] items = this.items;
            items[putIndex] = x;
            if (++putIndex == items.length)
                putIndex = 0;
            count++;
            notEmpty.signal();
        }
        public int size() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                return count;
            } finally {
                lock.unlock();
            }
        }
    
        /**
         * Extracts element at current take position, advances, and signals.
         * Call only when holding lock.
         */
        private E dequeue() {
            // assert lock.getHoldCount() == 1;
            // assert items[takeIndex] != null;
            final Object[] items = this.items;
            @SuppressWarnings("unchecked")
            E x = (E) items[takeIndex];
            items[takeIndex] = null;
            if (++takeIndex == items.length)
                takeIndex = 0;
            count--;
            if (itrs != null)
                itrs.elementDequeued();
            notFull.signal();
            return x;
        }
    

    核心方法是enqueue、dequeue,因为是private并不包含任何同步和长度判断,只是简单的在数组中插入和删除元素罢了,真正的同步实在对外暴露的put、take等方法,首先获取Lock,同时会判断长度问题决定是否需要通过Condition等待队列非空/非满。
    关于BlockingQueue可以思考以下细节问题:
    队列长度问题:ArrayBlockingQueue通过count来记录长度,为什么不需要加volatile呢?上篇文章有讲到AQS中已经通过volatile来避免state的可见性问题,BlockingQueue中获取count之前已经获取锁,肯定不会有可见性问题的了。LinkedBlockingQueue中直接用了AtomicInteger来记录长度,简单粗暴,在获取size等方法时都不需要加锁。
    无界队列如PriorityBlockingQueue用数组实现最小堆结构,这个需要注意数据扩容导致的性能问题。
    Deque:采用双端队列的结构,其实主要原理还是一样,只不过加了尾部进出队列的方法。

    总结

    要学好Java并发编程,最重要的还是要理解JMM中并发问题的原理、Volatile+CAS的实现、Synchronized对象锁,几乎里面所有的东西都是围绕这几个东西来实现的。

    相关文章

      网友评论

        本文标题:Java并发容器

        本文链接:https://www.haomeiwen.com/subject/pdluuqtx.html