多线程协作除了上一篇中讲到的简单的生产者消费者模型的几种实现,jdk还提供了一些其他api,实现线程间协作的模型:CountDownLatch用于倒计数栅栏模型,一个线程等待其他多个线程就绪后再继续执行;CyclicBarrier用于循环路障模型,可重置循环使用的倒计数栅栏;Semaphore用于信号量模型,释放一定总量的信号量,每个线程需要获取一个信号量才能执行,从而进行限流
CountDownLatch:
倒计数栅栏模型的使用场景
- 场景一:计数设为1,启动多个线程await,等待主线程发送countdown信号后同步开始执行任务,场景类似跑步比赛,运动员都就绪等待发令枪响。可以用于简单的并发测试
- 场景二:计数设为n,将一个任务划分为n个线程进行同时处理,统计线程await等待所有n个线程执行完毕countdown,计数归零后完成统计工作。
- 场景三:结合前两者,设置两个CountDownLatch,一个计数为1的启动信号的latch,一个计数为n的完成信号的latch,一个大任务,用n个线程执行,每个线程都等待统筹线程将启动信号计数为1的latch计数countdown归零后执行任务。执行完成后每个线程countdown,将完成信号的n个计数的lantch点计数归零后统筹线程进行统计。
- 下面的代码示例是演示场景二中CountDownLatch的使用
@Slf4j
public class CountDownLatchDemo {
public static void main(String[] args) throws Exception{
// 定义需要等待的倒计时个数
CountDownLatch latch=new CountDownLatch(10);
Random random=new Random();
// 推荐的线程池的定义方法
ExecutorService executor=new ThreadPoolExecutor(10, 15, 60L, TimeUnit.SECONDS,
// 定义你的等待队列大小
new LinkedBlockingDeque<>(50),
// 定义你的线程生成方法
new ThreadFactory() {
private AtomicInteger threadNum=new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r,"custom work thread "+threadNum.addAndGet(1));
}
},
(r,e)->{
// 定义你的拒绝策略
String message="Task " + r.toString() + " rejected from " + e.toString();
log.error(message);
throw new RejectedExecutionException(message);
});
for(int i=0;i<10;i++){
int j=i;
executor.execute(()->{
try {
// 这里用于自己的业务实现,如果需要使用共享变量,注意使用线程安全的api或者同步锁
Thread.sleep(random.nextInt(2000));
log.debug("the {} is ready,i is {}",Thread.currentThread().getName(),j);
// 执行完即完成一个倒数计时
latch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
executor.execute(()->{
try {
long t1=System.currentTimeMillis();
log.debug("the thread {} is awaiting !",Thread.currentThread().getName());
latch.await();
log.debug("the thread {} is running ,waiting time is {} ms !",Thread.currentThread().getName(),System.currentTimeMillis()-t1);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
long t1=System.currentTimeMillis();
log.debug("main thread is waiting !");
// 等齐了或者倒计时到后继续
latch.await(2000L,TimeUnit.SECONDS);
log.debug("main thread is end ,waiting time is {} ms !",System.currentTimeMillis()-t1);
executor.shutdown();
}
}
debug信息如下:
10:59:17.952 [main] DEBUG com.dz.demo.multiThread.CountDownLatchDemo - main thread is waiting !
10:59:18.091 [custom work thread 3] DEBUG com.dz.demo.multiThread.CountDownLatchDemo - the custom work thread 3 is ready,i is 2
10:59:18.094 [custom work thread 3] DEBUG com.dz.demo.multiThread.CountDownLatchDemo - the thread custom work thread 3 is awaiting !
10:59:18.179 [custom work thread 8] DEBUG com.dz.demo.multiThread.CountDownLatchDemo - the custom work thread 8 is ready,i is 7
10:59:18.282 [custom work thread 1] DEBUG com.dz.demo.multiThread.CountDownLatchDemo - the custom work thread 1 is ready,i is 0
10:59:18.599 [custom work thread 5] DEBUG com.dz.demo.multiThread.CountDownLatchDemo - the custom work thread 5 is ready,i is 4
10:59:19.096 [custom work thread 9] DEBUG com.dz.demo.multiThread.CountDownLatchDemo - the custom work thread 9 is ready,i is 8
10:59:19.246 [custom work thread 6] DEBUG com.dz.demo.multiThread.CountDownLatchDemo - the custom work thread 6 is ready,i is 5
10:59:19.540 [custom work thread 10] DEBUG com.dz.demo.multiThread.CountDownLatchDemo - the custom work thread 10 is ready,i is 9
10:59:19.574 [custom work thread 2] DEBUG com.dz.demo.multiThread.CountDownLatchDemo - the custom work thread 2 is ready,i is 1
10:59:19.670 [custom work thread 4] DEBUG com.dz.demo.multiThread.CountDownLatchDemo - the custom work thread 4 is ready,i is 3
10:59:19.941 [custom work thread 7] DEBUG com.dz.demo.multiThread.CountDownLatchDemo - the custom work thread 7 is ready,i is 6
10:59:19.941 [main] DEBUG com.dz.demo.multiThread.CountDownLatchDemo - main thread is end ,waiting time is 1990 ms !
10:59:19.941 [custom work thread 3] DEBUG com.dz.demo.multiThread.CountDownLatchDemo - the thread custom work thread 3 is running ,waiting time is 1847 ms !
CyclicBarrier:
循环路障模型的使用场景
- 计数设为n的可循环使用的路障,用于多个线程调用await,n个线程都达到await之后路障计数归零,线程一起同步继续,同时该路障被重置为n,可以继续循环使用。路障还可以设置一个执行器,在路障计数归零时被触发。用于比如,将一个大任务化为m个子任务,用n个线程执行,当每执行完n个子任务后,触发一次统计任务,同时开启下一批n个子任务的执行;
- 下面的代码示例即同时执行两个任务,每次两个都执行完再启动下一轮两个任务。
@Slf4j
public class CyclicBarrierDemo {
public static void main(String[] args) {
AtomicInteger num=new AtomicInteger(0);
CyclicBarrier cyclicBarrier=new CyclicBarrier(2,()->{
num.addAndGet(1);
log.debug("cyclicBarrier arrived,both ready to run,time is {}",num.get());
});
// 演示demo可以用简单的新线程池
ExecutorService executorService= Executors.newFixedThreadPool(2);
executorService.execute(()->{
int i=0;
while (i<4) {
try {
log.debug("job 1 begin to start!");
cyclicBarrier.await();
log.debug("cyclicBarrier arrived! job 1 is running to start!");
i++;
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
});
executorService.execute(()->{
int i=0;
while (i<4) {
try {
log.debug("job 2 begin to start!");
cyclicBarrier.await();
log.debug("cyclicBarrier arrived! job 2 is running to start!");
i++;
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
});
}
}
debug信息如下:
11:56:29.438 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.CyclicBarrierDemo - job 1 begin to start!
11:56:29.438 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.CyclicBarrierDemo - job 2 begin to start!
11:56:29.440 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.CyclicBarrierDemo - cyclicBarrier arrived,both ready to run,time is 1
11:56:29.441 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.CyclicBarrierDemo - cyclicBarrier arrived! job 2 is running to start!
11:56:29.441 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.CyclicBarrierDemo - cyclicBarrier arrived! job 1 is running to start!
11:56:29.441 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.CyclicBarrierDemo - job 2 begin to start!
11:56:29.441 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.CyclicBarrierDemo - job 1 begin to start!
11:56:29.441 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.CyclicBarrierDemo - cyclicBarrier arrived,both ready to run,time is 2
11:56:29.441 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.CyclicBarrierDemo - cyclicBarrier arrived! job 1 is running to start!
11:56:29.441 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.CyclicBarrierDemo - cyclicBarrier arrived! job 2 is running to start!
11:56:29.441 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.CyclicBarrierDemo - job 1 begin to start!
11:56:29.441 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.CyclicBarrierDemo - job 2 begin to start!
11:56:29.441 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.CyclicBarrierDemo - cyclicBarrier arrived,both ready to run,time is 3
11:56:29.441 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.CyclicBarrierDemo - cyclicBarrier arrived! job 2 is running to start!
11:56:29.441 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.CyclicBarrierDemo - job 2 begin to start!
11:56:29.441 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.CyclicBarrierDemo - cyclicBarrier arrived! job 1 is running to start!
11:56:29.441 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.CyclicBarrierDemo - job 1 begin to start!
11:56:29.441 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.CyclicBarrierDemo - cyclicBarrier arrived,both ready to run,time is 4
11:56:29.441 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.CyclicBarrierDemo - cyclicBarrier arrived! job 1 is running to start!
11:56:29.441 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.CyclicBarrierDemo - cyclicBarrier arrived! job 2 is running to start!
Semaphore
信号量模型的使用场景
- 计数为n的Semaphore,即持有n个许可,线程可应通过该信号量的acquire或者tryAcquire来获取许可,获取许可后才能使用或者获取资源,使用完后通过release释放许可。该模型的设计即是为了实现对物理或者逻辑资源的获取进行限流。同时最多n个线程持有该资源的许可。当计数设置为1,即有互斥锁的效果,相比其他锁,优势是可以在其他线程进行release,从而处理死锁。
- 下面的代码示例对于一个限定的资源,使用Semaphore颁发许可,进行限流
@Slf4j
public class SemaphoreDemo {
public static void main(String[] args) {
ResourcePool resourcePool=new ResourcePool(5);
ExecutorService executorService= Executors.newFixedThreadPool(20);
for(int i=0;i<15;i++){
executorService.execute(()->{
String s=null;
try {
s=resourcePool.getResource();
log.debug("{} 线程 获取资源 {}",Thread.currentThread().getName(),s);
Thread.sleep(1000l);
} catch (Exception e) {
e.printStackTrace();
}finally {
// 释放资源确保在finnaly中
if(StringUtils.hasText(s)){
resourcePool.releaseResource(s);
}
}
});
}
executorService.shutdown();
}
}
@Slf4j
@Data
class ResourcePool{
private Semaphore semaphore;
private Queue<String> sourceQueue;
public ResourcePool(Integer n){
semaphore=new Semaphore(n);
sourceQueue=new LinkedList<>();
for(int i=0;i<n;i++){
sourceQueue.add(String.valueOf(i));
}
}
String getResource() throws Exception{
if(semaphore.tryAcquire(10L, TimeUnit.SECONDS)){
log.debug("成功获取信号量");
return sourceQueue.poll();
}else {
return null;
}
}
void releaseResource(String s){
log.debug("释放资源 {}",s);
semaphore.release();
sourceQueue.add(s);
}
}
打印如下:
13:20:20.970 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.ResourcePool - 成功获取信号量
13:20:20.970 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.ResourcePool - 成功获取信号量
13:20:20.970 [pool-1-thread-5] DEBUG com.dz.demo.multiThread.ResourcePool - 成功获取信号量
13:20:20.970 [pool-1-thread-4] DEBUG com.dz.demo.multiThread.ResourcePool - 成功获取信号量
13:20:20.970 [pool-1-thread-3] DEBUG com.dz.demo.multiThread.ResourcePool - 成功获取信号量
13:20:20.972 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.SemaphoreDemo - pool-1-thread-1 线程 获取资源 0
13:20:20.972 [pool-1-thread-4] DEBUG com.dz.demo.multiThread.SemaphoreDemo - pool-1-thread-4 线程 获取资源 3
13:20:20.972 [pool-1-thread-3] DEBUG com.dz.demo.multiThread.SemaphoreDemo - pool-1-thread-3 线程 获取资源 4
13:20:20.972 [pool-1-thread-5] DEBUG com.dz.demo.multiThread.SemaphoreDemo - pool-1-thread-5 线程 获取资源 2
13:20:20.972 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.SemaphoreDemo - pool-1-thread-2 线程 获取资源 1
13:20:21.976 [pool-1-thread-4] DEBUG com.dz.demo.multiThread.ResourcePool - 释放资源 3
13:20:21.976 [pool-1-thread-3] DEBUG com.dz.demo.multiThread.ResourcePool - 释放资源 4
13:20:21.976 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.ResourcePool - 释放资源 1
13:20:21.976 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.ResourcePool - 释放资源 0
13:20:21.976 [pool-1-thread-5] DEBUG com.dz.demo.multiThread.ResourcePool - 释放资源 2
13:20:21.976 [pool-1-thread-6] DEBUG com.dz.demo.multiThread.ResourcePool - 成功获取信号量
13:20:21.976 [pool-1-thread-7] DEBUG com.dz.demo.multiThread.ResourcePool - 成功获取信号量
13:20:21.977 [pool-1-thread-6] DEBUG com.dz.demo.multiThread.SemaphoreDemo - pool-1-thread-6 线程 获取资源 4
13:20:21.976 [pool-1-thread-8] DEBUG com.dz.demo.multiThread.ResourcePool - 成功获取信号量
13:20:21.977 [pool-1-thread-7] DEBUG com.dz.demo.multiThread.SemaphoreDemo - pool-1-thread-7 线程 获取资源 1
13:20:21.977 [pool-1-thread-8] DEBUG com.dz.demo.multiThread.SemaphoreDemo - pool-1-thread-8 线程 获取资源 0
13:20:21.977 [pool-1-thread-9] DEBUG com.dz.demo.multiThread.ResourcePool - 成功获取信号量
13:20:21.977 [pool-1-thread-10] DEBUG com.dz.demo.multiThread.ResourcePool - 成功获取信号量
13:20:21.977 [pool-1-thread-9] DEBUG com.dz.demo.multiThread.SemaphoreDemo - pool-1-thread-9 线程 获取资源 2
13:20:21.977 [pool-1-thread-10] DEBUG com.dz.demo.multiThread.SemaphoreDemo - pool-1-thread-10 线程 获取资源 null
13:20:22.979 [pool-1-thread-6] DEBUG com.dz.demo.multiThread.ResourcePool - 释放资源 4
13:20:22.979 [pool-1-thread-9] DEBUG com.dz.demo.multiThread.ResourcePool - 释放资源 2
13:20:22.979 [pool-1-thread-8] DEBUG com.dz.demo.multiThread.ResourcePool - 释放资源 0
13:20:22.979 [pool-1-thread-7] DEBUG com.dz.demo.multiThread.ResourcePool - 释放资源 1
13:20:22.979 [pool-1-thread-11] DEBUG com.dz.demo.multiThread.ResourcePool - 成功获取信号量
13:20:22.979 [pool-1-thread-11] DEBUG com.dz.demo.multiThread.SemaphoreDemo - pool-1-thread-11 线程 获取资源 4
13:20:22.979 [pool-1-thread-13] DEBUG com.dz.demo.multiThread.ResourcePool - 成功获取信号量
13:20:22.979 [pool-1-thread-12] DEBUG com.dz.demo.multiThread.ResourcePool - 成功获取信号量
13:20:22.979 [pool-1-thread-14] DEBUG com.dz.demo.multiThread.ResourcePool - 成功获取信号量
13:20:22.980 [pool-1-thread-13] DEBUG com.dz.demo.multiThread.SemaphoreDemo - pool-1-thread-13 线程 获取资源 2
13:20:22.980 [pool-1-thread-12] DEBUG com.dz.demo.multiThread.SemaphoreDemo - pool-1-thread-12 线程 获取资源 0
13:20:22.980 [pool-1-thread-14] DEBUG com.dz.demo.multiThread.SemaphoreDemo - pool-1-thread-14 线程 获取资源 1
13:20:23.980 [pool-1-thread-11] DEBUG com.dz.demo.multiThread.ResourcePool - 释放资源 4
13:20:23.980 [pool-1-thread-13] DEBUG com.dz.demo.multiThread.ResourcePool - 释放资源 2
13:20:23.980 [pool-1-thread-12] DEBUG com.dz.demo.multiThread.ResourcePool - 释放资源 0
13:20:23.981 [pool-1-thread-15] DEBUG com.dz.demo.multiThread.ResourcePool - 成功获取信号量
13:20:23.981 [pool-1-thread-15] DEBUG com.dz.demo.multiThread.SemaphoreDemo - pool-1-thread-15 线程 获取资源 4
13:20:23.985 [pool-1-thread-14] DEBUG com.dz.demo.multiThread.ResourcePool - 释放资源 1
13:20:24.981 [pool-1-thread-15] DEBUG com.dz.demo.multiThread.ResourcePool - 释放资源 4
本篇讲到了多线程协作的CountDownLatch、CyclicBarrier、Semaphore三个api的使用场景和使用用例,其他工具比如信号量的api在guava的RateLimiter也可以实现限流,使用guava的令牌桶思想,也可以用redis实现在分布式环境下的限流。本轮多线程篇将暂时写到这,将来有人咨询其他问题也可以继续更新多线程篇。下一篇将是关于redis的搭建、使用、集群和在分布式情况下的应用场景。
网友评论