概述
通过常见并发工具类来实现多线程之间的调度:CountDownLatch、CyclicBarrier、Semaphore、Thread.join()
- CountDownLatch:主、子线程同步(1+N的同步)。阻塞主线程,等待指定个数的子线程完成后,再执行主线程。依赖 1个await + N个countDown 两个函数完成功能
- CyclicBarrier(栅栏):N个子线程的同步。让多个子线程阻塞,满足一定数量的await函数调用后,所有子线程继续执行后面的逻辑。依赖 N个await完成阻塞和计数。
- Semaphore:多线程限流。通过初始化参数指定允许最高并发线程,用阻塞等待的方式限流。但是功能没有JUC线程池丰富(无法复用线程)。依赖 require + release 两个函数完成功能
- 其实Thread.join()也可以实现 1+1的线程同步,只不过控制没有CountDownLatch灵活。线程的同步机制,依赖于子线程彻底运行结束,但是可以保证可见性。而CountDownLatch不需要等待子线程结束,只要计数满足N,阻塞线程就可以继续执行
CountDownLatch
通过latch.await方法阻塞主线程,多个子线程调用latch.countDown(),latch来负责计数,来完成一个大的任务
比如5个子任务的下载,主线程阻塞,等待5个子线程都下载完毕,主线程才唤醒,打印下载完毕
public static void main(String[] args) throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(5);
for(int i = 0; i < 5; i++){
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + " 开始下载");
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + " 完成下载");
latch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
thread.start();
}
latch.await();
System.out.println("全部下载完毕");
}
CyclicBarrier(栅栏 子线程同步)
阻塞引用的所有的子线程,直到满足一定的条件,所有的子线程才回复工作。但是和CDL不同,更简洁,子线程调用 CyclicBarrier.await()的时候,不仅阻塞当前子线程,而且计数会减少1。不需要额外调用别的方法。
通过它可以实现让一组线程等待至某个状态之后再全部同时执行。这个阻塞的是所有的子线程,一直到都完成大家在一起往下执行。而不是阻塞主线程
支持回调方法
@Override
public void run() {
try{
System.out.println("子线程: 子任务"+taskCode+"已准备就绪,等待其他子任务就绪。。。");
begin_cyclicBarrier.await();
System.out.println("子线程: 子任务"+taskCode+"开始执行");
Thread.sleep(1000);
count.addAndGet(Integer.valueOf(taskCode));
System.out.println("子线程: 子任务"+taskCode+"执行完成");
end_cyclicBarrier.await();
}catch(Exception e){
}
}
Semaphore(多线程限流)
多线程限流,通过控制初始化参数,控制最大支持的并发数,从而限流,semaphore.acquire 计数,semaphore.release释放。但是功能没有线程池的丰富。
在限流方面,类似线程池,但是不具备线程池的复用线程、线程数控制、饱和策略等功能
线程池的逻辑是:core线程、BlockingQueue、max线程、丢弃。
信号量的逻辑:进入计数,释放计数,总量控制,阻塞等待。
限制最多N个线程同时运行,超出的线程争夺会阻塞,直到有线程release。这和线程池的maxCore类似但是不一样。
//测试类50个线程准备接水,但是水龙头只有10个
public class Test {
public static void main(String[] args) throws InterruptedException {
final Semaphore semaphore=new Semaphore(10);
for(int i=0;i<50;i++){
TestThread thread=new TestThread(semaphore,i+1);
thread.start();
}
}
}
public class TestThread extends Thread{
Semaphore semaphore;
int number=0;
TestThread(Semaphore semaphore,int number){
this.semaphore=semaphore;
this.number=number;
}
@Override
public void run() {
try {
semaphore.acquire();
System.out.println("学生"+this.number+"开始接水...");
Thread.sleep(2000);
System.out.println("学生"+this.number+"接水完毕");
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
网友评论