美文网首页
JUC学习笔记(一)—锁工具类

JUC学习笔记(一)—锁工具类

作者: Monica2333 | 来源:发表于2018-11-07 11:53 被阅读0次

    LockSupport类:用于阻塞线程,基于线程的阻塞。而wait,notify是基于对象的,用于synchronized同步块中。
    使用和原理:
    浅谈Java并发编程系列(八)—— LockSupport原理剖析
    使用wait/notify/notifyAll实现线程间通信的几点重要说明
    AbstractQueuedSynchronizer(AQS):队列同步器,解决了同步器涉及的细节问题,如获取同步状态、FIFO同步队列。
    AQS使用一个int类型的成员变量state来表示同步状态,当state>0时表示已经获取了锁,当state = 0时表示释放了锁。它提供了三个方法(getState()、setState(int newState)、compareAndSetState(int expect,int update))来对同步状态state进行操作,当然AQS可以确保对state的操作是安全的。

    AQS通过内置的FIFO双向同步队列(CLH队列)来完成资源获取线程的排队工作,如果当前线程获取同步状态失败(锁)时,AQS则会将当前线程以及等待状态等信息构造成一个节点(Node)并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,则会把节点中的线程唤醒,使其再次尝试获取同步状态。

    static final class Node {
        /** 共享 */
        static final Node SHARED = new Node();
    
        /** 独占 */
        static final Node EXCLUSIVE = null;
    
        /**
         * 因为超时或者中断,节点会被设置为取消状态,被取消的节点时不会参与到竞争中的,他会一直保持取消状态不会转变为其他状态;
         */
        static final int CANCELLED =  1;
    
        /**
         * 后继节点的线程处于等待状态,而当前节点的线程如果释放了同步状态或者被取消,将会通知后继节点,使后继节点的线程得以运行
         */
        static final int SIGNAL    = -1;
    
        /**
         * 节点在等待队列中,节点线程等待在Condition上,当其他线程对Condition调用了signal()后,改节点将会从等待队列中转移到同步队列中,加入到同步状态的获取中
         */
        static final int CONDITION = -2;
    
        /**
         * 表示下一次共享式同步状态获取将会无条件地传播下去
         */
        static final int PROPAGATE = -3;
    
        /** 等待状态 */
        volatile int waitStatus;
    
        /** 前驱节点 */
        volatile Node prev;
    
        /** 后继节点 */
        volatile Node next;
    
        /** 获取同步状态的线程 */
        volatile Thread thread;
    
        Node nextWaiter;
    
        final boolean isShared() {
            return nextWaiter == SHARED;
        }
    
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }
    
        Node() {
        }
    
        Node(Thread thread, Node mode) {
            this.nextWaiter = mode;
            this.thread = thread;
        }
    
        Node(Thread thread, int waitStatus) {
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }
    

    ReentrantLock:独占可重入锁,包含公平锁和非公平锁。一个可重入的互斥锁定 Lock,它具有与使用 synchronized 方法和语句所访问的隐式监视器锁定相同的一些基本行为和语义,但功能更强大。ReentrantLock 将由最近成功获得锁定,并且还没有释放该锁定的线程所拥有。当锁定没有被另一个线程所拥有时,调用 lock 的线程将成功获取该锁定并返回。如果当前线程已经拥有该锁定,此方法将立即返回。可以使用 isHeldByCurrentThread() 和 getHoldCount() 方法来检查此情况是否发生。
    公平锁与非公平锁的区别在于获取锁的时候是否按照FIFO的顺序来。释放锁不存在公平性和非公平性。

    //公平锁的tryAcquire()方法 
    //获取锁时会判断当前节点是否是头结点 
    //hasQueuedPredecessors():主要是判断当前线程是否位于CLH同步队列中的第一个。如果是则返回false,
    //否则返回true
    protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                        compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    
    //使用
    class X {
     *   private final ReentrantLock lock = new ReentrantLock();
     *   // ...
     *
     *   public void m() {
     *     lock.lock();  // block until condition holds
     *     try {
     *       // ... method body
     *     } finally {
     *       lock.unlock()
     *     }
     *   }
     * }
    

    与synchronized相比,ReentrantLock提供了更多,更加全面的功能,具备更强的扩展性。例如:时间锁等候,可中断锁等候,锁投票。
    ReentrantLock还提供了 Condition,对线程的等待、唤醒操作更加详细和灵活,所以在多个条件变量和高度竞争锁的地方,ReentrantLock更加适合。
    ReentrantLock提供了可轮询的锁请求。它会尝试着去获取锁,如果成功则继续,否则可以等到下次运行时处理,而synchronized则一旦进入锁请求要么成功要么阻塞,所以相比synchronized而言,ReentrantLock会不容易产生死锁些。
    ReentrantLock支持更加灵活的同步代码块,但是使用synchronized时,只能在同一个synchronized块结构中获取和释放。注:ReentrantLock的锁释放一定要在finally中处理,否则可能会产生严重的后果。
    ReentrantLock支持中断处理,且性能较synchronized会好些。
    ReentrantReadWriteLock:重入锁ReentrantLock是排他锁,排他锁在同一时刻仅有一个线程可以进行访问,但是在大多数场景下,大部分时间都是提供读服务,而写服务占有的时间较少。然而读服务不存在数据竞争问题,如果一个线程在读时禁止其他线程读势必会导致性能降低。所以就提供了读写锁。
    读写锁维护着一对锁,一个读锁和一个写锁。通过分离读锁和写锁,使得并发性比一般的排他锁有了较大的提升:在同一时间可以允许多个读线程同时访问,但是在写线程访问时,所有读线程和写线程都会被阻塞。

    读写锁的主要特性:
    公平性:支持公平性和非公平性。
    重入性:支持重入。读写锁最多支持65535个递归写入锁和65535个递归读取锁。
    锁降级:遵循获取写锁、获取读锁在释放写锁的次序,写锁能够降级成为读锁。

    在ReentrantLock中使用一个int类型的state来表示同步状态,该值表示锁被一个线程重复获取的次数。但是读写锁ReentrantReadWriteLock内部维护着两个一对锁,需要用一个变量维护多种状态。所以读写锁采用“按位切割使用”的方式来维护这个变量,将其切分为两部分,高16为表示读,低16为表示写。分割之后,读写锁是如何迅速确定读锁和写锁的状态呢?通过为运算。假如当前同步状态为S,那么写状态等于 S & 0x0000FFFF(将高16位全部抹去),读状态等于S >>> 16(无符号补0右移16位)。
    读写锁的使用:

    class CachedData {
     *   Object data;
     *   volatile boolean cacheValid;
     *   final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
     *
     *   void processCachedData() {
     *     rwl.readLock().lock();
     *     if (!cacheValid) {
     *       // Must release read lock before acquiring write lock
     *       rwl.readLock().unlock();
     *       rwl.writeLock().lock();
     *       try {
     *         // Recheck state because another thread might have
     *         // acquired write lock and changed state before we did.
     *         if (!cacheValid) {
     *           data = ...
     *           cacheValid = true;
     *         }
     *         // Downgrade by acquiring read lock before releasing write lock
     *         rwl.readLock().lock();
     *       } finally {
     *         rwl.writeLock().unlock(); // Unlock write, still hold read
     *       }
     *     }
     *
     *     try {
     *       use(data);
     *     } finally {
     *       rwl.readLock().unlock();
     *     }
     *   }
     * }
    
     class RWDictionary {
     *   private final Map<String, Data> m = new TreeMap<String, Data>();
     *   private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
     *   private final Lock r = rwl.readLock();
     *   private final Lock w = rwl.writeLock();
     *
     *   public Data get(String key) {
     *     r.lock();
     *     try { return m.get(key); }
     *     finally { r.unlock(); }
     *   }
     *   public String[] allKeys() {
     *     r.lock();
     *     try { return m.keySet().toArray(); }
     *     finally { r.unlock(); }
     *   }
     *   public Data put(String key, Data value) {
     *     w.lock();
     *     try { return m.put(key, value); }
     *     finally { w.unlock(); }
     *   }
     *   public void clear() {
     *     w.lock();
     *     try { m.clear(); }
     *     finally { w.unlock(); }
     *   }
     * }
    

    CyclicBarrier:它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 很有用。因为该 barrier 在释放等待线程后可以重用,所以称它为循环 的 barrier。
    CyclicBarrier的内部是使用重入锁ReentrantLock和Condition。它有两个构造函数:

    CyclicBarrier(int parties):创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,但它不会在启动 barrier 时执行预定义的操作。
    CyclicBarrier(int parties, Runnable barrierAction) :创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,并在启动 barrier 时执行给定的屏障操作,该操作由最后一个进入 barrier 的线程执行。
    parties表示拦截线程的数量。

    CyclicBarrier适用于多线程结果合并的操作,用于多线程计算数据,最后合并计算结果的应用场景。比如我们需要统计多个Excel中的数据,然后等到一个总结果。我们可以通过多线程处理每一个Excel,执行完成后得到相应的结果,最后通过barrierAction来计算这些线程的计算结果,得到所有Excel的总和。

     class Solver {
     *   final int N;
     *   final float[][] data;
     *   final CyclicBarrier barrier;
     *
     *   class Worker implements Runnable {
     *     int myRow;
     *     Worker(int row) { myRow = row; }
     *     public void run() {
     *       while (!done()) {
     *         processRow(myRow);
     *
     *         try {
     *           barrier.await();
     *         } catch (InterruptedException ex) {
     *           return;
     *         } catch (BrokenBarrierException ex) {
     *           return;
     *         }
     *       }
     *     }
     *   }
     *
     *   public Solver(float[][] matrix) {
     *     data = matrix;
     *     N = matrix.length;
     *     Runnable barrierAction =
     *       new Runnable() { public void run() { mergeRows(...); }};
     *     barrier = new CyclicBarrier(N, barrierAction);
     *
     *     List<Thread> threads = new ArrayList<Thread>(N);
     *     for (int i = 0; i < N; i++) {
     *       Thread thread = new Thread(new Worker(i));
     *       threads.add(thread);
     *       thread.start();
     *     }
     *
     *     // wait until done
     *     for (Thread thread : threads)
     *       thread.join();
     *   }
     * }
    

    CountDownLatch:用给定的计数 初始化 CountDownLatch。由于调用了 countDown() 方法,所以在当前计数到达零之前,await 方法会一直受阻塞。之后,会释放所有等待的线程,await 的所有后续调用都将立即返回。这种现象只出现一次——计数无法被重置。
    虽然,CountDownlatch与CyclicBarrier有那么点相似,但是他们还是存在一些区别的:
    CountDownLatch的作用是允许1或N个线程等待其他线程完成执行;而CyclicBarrier则是允许N个线程相互等待。
    CountDownLatch的计数器无法被重置;CyclicBarrier的计数器可以被重置后使用,因此它被称为是循环的barrier。
    CountDownLatch内部通过共享锁实现。在创建CountDownLatch实例时,需要传递一个int型的参数:count,该参数为计数器的初始值,也可以理解为该共享锁可以获取的总次数。当某个线程调用await()方法,程序首先判断count的值是否为0,如果不会0的话则会一直等待直到为0为止。当其他线程调用countDown()方法时,则执行释放共享锁状态,使count值 – 1。当在创建CountDownLatch时初始化的count参数,必须要有count线程调用countDown方法才会使计数器count等于0,锁才会释放,前面等待的线程才会继续运行。注意CountDownLatch不能回滚重置。
    Semaphore:信号量Semaphore是一个控制访问多个共享资源的计数器,和CountDownLatch一样,其本质上是一个“共享锁”。

    Semaphore,在API是这么介绍的:

    一个计数信号量。从概念上讲,信号量维护了一个许可集。如有必要,在许可可用前会阻塞每一个 acquire(),然后再获取该许可。每个 release() 添加一个许可,从而可能释放一个正在阻塞的获取者。但是,不使用实际的许可对象,Semaphore 只对可用许可的号码进行计数,并采取相应的行动。

    Semaphore 通常用于限制可以访问某些资源(物理或逻辑的)的线程数目。
    信号量Semaphore是一个非负整数(>=1)。当一个线程想要访问某个共享资源时,它必须要先获取Semaphore,当Semaphore >0时,获取该资源并使Semaphore – 1。如果Semaphore值 = 0,则表示全部的共享资源已经被其他线程全部占用,线程必须要等待其他线程释放资源。当线程释放资源时,Semaphore则+1
    Exchanger:它允许在并发任务之间交换数据。具体来说,Exchanger类允许在两个线程之间定义同步点。当两个线程都到达同步点时,他们交换数据结构,因此第一个线程的数据结构进入到第二个线程中,第二个线程的数据结构进入到第一个线程中。可能在应用程序(比如遗传算法和管道设计)中很有用。

    其实就是”我”和”你”(可能有多个”我”,多个”你”)在一个叫Slot的地方做交易(一手交钱,一手交货),过程分以下步骤:

    1.我先到一个叫做Slot的交易场所交易,发现你已经到了,那我就尝试喊你交易,如果你回应了我,决定和我交易那么进入第2步;如果别人抢先一步把你喊走了,那我就进入第5步。
    2.我拿出钱交给你,你可能会接收我的钱,然后把货给我,交易结束;也可能嫌我掏钱太慢(超时)或者接个电话(中断),TM的不卖了,走了,那我只能再找别人买货了(从头开始)。
    3.我到交易地点的时候,你不在,那我先尝试把这个交易点给占了(一屁股做凳子上…),如果我成功抢占了单间(交易点),那就坐这儿等着你拿货来交易,进入第4步;如果被别人抢座了,那我只能在找别的地方儿了,进入第5步。
    4.你拿着货来了,喊我交易,然后完成交易;也可能我等了好长时间你都没来,我不等了,继续找别人交易去,走的时候我看了一眼,一共没多少人,弄了这么多单间(交易地点Slot),太TM浪费了,我喊来交易地点管理员:一共也没几个人,搞这么多单间儿干毛,给哥撤一个!。然后再找别人买货(从头开始);或者我老大给我打了个电话,不让我买货了(中断)。
    5.我跑去喊管理员,尼玛,就一个坑交易个毛啊,然后管理在一个更加开阔的地方开辟了好多个单间,然后我就挨个来看每个单间是否有人。如果有人我就问他是否可以交易,如果回应了我,那我就进入第2步。如果我没有人,那我就占着这个单间等其他人来交易,进入第4步。
    6.如果我尝试了几次都没有成功,我就会认为,是不是我TM选的这个单间风水不好?不行,得换个地儿继续(从头开始);如果我尝试了多次发现还没有成功,怒了,把管理员喊来:给哥再开一个单间(Slot),加一个凳子,这么多人就这么几个破凳子够谁用!

    public class ExchangerTest {
    
        static class Producer implements Runnable{
    
            //生产者、消费者交换的数据结构
            private List<String> buffer;
    
            //步生产者和消费者的交换对象
            private Exchanger<List<String>> exchanger;
    
            Producer(List<String> buffer,Exchanger<List<String>> exchanger){
                this.buffer = buffer;
                this.exchanger = exchanger;
            }
    
            @Override
            public void run() {
                for(int i = 1 ; i < 5 ; i++){
                    System.out.println("生产者第" + i + "次提供");
                    for(int j = 1 ; j <= 3 ; j++){
                        System.out.println("生产者装入" + i  + "--" + j);
                        buffer.add("buffer:" + i + "--" + j);
                    }
    
                    System.out.println("生产者装满,等待与消费者交换...");
                    try {
                        exchanger.exchange(buffer);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    
        static class Consumer implements Runnable {
            private List<String> buffer;
    
            private final Exchanger<List<String>> exchanger;
    
            public Consumer(List<String> buffer, Exchanger<List<String>> exchanger) {
                this.buffer = buffer;
                this.exchanger = exchanger;
            }
    
            @Override
            public void run() {
                for (int i = 1; i < 5; i++) {
                    //调用exchange()与消费者进行数据交换
                    try {
                        buffer = exchanger.exchange(buffer);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
    
                    System.out.println("消费者第" + i + "次提取");
                    for (int j = 1; j <= 3 ; j++) {
                        System.out.println("消费者 : " + buffer.get(0));
                        buffer.remove(0);
                    }
                }
            }
        }
    
        public static void main(String[] args){
            List<String> buffer1 = new ArrayList<String>();
            List<String> buffer2 = new ArrayList<String>();
    
            Exchanger<List<String>> exchanger = new Exchanger<List<String>>();
    
            Thread producerThread = new Thread(new Producer(buffer1,exchanger));
            Thread consumerThread = new Thread(new Consumer(buffer2,exchanger));
    
            producerThread.start();
            consumerThread.start();
        }
    }
    

    ConcurrentSkipListMap:
    跳表(SkipList):Skip List ,称之为跳表,它是一种可以替代平衡树的数据结构,其数据元素默认按照key值升序,天然有序。Skip list让已排序的数据分布在多层链表中,以0-1随机数决定一个数据的向上攀升与否,通过“空间来换取时间”的一个算法,在每个节点中增加了向前的指针,在插入、删除、查找时可以忽略一些不可能涉及到的结点,从而提高了效率。
    SkipList具备如下特性:
    1.由很多层结构组成,level是通过一定的概率随机产生的
    2.每一层都是一个有序的链表,默认是升序,也可以根据创建映射时所提供的Comparator进行排序,具体取决于使用的构造方法
    3.最底层(Level 1)的链表包含所有元素
    4.如果一个元素出现在Level i 的链表中,则它在Level i 之下的链表也都会出现
    5.每个节点包含两个指针,一个指向同一链表中的下一个元素,一个指向下面一层的元素

    SkipList
    ConcurrentSkipListMap其内部采用SkipLis数据结构无锁实现的,并发时通过多次校验和CAS保证正确性。为了实现SkipList,ConcurrentSkipListMap提供了三个内部类来构建这样的链表结构:Node、Index、HeadIndex。其中Node表示最底层的单链表有序节点、Index表示为基于Node的索引层,HeadIndex用来维护索引层次。到这里我们可以这样说ConcurrentSkipListMap是通过HeadIndex维护索引层次,通过Index从最上层开始往下层查找,一步一步缩小查询范围,最后到达最底层Node时,就只需要比较很小一部分数据了。
         * Head nodes          Index nodes
         * +-+    right        +-+                      +-+
         * |2|---------------->| |--------------------->| |->null
         * +-+                 +-+                      +-+
         *  | down              |                        |
         *  v                   v                        v
         * +-+            +-+  +-+       +-+            +-+       +-+
         * |1|----------->| |->| |------>| |----------->| |------>| |->null
         * +-+            +-+  +-+       +-+            +-+       +-+
         *  v              |    |         |              |         |
         * Nodes  next     v    v         v              v         v
         * +-+  +-+  +-+  +-+  +-+  +-+  +-+  +-+  +-+  +-+  +-+  +-+
         * | |->|A|->|B|->|C|->|D|->|E|->|F|->|G|->|H|->|I|->|J|->|K|->null
         * +-+  +-+  +-+  +-+  +-+  +-+  +-+  +-+  +-+  +-+  +-+  +-+
         *
    
     static final class Node<K,V> {
            final K key;
            volatile Object value;
            volatile ConcurrentSkipListMap.Node<K, V> next;
    
            /** 省略些许代码 */
        }
       static class Index<K,V> {
            final ConcurrentSkipListMap.Node<K,V> node;
            final ConcurrentSkipListMap.Index<K,V> down;
            volatile ConcurrentSkipListMap.Index<K,V> right;
    
            /** 省略些许代码 */
        }
    
     static final class HeadIndex<K,V> extends Index<K,V> {
            final int level;  //索引层,从1开始,Node单链表层为0
            HeadIndex(Node<K,V> node, Index<K,V> down, Index<K,V> right, int level) {
                super(node, down, right);
                this.level = level;
            }
        }
    

    CopyOnWriteArrayList:可支持并发读写的List,基于数组和copyonwrite机制,写操作加锁,每次修改都是重新创建一个副本,然后再更新对象的存储数组引用。读操作不加锁,使用volatile修饰存储数组,使得写的结果可见。适用于读多写少的场景。

    public class CopyOnWriteArrayList<E>
        implements List<E>, RandomAccess, Cloneable, java.io.Serializable {
    
        /** The lock protecting all mutators */
        final transient ReentrantLock lock = new ReentrantLock();
    
        /** The array, accessed only via getArray/setArray. */
        private transient volatile Object[] array;
    
    /**
         * Appends the specified element to the end of this list.
         *
         * @param e element to be appended to this list
         * @return {@code true} (as specified by {@link Collection#add})
         */
        public boolean add(E e) {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                Object[] elements = getArray();
                int len = elements.length;
                Object[] newElements = Arrays.copyOf(elements, len + 1);
                newElements[len] = e;
                setArray(newElements);
                return true;
            } finally {
                lock.unlock();
            }
        }
    
    @SuppressWarnings("unchecked")
        private E get(Object[] a, int index) {
            return (E) a[index];
        }
    
        /**
         * {@inheritDoc}
         *
         * @throws IndexOutOfBoundsException {@inheritDoc}
         */
        public E get(int index) {
            return get(getArray(), index);
        }
    
    /**
         * Returns the number of elements in this list.
         *
         * @return the number of elements in this list
         */
        public int size() {
            return getArray().length;
        }
    }
    

    CopyOnWriteArraySet:可去重的并发set,基于CopyOnWriteArrayList实现。

    public class CopyOnWriteArraySet<E> extends AbstractSet<E>
            implements java.io.Serializable {
    
        private final CopyOnWriteArrayList<E> al;
        public boolean add(E e) {
            return al.addIfAbsent(e);
        }
    }
    

    相关文章

      网友评论

          本文标题:JUC学习笔记(一)—锁工具类

          本文链接:https://www.haomeiwen.com/subject/lenmxqtx.html