CountDownLatch
在开发过程中经常会碰到一个任务需要开启多个线程,然后将多个线程的执行结果汇总。比如说查询全量数据,考虑数据量的问题,我们基本上会做分页,这时候就需要多次循环调用。CountDownLatch这个类使一个线程等待其他线程各自执行完毕后再执行。
执行原理
CountDownLatch内部实现了AQS,初始化CountDownLatch的时候,会调用Sync的构造方法将count赋值给state变量。多个线程调用countDown的时候,是使用CAS递减state的值;调用await方法的线程会被放在AQS阻塞队列中,等待计数器为0时,唤醒该线程。
核心方法
//构造方法,初始化计数器为count
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
//Sync实现了AQS
this.sync = new Sync(count);
}
//调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
//挂起timeout,超过这个时间继续执行
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
//将count -1
public void countDown() {
sync.releaseShared(1);
}
//获取当前count值
public long getCount() {
return sync.getCount();
}
走个栗子
public class CountDownLatchDemo implements Runnable{
public static CountDownLatch countDownLatch = new CountDownLatch(10);
private Integer count;
public CountDownLatchDemo( Integer count) {
this.count = count;
}
@Override
public void run() {
try {
System.out.println("分页查询:"+count*100+"~"+(count+1)*100);
} catch (Exception e) {
e.printStackTrace();
} finally {
countDownLatch.countDown();
}
}
public static void main(String[] args) throws InterruptedException{
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 10, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
for (int i = 0; i < 10; i++) {
CountDownLatchDemo countDownLatchDemo = new CountDownLatchDemo(i);
threadPoolExecutor.submit(countDownLatchDemo);
}
countDownLatch.await();
System.out.println("分页全量数据查询完毕");
}
}
CyclicBarrier
等待所有线程达到一个屏障时在执行.
原理
CyclicBarrier是由ReentrantLock可重入锁和Condition共同实现的。
核心方法
/** The lock for guarding barrier entry */
private final ReentrantLock lock = new ReentrantLock();
/** Condition to wait on until tripped */
private final Condition trip = lock.newCondition();
/** The number of parties */
private final int parties;
/* The command to run when tripped */
private final Runnable barrierCommand;
/** The current generation */
private Generation generation = new Generation();
//定义一个CyclicBarrier,parties 代表拦截的线程数量
public CyclicBarrier(int parties) {
this(parties, null);
}
//定义一个CyclicBarrier,parties 代表拦截的线程数量,由最后一个线程执行barrierAction
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
//调用
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
return dowait(true, unit.toNanos(timeout));
}
//返回当前在屏障处等待的设备
public int getNumberWaiting() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return parties - count;
} finally {
lock.unlock();
}
}
//屏障是否可用
public boolean isBroken() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return generation.broken;
} finally {
lock.unlock();
}
}
//返回参与屏障的数量
public int getParties() {
return parties;
}
跑个栗子
@Slf4j
public class CyclicBarrierTest implements Runnable {
private CyclicBarrier barrier;
private String name;
private Long time;
public CyclicBarrierTest(CyclicBarrier barrier, String name, Long time) {
this.barrier = barrier;
this.name = name;
this.time = time;
}
@Override
public void run() {
try {
Thread.sleep(time * 1000);
System.out.println(name + "到了");
barrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
@Slf4j
public class MainThread implements Runnable {
@Override
public void run() {
System.out.println("人到齐了,开饭.....");
}
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(3, new MainThread());
ExecutorService executorService = Executors.newFixedThreadPool(3);
executorService.execute(new CyclicBarrierTest(cyclicBarrier,"小花",2L));
executorService.execute(new CyclicBarrierTest(cyclicBarrier,"小红",1L));
executorService.execute(new CyclicBarrierTest(cyclicBarrier,"小明",5L));
}
}
cyclicbarriertest.gif
其他例子
public class CyclicBarrierDemo implements Runnable {
CyclicBarrier cyclicBarrier;
private Integer count;
public CyclicBarrierDemo(CyclicBarrier cyclicBarrier, Integer count) {
this.cyclicBarrier = cyclicBarrier;
this.count = count;
}
@Override
public void run() {
try {
System.out.println("accept iot device run data:" + count);
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(10, new Thread() {
@Override
public void run() {
System.out.println("run data count" + 10 + ",开始执行。。。。");
}
});
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 10, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
for (int i = 0; i < 10; i++) {
CyclicBarrierDemo cyclicBarrierDemo = new CyclicBarrierDemo(cyclicBarrier,i);
threadPoolExecutor.submit(cyclicBarrierDemo);
}
}
}
Semaphore
Semaphore 信号量,控制并发时线程的数量。
跑个栗子
//健身房有好几个跑步机
public class FitnessRoom {
class TreadMill {
private Integer num;
public Integer getNum() {
return num;
}
public void setNum(Integer num) {
this.num = num;
}
public TreadMill(Integer num) {
this.num = num;
}
}
private TreadMill[] treadMills = new TreadMill[]{new TreadMill(1), new TreadMill(2), new TreadMill(3), new TreadMill(4)};
private boolean[] use = new boolean[4];
Semaphore semaphore = new Semaphore(4, true);
//获取一个跑步机
public TreadMill get() throws InterruptedException {
semaphore.acquire(1);
return getAvailable();
}
// 遍历找一个没人的跑步机
public TreadMill getAvailable() {
for (int i = 0; i < use.length; i++) {
if (!use[i]) {
use[i] = true;
return treadMills[i];
}
}
return null;
}
/**
* 释放跑步机
* @param treadMill
*/
public void release(TreadMill treadMill) {
for (int i = 0; i < use.length; i++) {
if (treadMills[i] == treadMill) {
if (use[i]) {
use[i] = false;
}
}
}
}
}
public class MainThread implements Runnable {
private String name;
private FitnessRoom fitnessRoom;
public MainThread(String name, FitnessRoom fitnessRoom) {
this.name = name;
this.fitnessRoom = fitnessRoom;
}
@Override
public void run() {
try {
FitnessRoom.TreadMill treadMill = fitnessRoom.get();
if (null != treadMill) {
System.out.println(name + "在" + treadMill.getNum() + "号跑步机上跑步");
TimeUnit.SECONDS.sleep(2);
System.out.println(name + "跑完了");
//跑步机腾出来了
fitnessRoom.release(treadMill);
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
}
}
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(5);
FitnessRoom fitnessRoom = new FitnessRoom();
for (int i = 0; i < 20; i++) {
executorService.execute(new MainThread(i + "", fitnessRoom));
}
}
}
semaphore.gif
Exchanger
两个线程之间交互数据工具类
原子操作类
AtomicBoolean
AtomicInteger
AtomicLong
AtomicIntegerArray
AtomicLongArray
AtomicReferenceArray
AtomicReference
AtomicReferenceFieldUpdater
AtomicMarkableReference
AtomicIntegerFieldUpdater
AtomicLongFieldUpdater
AtomicStampedFieldUpdater
AtomicReferenceFieldUpdater
集合工具类
ConcurrentHashMap
ConcurrentSkipListMap
CopyOnWriteArrayList
CopyOnWriteArraySet
网友评论