JUC中提供了几个比较常用的并发工具类,比如CountDownLatch、CyclicBarrier、Semaphore。
CountDownLatch
countdownlatch是一个同步工具类,它允许一个或多个线程一直等待,直到其他线程的操作执行完毕再执行,从命名可以解读到countdown是倒数的意思,类似于我们倒计时的概念。
countdownlatch提供了两个方法,一个是countDown,一个是await,countdownlatch初始化的时候需要传入一个整数,在这个整数倒数到0之前,调用了await方法的程序都必须要等待,然后通过countDown来倒数。
使用案例
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
// 定义一个倒计时,每个减一,0的时候才会让主线程释放
CountDownLatch countDownLatch = new CountDownLatch(3);
new Thread(()->{
countDownLatch.countDown();
},"1").start();
new Thread(()->{
try {
Thread.sleep(20000000);
} catch (InterruptedException e) {
e.printStackTrace();
}
countDownLatch.countDown();
},"2").start();
new Thread(()->{
countDownLatch.countDown();
try {
Thread.sleep(20000000);
} catch (InterruptedException e) {
e.printStackTrace();
}
},"3").start();
// 阻塞主线程
countDownLatch.await();
System.out.println("执行完毕");
}
}
从代码的实现来看,有点类似join的功能,但是比join更加灵活,CountDownLatch构造函数会接受一个int类型的参数作为计数器的初始值,当调用CountDownLatch的countDown方法时,这个计数器就会减一。
通过await方法去阻塞主线程
示例图
使用场景
1、通过countdownlatch实现最大的并行请求,也就是可以让N个线程同时执行
2、比如应用程序启动之前,需要确保相应的服务已经启动,比如zookeeper通过原生api连接的地方就有用到countDownLatch
源码分析
CountDownLatch类存在一个内部类Sync,Sync是一个同步工具,继承了AbstractQueuedSynchronizer,很显然,CountDownLatch实际上是使得线程阻塞了,既然涉及到阻塞,就一定涉及到AQS队列。
await
await函数会使得当前线程在countdownlatch倒计时到0之前一直等待,除非线程中断,从源码可以得知await方法会转发到Sync的acquireSharedInterruptibly方法
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
acquireSharedInterruptibly
这块代码主要是判断当前线程是否获取到了共享锁,前面说过,AQS有两种锁类型,一种是共享锁,一种是独占锁,在这里用的是共享锁,为什么要用共享锁?因为CountDownLatch可以让多个线程同时通过。
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
// 判断线程是否是中断,如果是就抛异常
if (Thread.interrupted())
throw new InterruptedException();
// 尝试获得共享锁,如果等于0则返回1,否则返回-1,返回-1表示需要阻塞
if (tryAcquireShared(arg) < 0)
// 要获得一个共享锁
doAcquireSharedInterruptibly(arg);
}
// 在这里,state的意义是count,如果计数器为0,表示不需要阻塞(计数器归零的时候才运行)
// ,否则,只有在满足条件的情况下才会被唤醒
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
doAcquireSharedInterruptibly
获取共享锁
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// 创建一个共享模式的节点添加到AQS队列中
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
// 自旋等待共享锁释放,也就是等待计数器等于0。
for (;;) {
// 获得当前节点的上一个节点
final Node p = node.predecessor();
// 所以 p==head是true的
if (p == head) {
// 再次去尝试获得共享锁,这个r代表计数器
int r = tryAcquireShared(arg);
// r>=0表示计数器已经归零了,就不需要这个节点去阻塞了,则释放当前的共享锁
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// 当前计数器还没有等于0的时候会调用如下两个方法
// shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()
// 当前节点不是头节点,则尝试让当前线程阻塞,
// 第一个方法是判断是否需要阻塞,第二个方法是阻塞
// 如果主线程计数器还是小于0的情况下,就是当前的countDownLatch的计数器
// 还没有等于0的时候,会调用这两个方法
// shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()
// 它首先会判断是不是要去挂起这个线程,什么情况需要挂起?
// 当你这个节点已经是SIGNAL的时候它就会挂起
// parkAndCheckInterrupt()这个LockSupport.park()就会让当前线程挂起,
// 会让线程变成WAITING状态, --> unpark(唤醒)/ 线程中断也会唤醒
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
setHeadAndPropagate(propagate传播)
状态图
PROPAGATE:值为-3,表示releaseShared需要被传播给后续节点
private void setHeadAndPropagate(Node node, int propagate) {
// 记录头节点
Node h = head; // Record old head for check below
// 设置当前节点为头节点(当前main线程的节点)
setHead(node);
// 前面传过来的propagate是1,所以会进入下面的代码
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
// 获得当前节点的下一个节点
Node s = node.next;
// 如果下一个节点是空,表示当前节点为最后一个节点,或这个下一个节点是share节点
if (s == null || s.isShared())
// 唤醒下一个共享节点
doReleaseShared();
}
}
// 把head指向当前main线程的节点
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
doReleaseShared
释放共享锁,通知后面的节点
private void doReleaseShared() {
// 又是一个自旋的操作
for (;;) {
// 获得头节点
Node h = head;
// 如果头节点不为空且不等于tail节点
if (h != null && h != tail) {
// 这个waitStatus的值 = -1,这是一个异步的过程
int ws = h.waitStatus;
// 如果头节点的状态为SIGNAL,-1 等于 signal,表示会在这里面去释放锁
if (ws == Node.SIGNAL) {
// 修改当前头节点的状态为0,避免下次再进入到这个里面
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// unparkSuccessor(h) 实际上就是唤醒当前节点的下一个节点的线程
//(如果是countdown调用,就是唤醒处于阻塞状态的main线程)
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
countdown
以共享模式释放锁,并且会调用tryReleaseShared函数,根据判断条件也可能会调用doReleaseShared函数
public void countDown() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
// 如果为true,表示计数器已归零了
if (tryReleaseShared(arg)) {
// 唤醒处于阻塞的线程
doReleaseShared();
return true;
}
return false;
}
tryReleaseShared
这里主要是对state做原子递减,其实就是我们构造的CountDownLatch的计数器,如果等于0返回true,否则返回false
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
Semaphore
semaphore也就是我们常说的信号灯,semaphore可以控制同时访问的线程个数,通过acquire获取一个许可,如果没有就等待,通过release释放一个许可,有点类似限流的作用。叫信号灯的原因也和他的用处有关,比如某商场就5个停车位,每个停车位只能停一辆车,如果这个时候来了10辆车,必须要等前面有空的车位才能进入。
使用案例
public class SemaphoreDemo {
public static void main(String[] args) {
// 默认非公平
Semaphore semaphore = new Semaphore(5);
for (int i = 0 ; i < 10; i++) {
new DoAnything(i,semaphore).start();
}
}
static class DoAnything extends Thread {
private int num;
private Semaphore semaphore;
DoAnything(int num, Semaphore semaphore){
this.num = num;
this.semaphore = semaphore;
}
@Override
public void run() {
try {
// (还有一种是guava的RateLimiter)
// 获取一个令牌,如果拿到令牌就会运行,如果拿不到就会阻塞,
// 可以达到一个限流的效果
// 限流主要体现在,你的接口本来的吞吐量有限,为了保护你的接口稳定性,
// 就会限制访问的请求量(这里只适合单机)
semaphore.acquire();
System.out.println("第" + num + "个线程进入");
Thread.sleep(2000);
// 释放令牌
semaphore.release();
//System.out.println("第" + num + "个线程释放令牌");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
使用场景
可以实现对某些接口访问的限流
源码分析
semaphore也是基于AQS来实现的,内部使用state表示许可数量,它的实现方式和CountDownLatch的差异点在于acquireSharedInterruptibly中的tryAcquireShared方法的实现,这个方法是在Semaphore方法中重写的
acquireSharedInterruptibly
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
/**
* NonFair version
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
tryAcquireShared
在semaphore中存在公平和非公平的方式,和重入锁是一样的,如果通过FairSync表示公平的信号量,NonFairSync表示非公平的信号量,公平和非公平取决于是否按照FIFO队列的顺序去分配Semaphore所维护的许可,现在看看非公平锁的实现
nonfairTryAcquireShared
自旋去获得一个许可,如果许可获取失败,也就是remaining < 0的情况下,让当前线程阻塞
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
releaseShared
releaseShared方法的逻辑也很简单,就是通过线程安全的方式去增加一个许可,如果增加成功,则触发释放一个共享锁,也就是让之前处于阻塞的线程重新运行
public void release() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
增加令牌数
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
原子操作
在多线程情况下,同时更新一个共享变量,由于之前说过原子性的问题,可能会得不到预期的结果,如果要达到期望的结果,可以通过synchronized来加锁解决,因为synchronized会保证多线程对共享变量的访问进行排队。
在Java5以后,提供了原子操作类,这些原子操作类提供了一种简单、高效以及线程安全的更新操作,而由于变量的类型很多,所以Atomic一共提供了12个类分别对应四种类型的原子更新操作,基本类型、数组类型、引用类型、属性类型
基本类型对应:AtomicBoolean、AtomicInteger、AtomicLong
数组类型对应:AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray
引用类型对应:AtomicReference、AtomicReferenceFieldUpdater、AtomicMarkableReference
字段类型对应:AtomicIntegerFieldUpdater、AtomicLongFieldUpdatere、AtomicStampedReference
Atomic原子操作的使用
public class AtomicIntegerTest {
private static AtomicInteger count = new AtomicInteger(0);
public static synchronized void inc() {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
count.getAndIncrement();
}
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 1000; i++) {
new Thread(() -> {
AtomicIntegerTest.inc();
}).start();
}
Thread.sleep(3000);
System.out.println(count.get());
}
}
AtomicInteger实现原理
由于所有的原子操作类都是大同小异,所以我们只分析其中一个原子操作类
public final int getAndIncrement() {
return unsafe.getAndAddInt(this, valueOffset, 1);
}
这里又会发现一些熟悉的东西,就是unsafe,调用unsafe类中的getAndAddInt方法
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
// 方法获取对象中offset偏移地址对应的整型field的值
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
通过循环以及cas的方式实现原子更新,从而达到多线程情况下仍然能够保证原子性的目的。
线程池
Java中的线程池是运行场景最多的并发框架,几乎所有需要异步或并发执行任务的程序都可以使用线程池,线程池就和数据库连接池类似,只是线程池是用来重复管理线程避免创建大量线程增加开销,所以合理使用线程池可以
1、降低创建线程和销毁线程的性能开销
2、合理的设置线程池大小可以避免因为线程数超出硬件资源瓶颈带来的问题,类似起到了限流的作用,线程是稀缺资源,如果无限创建,会造成系统稳定性问题
线程池的使用
JDK为我们内置了几种常见线程池的实现,均可以使用Executors工厂类创建,为了更好地地控制多线程,JDK提供了一套线程框架Executor,帮助开发人员有效地进行线程控制,它们都在java.util.concurrent包中,是JDK并发包的核心。
其中有一个比较仲要的类:Executors,他扮演着线程工厂的角色,通过Executors可以创建特定功能的线程池
newFixedThreadPool:该方法返回一个固定数量的线程池,线程数不变,当有一个任务提交时,若线程池中空闲,则立即执行,若没有,则会被暂缓在一个任务队列中,等待有空闲的线程去执行。
newSingleThreadExecutor:创建一个线程的线程池,若空闲则执行,若没有空闲线程则暂缓在任务队列中。
newCachedThreadPool:返回一个可根据实际情况调整线程个数的线程池,不限制最大线程数量,若有空闲的线程则执行任务,若没有任务则不创建线程,并且每一个空闲线程会在60秒后自动回收。
newScheduledThreadPool:创建一个可以指定线程数量的线程池,但是这个线程池还带有延迟和周期性执行任务的功能,类似定时器。
设置了3个固定线程大小的线程池来跑100个循环
public class ThreadPoolTest implements Runnable {
@Override
public void run() {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName());
}
static ExecutorService service = Executors.newFixedThreadPool(3);
public static void main(String[] args) {
for (int i = 0; i < 100; i++) {
service.execute(new ThreadPoolTest());
}
service.shutdown();
}
}
submit和execute的区别
执行一个任务,可以使用submit和execute,这两者有什么区别?
1、execute只能接受Runnable类型的任务
2、submit不管是Runnable还是Callable类型的任务都可以接受,但是Runnable返回值均为void,所以使用Future的get()获得的还是null
ThreadPoolExecutor
前面说的四种线程池构建工具,都是基于ThreadPoolExecutor类,它的构造函数参数
public ThreadPoolExecutor( int corePoolSize, //核心线程数量
int maximumPoolSize, //最大线程数
long keepAliveTime, //超时时间,超出核心线程数量以外的线程空余存活时间
TimeUnit unit, //存活时间单位
BlockingQueue<Runnable> workQueue, //保存执行任务的队列
ThreadFactory threadFactory,//创建新线程使用的工厂
RejectedExecutionHandler handler //当任务无法执行的时候的处理方式
){
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
分别看一下前面提到的几个初始化工具类的构造以及原理
new FixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
FixedThreadPool的核心线程数和最大线程数都是指定值,也就是说当线程池中的线程数超过核心线程数后,任务都会被放到阻塞队列中,另外keepAliveTime为0,也就是超出核心线程数量以外的线程空余存活时间,这里选用的阻塞队列是LinkedBlockingQueue,使用的是默认容量Integer.MAX_VALUE,相当于没有上限,这个线程池执行任务的流程如下:
1、线程数少于核心线程数,也就是设置的线程数时,新建线程执行任务
2、线程数等于核心线程数后,将任务加入阻塞队列
3、由于队列容量非常大,可以一直添加
4、执行完任务的线程反复去队列中取任务执行
用途:FixedThreadPool用于负载比较大的服务器,为了资源的合理利用,需要限制当前线程数量
newCachedThreadPool
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
CachedThreadPool创建一个可以缓存的线程池,如果线程池长度超过处理需要,可以灵活回收空闲线程,若无可回收,则新建线程,并且没有核心线程,非核心线程数无上线,但是每个空闲的时间只有60秒,超过后就会被回收。
它的执行流程如下:
1、没有核心线程,直接向SynchronousQueue中提交任务
2、如果有空闲线程,就去取出任务执行,如果没有空闲线程,就新建一个
3、执行完任务的线程有60秒生存时间,如果在这个时间内可以接到新任务,就可以继续活下去,否则就被回收
newSingleThreadExecutor
创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO,LIFO,优先级)执行
线程池的源码分析
ThreadPoolExecutor是线程池的核心,提供了线程池的实现,ScheduledThreadExecutor继承了ThreadPoolExecutor,并另外提供一些调度方法以支持定时和周期任务,Executors是工具类,主要用来创建线程池对象,我们把一个任务提交给线程池处理的时候,线程池的处理过程是什么样的?
线程数量和线程池状态管理
线程池用一个AtomicInteger来保存 [线程数量] 和 [线程池状态],一个int数值一共有32位,高三位用于保存运行状态,低29位用于保存线程数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 32 -3
private static final int COUNT_BITS = Integer.SIZE - 3;
// 将1的二进制向右位移29位,再减1表示最大线程容量
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 运行状态保存在int值的高3位(所有数值左移29位)
// 接收新任务,并执行队列中的任务
private static final int RUNNING = -1 << COUNT_BITS;
// 不接收新任务,但是执行队列中的任务
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 不接受任务,不执行队列中的任务,中断正在执行中的任务
private static final int STOP = 1 << COUNT_BITS;
// 所有的任务都已结束,线程数量为0,处于该状态的线程池即将调用terminated()方法
private static final int TIDYING = 2 << COUNT_BITS;
// terminated()方法执行完成
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
// 获取运行状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 获取线程数量
private static int workerCountOf(int c) { return c & CAPACITY; }
execute
通过线程池的核心方法了解线程池中这些参数的含义
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 1、当线程池中线程比核心数少,新建一个线程执行任务
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 2、核心池已满,但任务队列未满,添加到队列中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 任务成功添加到队列以后,再次检查是否需要添加新的线程,因为已存在的线程可能被销毁了
if (! isRunning(recheck) && remove(command))
// 如果线程处于非运行状态,并且把当前的任务从任务队列中移除成功,则拒绝该任务
reject(command);
// 如果之前的线程已被销毁完,新建一个线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 3、核心池已满,队列已满,尝试创建一个新线程
else if (!addWorker(command, false))
// 如果创建新线程失败了,说明线程池被关闭或者线程池完全满了,拒绝任务
reject(command);
}
流程图
流程图
设置线程池的线程数的方式:
CPU密集型(线程数尽可能小,因为CPU的利用率很高,如果线程多了上下文切换就比较频繁,这样性能反而很低)
IO密集型(可以设置更多的线程,因为我们更多的线程可以处理更多的任务,因为IO阻塞的话我们的CPU时间片会被隔离出来,这样就会给其他的任务去使用,这样就可以构造更多的任务去处理,这里就可以按照CPU核心数的倍数去设置)
混合型
任务的执行时间,如果长?如果短?怎么选?
队列的选择和队列的大小,不是说越大越好,如果队列很大,那所有任务都加在队列里面,那这样内存占用就非常大,因为每一个任务都是放在阻塞队列的空间里面,这样可能会把内存撑爆。
使用线程池的一个例子
public class CustomThreadFactory implements ThreadFactory {
private AtomicInteger count = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
String threadName =
CustomThreadPoolExecutor.class.getSimpleName() + count.addAndGet(1);
System.out.println(threadName);
t.setName(threadName);
return t;
}
}
public class CustomThreadPoolExecutor {
/**
* 设置为私有构造函数
*/
private CustomThreadPoolExecutor() {
}
/**
* 使用volatile关键字保证可见性和防止指令重排
* 指令重排的意思:比如java中的简单一句instance = new Singleton,
* 会被编译器编译成如下JVM指令:
* memory = allocate(); // 1: 分配对象的内存空间
* ctorInstance(memory); // 2: 初始化对象
* instance = memory; 3: 设置instance指向刚分配的内存地址
* 但是这些指令顺序并非一成不变,有可能会经过JVM和CPU的优化,
* 指令重排成下面的顺序:
* memory = allocate(); // 1: 分配对象的内存空间
* instance = memory; 3: 设置instance指向刚分配的内存地址
* ctorInstance(memory); // 2: 初始化对象
* 当线程A执行完1,3的时候,instance对象还没完成初始化,但已经不再指向null。
* 此时如果线程B抢占到CPU资源,执行if(instance == null)的结果会返回false,
* 从而会返回一个没有初始化完成的instance对象。所以这里要用到volatile关键字
*/
private static volatile ExecutorService executorService = null;
/**
* 静态工厂方法
*/
private static ExecutorService getPool() {
// 双重检查机制
if (null == executorService) {
// 同步锁,注意这里不能使用对象锁
synchronized (CustomThreadPoolExecutor.class) {
// 双重检测机制
// 进入synchronized临界区以后,还要再做一次判空。
// 因为当两个线程同时访问的时候,线程A构建完对象,
// 线程B也已经通过了最初的判空验证,不做二次判空的话,
// 线程B还是会再次构建executorService对象。
if (executorService == null) {
executorService = new ThreadPoolExecutor(5, 50, 10L,
TimeUnit.SECONDS, new LinkedBlockingQueue<>(),
new CustomThreadFactory());
}
}
}
return executorService;
}
public static void submit(Runnable runnable) {
getPool().submit(runnable);
}
public static Future<Object> submit(Callable<Object> callable) {
return getPool().submit(callable);
}
}
网友评论