美文网首页
Java 并发包之CountDownLatch、CyclicBa

Java 并发包之CountDownLatch、CyclicBa

作者: cyrilsun | 来源:发表于2019-02-27 10:36 被阅读0次

    一、概述

    在高并发的情况下,java提供了concurrent包,来满足部分特定并发场景需求,无论是重复造轮子、或是有技术需求,都可以了解下

    concurrent中的两个同步工具类:

    CountDownLatch(计数器)

    CyclicBarrier(循环栅栏)

    二、使用场景及实现

    2.1、CountDownLatch简介
    CountDownLatch是一个同步工具类,它允许一个或多个线程一直等待,直到其他线程的操作执行完后再执行。例如,应用程序的主线程希望在负责启动框架服务的线程已经启动所有的框架服务之后再执行

    关键词:

    计数器、aqs

    2.2、CountDownLatch使用场景

    image image image image image

    2.3、CountDownLatch实现

    CountDownLatch基于AQS(队列同步器),底层全部使用cas实现

    cas一分钟:
    CAS是一个原子操作,在本地方法(JNI)出现后,java可以利用处理器提供的指令保证并发场景的同步,大概流程是:比较一个内存位置的值并且只有相等时修改这个内存位置的值为新的值,保证了新的值总是基于最新的信息计算的,如果有其他线程在这期间修改了这个值则CAS失败

    AQS实现:

    内置队列

    image image

    CANCELLED 取消状态 ==1 由于在同步队列中等待的线程等待超时或被中断,需要从同步队列中取消等待,节点进入该状态将不会变化

    SIGNAL 等待触发状态(前节点状态为SIGNAL,当前节点才能挂起) ==-1 后继节点的线程处于等待状态

    CONDITION 等待条件状态 ==-2 节点在等待队列中

    PROPAGATE 状态需要向后传播 ==-3 表示下一次共享式同步状态获取将会无条件被传播下去

    2.3.1线程获取锁过程:

    image
        private Node addWaiter(Node mode) {
                Node node = new Node(Thread.currentThread(), mode);
                // Try the fast path of enq; backup to full enq on failure
                Node pred = tail;
                if (pred != null) {            //队列已经初始化且存在多个节点
                    node.prev = pred;          //将当前node的上一节点指向tail
                    if (compareAndSetTail(pred, node)) {         //设置tail为当前节点
                        pred.next = node;         //队列插入成功返回当前节点
                        return node;
                    }
                }
                enq(node);               //如果tail节点为空,说明队列还未初始化、进入enq   或者在当前node入队时前节点已出队也进入enq
                return node;
            }
        
        private Node enq(final Node node) {
            for (;;) {
                Node t = tail;
                if (t == null) { // Must initialize
                    if (compareAndSetHead(new Node()))      //初始化时则将head节点指向一个空节点,代表已执行cas的B线程
                        tail = head;
                } else {
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }
    
    
    image
    final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;                              //是否删除节点
            try {
                boolean interrupted = false;                    //是否中断状态
                for (;;) {                                     //进入自旋
                    final Node p = node.predecessor();        //判断前一个节点是否是head节点
                    if (p == head && tryAcquire(arg)) {       //如果是,则尝试再次获取锁
                        setHead(node);                        //设置head为当前node节点
                        p.next = null; // help GC             
                        failed = false;
                        return interrupted;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            int ws = pred.waitStatus;
            if (ws == Node.SIGNAL)                       //如果pred的waitStatus为Node.SIGNAL,则返回true通过LockSupport.park()方法把线程A挂起,并等待被唤醒
                /*
                 * This node has already set status asking a release
                 * to signal it, so it can safely park.
                 */
                return true;
            if (ws > 0) {                                 //如果pred的waitStatus > 0,表明pred的线程状态CANCELLED,需从队列中删除
                /*
                 * Predecessor was cancelled. Skip over predecessors and
                 * indicate retry.
                 */
                do {
                    node.prev = pred = pred.prev;           //上个节点的上个节点》》pred(上上个节点)》当前节点的上个节点
                } while (pred.waitStatus > 0);
                pred.next = node;                            //上上个节点的下个节点为当前节点
            } else {
                /*
                 * waitStatus must be 0 or PROPAGATE.  Indicate that we
                 * need a signal, but don't park yet.  Caller will need to
                 * retry to make sure it cannot acquire before parking.
                 */
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);        //如果pred的waitStatus == 0,则通过CAS指令修改waitStatus为Node.SIGNAL
            }
            return false;
        }
    

    2.3.2释放锁过程:

     private void unparkSuccessor(Node node) {
            /*
             * If status is negative (i.e., possibly needing signal) try
             * to clear in anticipation of signalling.  It is OK if this
             * fails or if status is changed by waiting thread.
             */
            int ws = node.waitStatus;
            if (ws < 0)
                compareAndSetWaitStatus(node, ws, 0);         //如果头结点head的waitStatus值为-1,则用CAS指令重置为0;
    ​
            /*
             * Thread to unpark is held in successor, which is normally
             * just the next node.  But if cancelled or apparently null,
             * traverse backwards from tail to find the actual
             * non-cancelled successor.
             */
            Node s = node.next;
            if (s == null || s.waitStatus > 0) {
                s = null;
                for (Node t = tail; t != null && t != node; t = t.prev)        
                    if (t.waitStatus <= 0)
                        s = t;
            }
            if (s != null)                               //找到waitStatus值小于0的节点s,通过LockSupport.unpark(s.thread)唤醒线程
                LockSupport.unpark(s.thread);
        }
    

    CountDownLatch包装

    主线程执行await方法,tryAcquireShared方法中如果state不等于0,返回-1,则加入到等待队列中,主线程通过LockSupport.park(this)被挂起。

    private void doAcquireSharedInterruptibly(int arg)
            throws InterruptedException {
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head) {
                        int r = tryAcquireShared(arg);
                        if (r >= 0) {
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            failed = false;
                            return;
                        }
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        throw new InterruptedException();
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    

    如果state为0,通过LockSupport.unpark唤醒await方法中挂起的主线程。

    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }
    

    2.4 CyclicBarrier的使用场景

        `public static final int INIT_SIZE = 4;
        private static CyclicBarrier barrier;
        public static void main(String[] args) {
            System.out.println("开启CyclicBarrier");
            //初始化CyclicBarrier
            barrier = new CyclicBarrier(INIT_SIZE, new Runnable() {
                public void run() {
                    System.out.println("所有线程都就绪了");
                }
            });
            //开启3个线程
            for (int i=0;i<INIT_SIZE;i++){
                new ThreadDemo().start();
            }
        }
        static class ThreadDemo extends Thread {
            @Override
            public void run() {
                try {
                    System.out.println(Thread.currentThread().getName()+"线程就绪");
                    barrier.await();
                    System.out.println(Thread.currentThread().getName()+"线程继续执行");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }
        }
    
    开启CyclicBarrier
    Thread-0线程就绪
    Thread-2线程就绪
    Thread-1线程就绪
    所有线程都就绪了
    Thread-1线程继续执行
    Thread-0线程继续执行
    Thread-2线程继续执行
    
     public CyclicBarrier(int parties, Runnable barrierAction) {
            if (parties <= 0) throw new IllegalArgumentException();
            this.parties = parties;//线程数初始化
            this.count = parties;//剩余未到达栅栏线程数
            this.barrierCommand = barrierAction;
        }
     public CyclicBarrier(int parties) {
         this(parties, null);
     }
    

    2.5 CyclicBarrier的实现

    关键词:ReentrantLock、Condition

    ReentrantLock 一分钟:
    可重入锁,有两种获取锁方式:公平锁和非公平锁;
    公平锁:统一线程可以多次获取锁,state相应+1,其他线程进入队列排队,直到state=0释放锁
    非公平锁:新线程可直接尝试竞争锁,不进入等待队列

    Condition :一分钟
    Condition是同步器的内部类,提供了类似Object监视器的方法,通过与Lock配合来实现等待/通知模式

    signal()方法相当于Object的notify()方法,Condition中的signalAll()相当于Object的notifyAll()方法。

     /**
         * Main barrier code, covering the various policies.
         */
        private int dowait(boolean timed, long nanos)
            throws InterruptedException, BrokenBarrierException,
                   TimeoutException {
            final ReentrantLock lock = this.lock;
            lock.lock(); //获取ReentrantLock互斥锁
            try {
                final Generation g = generation;//获取generation对象
    ​
                if (g.broken)//如果generation损坏,抛出异常
                    throw new BrokenBarrierException();
    ​
                if (Thread.interrupted()) {
                   //如果当前线程被中断,则调用breakBarrier方法(将损坏状态设置为 true),停止CyclicBarrier,并唤醒所有线程
                    breakBarrier();
                    throw new InterruptedException();
                }
    ​
                int index = --count; // 减count
                //index=0,也就是说,有0个线程未满足CyclicBarrier条件,也就是条件满足,
                //可以唤醒所有的线程了
                if (index == 0) {  // tripped
                    boolean ranAction = false;
                    try {
                       //这就是构造器的第二个参数,如果不为空的话,就执行这个Runnable的run方法,注意是最后一个执行await操作的线程执行的这个run方法。
                        final Runnable command = barrierCommand;
                        if (command != null)
                            command.run();
                        ranAction = true;
                        //唤醒所有的等待线程
                        nextGeneration();
                        return 0;
                    } finally {
                        if (!ranAction)//如果执行栅栏任务的时候失败了,就将栅栏失效,并保证一定能唤醒所有线程
                            breakBarrier();
                    }
                }
    ​
                // loop until tripped, broken, interrupted, or timed out
                //当barrier的条件满足、当前线程被中断或者已经超时才会跳出循环
                for (;;) {
                    try {
                        if (!timed)//如果没有指定超时参数,则直接调用Condition的await方法。
                            trip.await();
                        else if (nanos > 0L)//否则根据超时时间调用awaitNanos方法
                            nanos = trip.awaitNanos(nanos);
                    } catch (InterruptedException ie) {
                        //执行过程中,线程被中断的话,改变generation状态
                        if (g == generation && ! g.broken) {
                            breakBarrier();//执行breakBarrier,唤醒所有线程
                            throw ie;
                        } else {
                            // We're about to finish waiting even if we had not
                            // been interrupted, so this interrupt is deemed to
                            // "belong" to subsequent execution.
                            Thread.currentThread().interrupt();
                        }
                    }
    ​
                    if (g.broken)//如果当前generation已经损坏,抛出异常
                        throw new BrokenBarrierException();
    ​
                    if (g != generation)//如果generation已经更新换代,则返回当前index跳出
                        return index;
                    //如果是参数是超时等待并已经超时,则执行breakBarrier()方法唤醒所有等待线程
                    if (timed && nanos <= 0L) {
                        breakBarrier();
                        throw new TimeoutException();
                    }
                }
            } finally {
                lock.unlock();
            }
        }
    

    三、CountDownLatch、CyclicBarrier区别

    • CyclicBarrier是多个线程互相等待,直到到达同一个同步点,再继续一起执行(CountDownLatch可以在线程中执行await来实现CyclicBarrier)
    • CountDownLatch 的缺点是不能重复使用,CyclicBarrier.reset 能重置栅栏,此时正在等待的线程会收到 BrokenBarrierException

    相关文章

      网友评论

          本文标题:Java 并发包之CountDownLatch、CyclicBa

          本文链接:https://www.haomeiwen.com/subject/cgdxuqtx.html