美文网首页
CyclicBarrier(同步容器)

CyclicBarrier(同步容器)

作者: 程序员大黑鱼 | 来源:发表于2020-07-20 02:15 被阅读0次

CyclicBarrier(同步容器)

作用

它允许一组线程相互等待直到所有线程都到达一个公共的屏障点,才开始执行下面的操作,举例:例如做公交车,等所有人都坐上车了,车才启动出发

方法

public CyclicBarrier(int parties);
public CyclicBarrier(int parties, Runnable barrierAction);
private void nextGeneration();
private void breakBarrier();
private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException;
public int getParties();
public int await() throws InterruptedException, BrokenBarrierException;
public int await(long timeout, TimeUnit unit)
        throws InterruptedException,
               BrokenBarrierException,
               TimeoutException;
public boolean isBroken();
public void reset();
public int getNumberWaiting();

示例

不设定阻塞时间

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author scottxuan
 */
@Slf4j
public class CyclicBarrierExample1 {
    //线程数
    private final static int threadNum = 4;

    //初始化线程同步数量
    final static CyclicBarrier barrier = new CyclicBarrier(2);

    public static void main(String[] args) throws InterruptedException {
        ExecutorService service = Executors.newCachedThreadPool();
        for (int i = 0; i < threadNum; i++) {
            final int num = i;
            service.execute(()->{
                update(num);
            });
            Thread.sleep(500);
        }
        service.shutdown();
    }

    public static void update(int num){
        try {
            log.info("thread ready {}",num);
            //线程阻塞
            barrier.await();
            log.info("thread continue {}",num);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

//输出结果 4个线程  同步数为2  2个线程就绪 阻塞释放  开始执行await()后续代码
//01:48:07.213 [pool-1-thread-1] INFO scottxuan.cyclicbarrier.CyclicBarrierExample1 - thread ready 0
//01:48:07.715 [pool-1-thread-2] INFO scottxuan.cyclicbarrier.CyclicBarrierExample1 - thread ready 1
//01:48:07.715 [pool-1-thread-2] INFO scottxuan.cyclicbarrier.CyclicBarrierExample1 - thread continue 1
//01:48:07.715 [pool-1-thread-1] INFO scottxuan.cyclicbarrier.CyclicBarrierExample1 - thread continue 0
//01:48:08.221 [pool-1-thread-1] INFO scottxuan.cyclicbarrier.CyclicBarrierExample1 - thread ready 2
//01:48:08.736 [pool-1-thread-2] INFO scottxuan.cyclicbarrier.CyclicBarrierExample1 - thread ready 3
//01:48:08.736 [pool-1-thread-2] INFO scottxuan.cyclicbarrier.CyclicBarrierExample1 - thread continue 3
//01:48:08.736 [pool-1-thread-1] INFO scottxuan.cyclicbarrier.CyclicBarrierExample1 - thread continue 2

设定阻塞时间一

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * @author scottxuan
 */
@Slf4j
public class CyclicBarrierExample2 {
    //线程数
    private final static int threadNum = 4;

    //初始化线程同步数量
    final static CyclicBarrier barrier = new CyclicBarrier(2);

    public static void main(String[] args) throws InterruptedException {
        ExecutorService service = Executors.newCachedThreadPool();
        for (int i = 0; i < threadNum; i++) {
            final int num = i;
            service.execute(()->{
                update(num);
            });
            Thread.sleep(2000);
        }
        service.shutdown();
    }

    public static void update(int num){
        try {
            log.info("thread ready {}",num);
            //线程阻塞
            barrier.await(1000, TimeUnit.MILLISECONDS);
            log.info("thread continue {}",num);
        } catch (Exception e) {
            log.error("barrier await error");
        }
    }
}

//输出结果 总共4个线程  同步数为2  每个线程阻塞了1秒后  解除阻塞  不符合同步的数量(同步数为2)  报错
//02:05:32.487 [pool-1-thread-1] INFO scottxuan.cyclicbarrier.CyclicBarrierExample2 - thread ready 0
//02:05:33.497 [pool-1-thread-1] ERROR scottxuan.cyclicbarrier.CyclicBarrierExample2 - barrier await error
//02:05:34.492 [pool-1-thread-1] INFO scottxuan.cyclicbarrier.CyclicBarrierExample2 - thread ready 1
//02:05:34.492 [pool-1-thread-1] ERROR scottxuan.cyclicbarrier.CyclicBarrierExample2 - barrier await error
//02:05:36.504 [pool-1-thread-1] INFO scottxuan.cyclicbarrier.CyclicBarrierExample2 - thread ready 2
//02:05:36.504 [pool-1-thread-1] ERROR scottxuan.cyclicbarrier.CyclicBarrierExample2 - barrier await error
//02:05:38.516 [pool-1-thread-1] INFO scottxuan.cyclicbarrier.CyclicBarrierExample2 - thread ready 3
//02:05:38.516 [pool-1-thread-1] ERROR scottxuan.cyclicbarrier.CyclicBarrierExample2 - barrier await error

设定阻塞时间二

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * @author scottxuan
 */
@Slf4j
public class CyclicBarrierExample2 {
    //线程数
    private final static int threadNum = 2;

    //初始化线程同步数量
    final static CyclicBarrier barrier = new CyclicBarrier(1);

    public static void main(String[] args) throws InterruptedException {
        ExecutorService service = Executors.newCachedThreadPool();
        for (int i = 0; i < threadNum; i++) {
            final int num = i;
            service.execute(()->{
                update(num);
            });
            Thread.sleep(2000);
        }
        service.shutdown();
    }

    public static void update(int num){
        try {
            log.info("thread ready {}",num);
            //线程阻塞
            barrier.await(1000, TimeUnit.MILLISECONDS);
            log.info("thread continue {}",num);
        } catch (Exception e) {
            log.error("barrier await error");
        }
    }
}

//结果输出  总共2个线程  同步数为1  线程阻塞了1秒后  解除阻塞  符合同步的数量(同步数量为1)  直接执行await之后代码
//02:08:40.742 [pool-1-thread-1] INFO scottxuan.cyclicbarrier.CyclicBarrierExample2 - thread ready 0
//02:08:40.746 [pool-1-thread-1] INFO scottxuan.cyclicbarrier.CyclicBarrierExample2 - thread continue 0
//02:08:42.751 [pool-1-thread-1] INFO scottxuan.cyclicbarrier.CyclicBarrierExample2 - thread ready 1
//02:08:42.752 [pool-1-thread-1] INFO scottxuan.cyclicbarrier.CyclicBarrierExample2 - thread continue 1

带runable的构造方法

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author scottxuan
 */
@Slf4j
public class CyclicBarrierExample3 {
    //线程数
    private final static int threadNum = 2;

    //初始化线程同步数量
    final static CyclicBarrier barrier = new CyclicBarrier(2,() -> {
        log.info("is ready over");
    });

    public static void main(String[] args) throws InterruptedException {
        ExecutorService service = Executors.newCachedThreadPool();
        for (int i = 0; i < threadNum; i++) {
            final int num = i;
            service.execute(()->{
                update(num);
            });
        }
        service.shutdown();
    }

    public static void update(int num){
        try {
            log.info("thread ready {}",num);
            //线程阻塞
            barrier.await();
            log.info("thread continue {}",num);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

//输出结果 ready 就绪之后,  构造方法中 runable方法执行  runable执行结束后  await()之后的代码开始执行
//02:13:09.648 [pool-1-thread-2] INFO scottxuan.cyclicbarrier.CyclicBarrierExample3 - thread ready 1
//02:13:09.648 [pool-1-thread-1] INFO scottxuan.cyclicbarrier.CyclicBarrierExample3 - thread ready 0
//02:13:09.652 [pool-1-thread-1] INFO scottxuan.cyclicbarrier.CyclicBarrierExample3 - is ready over
//02:13:09.652 [pool-1-thread-1] INFO scottxuan.cyclicbarrier.CyclicBarrierExample3 - thread continue 0
//02:13:09.652 [pool-1-thread-2] INFO scottxuan.cyclicbarrier.CyclicBarrierExample3 - thread continue 1

相关文章

网友评论

      本文标题:CyclicBarrier(同步容器)

      本文链接:https://www.haomeiwen.com/subject/uahjkktx.html