美文网首页Android 源码分析
Android 并发之CountDownLatch、Cyclic

Android 并发之CountDownLatch、Cyclic

作者: 有没有口罩给我一个 | 来源:发表于2019-03-04 14:28 被阅读0次

    Android 线程简单分析(一)
    Android 并发之synchronized锁住的是代码还是对象(二)
    Android 并发之CountDownLatch、CyclicBarrier的简单应用(三)
    Android 并发HashMap和ConcurrentHashMap的简单应用(四)(待发布)
    Android 并发之Lock、ReadWriteLock和Condition的简单应用(五)
    Android 并发之CAS(原子操作)简单介绍(六)
    Android 并发Kotlin协程的重要性(七)(待发布)
    Android 并发之AsyncTask原理分析(八)(待发布)
    Android 并发之Handler、Looper、MessageQueue和ThreadLocal消息机制原理分析(九)
    Android 并发之HandlerThread和IntentService原理分析(十)

    CountDownLatch:

    它可以实现类似计数器的功能。比如有一个任务A,它要等待其他4个任务执行完毕之后才能执行,此时就可以利用CountDownLatch来实现这种功能了。

    CountDownLatch.png

    构造函数:

    public CountDownLatch(int count) {
    }
    

    参数count为计数值

    然后下面这3个方法是CountDownLatch类中最重要的方法:

    public void await() throws InterruptedException { };   //调用await()方法的线程,如果count>0会被挂起,它会等待直到    count值为0才继续执行
    public boolean await(long timeout, TimeUnit unit) throws InterruptedException { };  //和await()类似,只不过等待一定的时间后count值还没变为0的话就会继续执行
    public void countDown() { };  //将count值减1
    
    场景

    类似的场景,项目开发流程,将各个功能模块分发至每个人手里,每个人做完及时汇报,当所有人都开发完成,那么接下来就是发布上线,CountDownLatch就是等待一组任务完成之后,继续执行。

    举个栗子:

    创建Project线程类CountDownLatchProject,并声明一个CountDownLatch属性mCountDownLatch来控制Project项目线程等待所有开发者开发完成。项目线程启动后会调用await()方法并进入等待状态,每一个开发者开发完成后调用complete()方法,并把mCountDownLatch中的计数器减1,当计数器等于0的时候项目线程继续执行;

    项目线程类:

    public class CountDownLatchProject implements Runnable {
    private CountDownLatch mCountDownLatch;
    
    public CountDownLatchProject(int number) {
        this.mCountDownLatch = new CountDownLatch(number);
    }
    
    public void complete(String name) {
        Log.e("tag", name + "完成...............");
        mCountDownLatch.countDown();
    }
    
    @Override
    public void run() {
        Log.e("tag", "项目开始..................");
        try {
            mCountDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        Log.e("tag", "项目上线.....................");
      }
    }
    

    开发者类:

    public class CountDownLatchdevalper implements Runnable {
    
    private CountDownLatchProject mCountDownLatchProject;
    
    public CountDownLatchdevalper(CountDownLatchProject count) {
        this.mCountDownLatchProject = count;
    }
    
    @Override
    public void run() {
        Log.e("tag", "" + Thread.currentThread().getName() + "开始开发.............");
        try {
            long duration = (long) (Math.random() * 20);
            TimeUnit.SECONDS.sleep(duration);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        mCountDownLatchProject.complete(Thread.currentThread().getName());
    }
    }
    

    创建一个项目开发,并初始化10个人参加项目开发,当所有开发人员开发完成,项目进入下一阶段:

    public class MainActivity extends AppCompatActivity {
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        CountDownLatchProject project = new CountDownLatchProject(10);
        new Thread(project).start();
    
    
        for (int i = 0; i < 10; i++) {
            CountDownLatchdevalper count = new CountDownLatchdevalper(project);
            new Thread(count).start();
            }
        }
    }
    

    查看日志:

    CountDownLatchProject: 项目开始..................
    CountDownLatchdevalper: Thread-232开始了............
    CountDownLatchdevalper: Thread-228开始了............
    CountDownLatchdevalper: Thread-229开始了............
    CountDownLatchdevalper: Thread-231开始了............
    CountDownLatchdevalper: Thread-230开始了............
    CountDownLatchdevalper: Thread-233开始了............
    CountDownLatchdevalper: Thread-234开始了............
    CountDownLatchdevalper: Thread-236开始了............
    CountDownLatchdevalper: Thread-235开始了............
    CountDownLatchdevalper: Thread-237开始了............
    CountDownLatchProject: Thread-233完成...............
    Thread-233CountDownLatchProject: 还有: 9 人未完成
    CountDownLatchProject: Thread-232完成...............
    Thread-232CountDownLatchProject: 还有: 8 人未完成
    CountDownLatchProject: Thread-228完成...............
    Thread-228CountDownLatchProject: 还有: 7 人未完成
    CountDownLatchProject: Thread-237完成...............
    Thread-237CountDownLatchProject: 还有: 6 人未完成
    CountDownLatchProject: Thread-230完成...............
    Thread-230CountDownLatchProject: 还有: 5 人未完成
    CountDownLatchProject: Thread-235完成...............
    Thread-235CountDownLatchProject: 还有: 4 人未完成
    CountDownLatchProject: Thread-229完成...............
    Thread-229CountDownLatchProject: 还有: 3 人未完成
    CountDownLatchProject: Thread-231完成...............
    Thread-231CountDownLatchProject: 还有: 2 人未完成
    CountDownLatchProject: Thread-234完成...............
    Thread-234CountDownLatchProject: 还有: 1 人未完成
    CountDownLatchProject: Thread-236完成...............
    Thread-236CountDownLatchProject: 还有: 0 人未完成
    CountDownLatchProject: 项目上线....................
    
    • CountDownLatch是当前线程等待其他一组线程任务完成之后在继续往下执行;
    • CountDownLatch并不是用来保护共享资源同步访问的,而是用来控制并发线程等待的;
    • CountDownLatch只允许使用一次,一旦内部计数器等于0,再调用这个方法将不起作用,如果还有第二次并发等待,你还得创建一个新的CountDownLatch。
    • 有一个任务A,它要等待其他4个任务执行完毕之后才能执行,此时就可以利用CountDownLatch来实现这种功能了

    CyclicBarrier:

    字面意思循环屏障,通过它可以实现让一组线程等待至某个状态之后再全部同时执行。叫做循环是因为当所有等待线程都执行完成以后,CyclicBarrier可以被重用。我们暂且把这个状态就叫做屏障,当调用await()方法之后,线程就处在屏障点了,屏障就是要阻碍的意思,它要做的事情是让一组线程到达一个屏障(同步点)时被阻塞,直到最后一个线程到达屏障时候,屏障才会开门。所有被屏障拦截的线程才会运行。


    CyclicBarrier.png

    构造函数:

    1:
    public CyclicBarrier(int parties, Runnable barrierAction) {
    }
    2:
    public CyclicBarrier(int parties) {
    }
    

    参数parties指让多少个线程或者任务等待至barrier状态;参数barrierAction为当这些线程都达到barrier状态时会执行的内容。

    然后CyclicBarrier中最重要的方法就是await方法:

    public int await() throws InterruptedException, BrokenBarrierException { };//来挂起当前线程,直至所有线程都到达barrier状态再同时执行后续任务;
    public int await(long timeout, TimeUnit unit)throws InterruptedException,BrokenBarrierException,TimeoutException { };//让这些线程等待至一定的时间,如果还有线程没有到达barrier状态就直接让到达barrier的线程执行后续任务
    

    举个栗子:

    public class Statistic implements Runnable {
        private CyclicBarrier mCyclicBarrier;
    
    
    public Statistic(CyclicBarrier parties) {
        this.mCyclicBarrier = parties;
    }
    
    @Override
    public void run() {
        Log.e("tag", "mCyclicBarrier:  " + Thread.currentThread().getName() + "开始" + mCyclicBarrier.getNumberWaiting());
        try {
            long duration = (long) (Math.random() * 20);
            TimeUnit.SECONDS.sleep(duration);
            mCyclicBarrier.await();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        Log.e("tag", "mCyclicBarrier:  " + Thread.currentThread().getName() + "结束");
        }
    }
    

    创建3个Statistic线程类:

    public class MainActivity extends AppCompatActivity {
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
        for (int i = 0; i < 3; i++) {
            new Thread(new Statistic(cyclicBarrier)).start();
          }
        }
     }
    

    查看日志:

      mCyclicBarrier:  Thread-143开始0
      mCyclicBarrier:  Thread-144开始0
      mCyclicBarrier:  Thread-142开始0
      mCyclicBarrier:  Thread-143结束
      mCyclicBarrier:  Thread-144结束
      mCyclicBarrier:  Thread-142结束
    
    从上面输出结果可以看出,每个线程执行完操作之后,就在等待其他线程的操作完毕,然后一起执行,即,当所有线程线程写入操作完毕之后,所有线程就继续进行后续的操作了。

    如果说想要在所有线程执行完之后,进行额外的其他操作可以为使用CyclicBarrier构造函数提供Runnable参数;

    CyclicBarrier cyclicBarrier = new CyclicBarrier(3, new Runnable() {
            @Override
            public void run() {
                Log.e("tag", "执行完,做额外的操作,然后接着后续的操作。");
            }
        });
        for (int i = 0; i < 3; i++) {
            new Thread(new Statistic(cyclicBarrier)).start();
        }
    

    查看日志:

    mCyclicBarrier:  Thread-154开始0
    mCyclicBarrier:  Thread-152开始0
    mCyclicBarrier:  Thread-153开始0
    执行完,做额外的操作,然后接着后续的操作,Thread-154。
    mCyclicBarrier:  Thread-152结束
    mCyclicBarrier:  Thread-153结束
    mCyclicBarrier:  Thread-154结束
    
    当3个线程都到达barrier状态后,会从3个线程中选择最后一个到达同步点的线程去执行Runnable,然后继续后续操作,注意:CyclicBarrier可以复用,而CountDownLatch无法进行重复使用;

    看看CycliBarrier中等待其他线程的源码:

    private int dowait(boolean timed, long nanos)
            throws InterruptedException, BrokenBarrierException,
            TimeoutException {
        //可重入锁,独占锁
        final ReentrantLock lock = this.lock;
        //加锁
        lock.lock();
        try {
            final Generation g = generation;
            if (g.broken)
                throw new BrokenBarrierException();
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }
            int index = --count;
            if (index == 0) {  // 最后一个线程到达屏障点
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    if (command != null) {
                        command.run();//先执行额外的操作
                    }
                    ranAction = true;
                    nextGeneration();//生成下一代
                    return 0;
                } finally {
                    //这是正常完成操作,所以不会破坏新生成的代
                    if (!ranAction)
                        breakBarrier();
                }
            }
            //循环等待,直到到达同步点,断裂或被破坏,中断或者超时
            for (; ; ) {
    
                //------------------线程被挂起
                try {
                    if (!timed)
                        trip.await();//线程等待,挂起释放锁
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);//线程超时等待,挂起释放锁
                } catch (InterruptedException ie) {
                    if (g == generation && !g.broken) {
                        breakBarrier();//断裂当前代,如果要复用,必须调用reset方法
                        throw ie;
                    } else {
                        Thread.currentThread().interrupt();
                    }
                }
                //线程被唤醒继续执行
                if (g.broken)
                    throw new BrokenBarrierException();
                //换代
                if (g != generation)
                    return index;
    
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }
    
    
    private void nextGeneration() {
        trip.signalAll(); //唤醒所有等待的线程
        count = parties;
        generation = new Generation();
    }
    
    
    private void breakBarrier() {
        generation.broken = true;//标志当前代断裂或被破坏
        count = parties;
        trip.signalAll();//唤醒所有等待的线程
    }
    
    • 其实CyclicBarrier中使用了ReentrantLock 独享锁或者可重入锁;
    • 当前线程执行await方法是其实是调用ReentrantLock 的await方法,进入条件等待队里,并释放锁;
    • 当所有的线程都到达屏障点,会选择最后一个到达屏障点的线程执行barrierCommand,然后调用ReentrantLock .signalAll唤醒所有线程,线程将进入同步队列,竞争锁,ReentrantLock (非公平锁),并调用nextGeneration()重置Generation,所以正常情况下重用CyclicBarrier可以不用调用reset方法,出现异常情况就需要调用reset方法重置状态,CyclicBarrier是可以复用的,而CountDownLatch无法进行重复使用;

    相关文章

      网友评论

        本文标题:Android 并发之CountDownLatch、Cyclic

        本文链接:https://www.haomeiwen.com/subject/sepbuqtx.html