概述
引入
很多时候我们在启动某个程序或者执行某些操作时,需要先进行预先处理,处理完成再操作。一般情况下,为了提供处理的速度,我们都是采用多线程进行预处理的。如何方便的进行预处理子线程和操作主线程的数据同步,就是一个比较明显的问题了。
例子
问题
在之前我遇到一个面试题,有A,B,C三个线程,A线程是主线程,要求先执行B,C线程,然后再执行A线程。
这个有以下几种实现方法:
方法一、自己定义信号量
/**
* @author lipengcheng3 Created date 2019-02-12 17:45
*/
public class CountDownLatchLearn {
public static int x=2;
public static void main(String[] args) throws InterruptedException {
Thread a = new Thread(new JustAThread(), "Thread-a");
Thread b = new Thread(new JustAThread(), "Thread-b");
a.start();
b.start();
while (getX() != 0){}
System.out.println("Main");
}
static class JustAThread implements Runnable {
public JustAThread() {
}
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName());
CountDownLatchLearn.minus1();
}
}
synchronized public static void minus1(){
x--;
}
synchronized public static int getX(){
return x;
}
}
方法二、 join
/**
* @author lipengcheng3 Created date 2019-02-12 17:45
*/
public class CountDownLatchLearn1 {
public static void main(String[] args) throws InterruptedException {
Thread a = new Thread(new JustAThread(), "Thread-a");
Thread b = new Thread(new JustAThread(), "Thread-b");
a.start();
b.start();
a.join();
b.join();
System.out.println("Main");
}
static class JustAThread implements Runnable {
public JustAThread() {
}
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName());
}
}
}
方法三、CountDownLatch
/**
* @author lipengcheng3 Created date 2019-02-12 17:45
*/
public class CountDownLatchLearn2 {
public static CountDownLatch countDownLatch = new CountDownLatch(2);
public static void main(String[] args) throws InterruptedException {
Thread a = new Thread(new JustAThread(), "Thread-a");
Thread b = new Thread(new JustAThread(), "Thread-b");
a.start();
b.start();
countDownLatch.await();
System.out.println("Main");
}
static class JustAThread implements Runnable {
public JustAThread() {
}
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName());
CountDownLatchLearn2.countDownLatch.countDown();
}
}
}
比较与总结
方法一自行定义,用起来是最灵活的,但是相对的,在信号量的存取时采用synchronized
会造成阻塞和低效化,多线程调优的结果和用户自己的调优能力有关,不同人的出入可能较大。而且方法一需要编写的代码太多,没有合理利用java现有的封装好的库,造成了编程的不便。
方法二通过join
完成了等待操作,操作简单便捷。但是扩展性差,如果我新加一个线程就得新加一个join()
,如果我用循环启了n个线程怎么办?
方法三就相对来说优雅了一些,因为方法三是依靠的java提供的线程安全类,将所有的线程间同步的操作全都委托给了CountDownLatch
,优雅的实现了主线程的阻塞等待。
摘要
本文介绍了闭锁CountDownLatch
的实现机理及基本用法
类介绍
类结构分析
CountDownLatch
和之前阅读的ReentrantReadWriteLock
,ReentrantLock
一样,通过依赖扩展了AQS
的内部类来完成对线程的控制。
类定位
闭锁是一种同步工具类,用来进行线程延迟。
注意
可以定义多个闭锁来方便的进行线程的控制。
源码解读
内部类介绍
源码
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
源码核心思想
通过AQS
来进行访问线程的控制,复写了tryXXXShared()
相关的方法,这样,在其他线程调用tryReleaseShared(int releases)
时会使state--
,在主线程调用tryAcquireShared(int acquires)
会因为state > 0
而阻塞。
其实根据
AQS
的包装,每有一个线程调用releaseShared()
相关操作时,会进行一次同步锁的释放,并在释放成功的情况下唤醒队中的后一个线程。这里注意,队列中的线程都是调用
acquireShared()
停住的线程,或者在循环或者在阻塞,被唤醒后调用tryAcquireShared()
,如果state
为0并获取成功后,根据Sync
复写的方法,它不会改state
,然后继续AQS
的逻辑一个一个的顺着队列唤醒下去,直到队列中所有的等待线程都执行完。
包装好的方法
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
public void countDown() {
sync.releaseShared(1);
}
使用示例
核心逻辑介绍
AQS
的基本操作
使用思路
注意一点即可,CountDownLatch.await()
可以在多个线程中同时调用哟,这样可以让很多依赖相同条件的任务都一起等待前期的预处理操作。
网友评论