LockSupport
LockSupport类是Java6(JSR166-JUC)引入的一个类,提供了基本的线程同步原语。
public class LockSupport {
public static void park() {
UNSAFE.park(false, 0L);
}
public static void unpark(Thread thread) {
if (thread != null)
UNSAFE.unpark(thread);
}
}
每个线程都会有一个独有的permit(许可)。
- unpark方法:使得所给的线程的permit置为ok。如果所给的线程还没开始,则这个方法并不保证有效。
- park方法:如果当前线程的permit是ok的,则继续执行,否则阻塞等待,直到以下任意一件事发生为止:
- 其他线程调用unpark方法,传入的thread是当前线程。
- 其他线程interrupt当前线程
- 因为The call spuriously 而导致无原因返回
- 注意:park函数并不会提供report说明是因为上述的哪种原因返回,需要Caller re-check而得知是因为哪种原因返回的。(即如果是2,则park方法也不会抛出InterruptedException异常)
- 我们看到park和unpark调用的都是UNSAFE的park和unpark,那UNSAFE下的这些函数怎么实现的呢?其实UNSAFE包下的函数都是native的,即这些方法都是C/C++实现的,并且被编译成了DLL,由Java调用。
Java中的native关键字
- 使用native关键字说明这个方法是原生函数,也就是这个方法是用C/C++语言实现的,并且被编译成了DLL,由java去调用。 这些函数的实现体在DLL中,JDK的源代码中并不包含,你应该是看不到的。对于不同的平台它们也是不同的。这也是java的底层机制,实际上java就是在不同的平台上调用不同的native方法实现对操作系统的访问的。
- native的意思就是通知操作系统, 这个函数你必须给我实现,因为我要使用。 所以native关键字的函数都是操作系统实现的, java只能调用。
- java是跨平台的语言,既然是跨了平台,所付出的代价就是牺牲一些对底层的控制,而java要实现对底层的控制,就需要一些其他语言的帮助,这个就是native的作用了
为何引入?
相比较于wait/notify/notifyAll有何优点?
- 我们必须先让wait方法运行,然后再调用notify/notifyAll方法来唤醒。
- 而LockSupport不是,park和unpark的方法执行无需先后。
即线程A可以先执行unpark,然后线程B执行park的时候就不会阻塞。
注意:LockSupport是不可重入的:unpark三次之后,park一次可以继续运行,再次park还是会被阻塞。可以理解为unpark是把某个标志位标为1,并不是加1。park是将这个标志位标为0,而非减1。
总结
- LockSupport是Java6引入的一种同步原语,和wait/notify/notifyAll类似
- LockSupport不限制park和unpark的调用顺序
- LockSupport是不可重入的
- LockSupport主要用于其他同步器的实现,比如AQS。
AQS
介绍
AbstractQueuedSynchronizer,即队列同步器。它是构建锁或者其他同步组件的基础框架(如ReentrantLock、ReentrantReadWriteLock、Semaphore等),它是JUC并发包中的核心基础组件。
AQS简单地说就是使用一个FIFO的等待队列和一个volatile int state来实现同步的。即通过CAS state判断是否被锁(CAS来保证原子性、volatile保证可见性),将阻塞的线程打包放入等待队列中。
1. AQS的使用者一般定义一个内部类来继承AQS,使用组合的方式使用。
2. AQS有两种模式:排他和共享。
排他模式:只有一个线程可以拥有锁。(排他锁)
共享模式:可以同时多个线程拥有锁。(读锁)
AQS中两种模式下的waiting thread共用一个queue,所以一般使用者都只是使用一种模式。ReentrantReadWriteLock是同时使用了两种模式。
定位
使用者继承AQS,实现AQS中的几个未实现的方法。然后就可以调用AQS的方法来实现自己的接口功能了。
- 举个例子:ReentrantLock
//简略代码,仅用来说明调用关系
class ReentrantLock{
private final Sync sync;
static class Sync extends AbstractQueuedSynchronizer{
protected final boolean tryRelease(int releases){/*code*/}
protected final boolean isHeldExclusively(){/*code*/}
protected final boolean tryAcquire(int acquires){/*code*/}
}
public void lock(){
sync.acquire();//acquire方法是AQS类中实现好的函数
}
public void unlock() {
sync.release(1);//release方法是AQS类中实现好的函数
}
}
我们可以看到ReentrantLock使用一个内部类Sync来继承AQS,然后实现排他锁的三个方法。
我们知道ReentrantLock有lock和unlock接口,可以看到这两个接口的实现就是调用AQS原有的方法。
- 问题:为何必须要实现三个方法?
因为AQS中这三个方法是空的,必须由使用者来定义。这么做的目的是:使用者可以自己定义state的代表含义(比如=0代表无锁,>0代表有锁),通过自定义的函数可以实现诸如是否重入、是否公平等功能。 - 问题:我们实现了这三个方法,为何使用时却调用的是AQS原有的方法?
因为AQS原有的方法(比如acquire、release)中调用了上述的三个方法,AQS已经帮我们实现好了逻辑。
- AQS未实现的方法
//非堵塞获取独占资源,true表示成功
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
//非堵塞释放独占资源,true表示成功
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
//在排它模式下,状态是否被占用
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
//非堵塞获取共享资源,负数表示失败,0表示成功但不需要向后传播,大于0表示成功且可以向后传播
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
//非堵塞释放共享资源,true表示成功
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
前三个是排他锁所要实现的,后两个是共享锁所要实现了。注意:这五个函数并不是abstract,原因是因为一般都是使用某一种模式(排他或共享模式),所以子类只需使用其中一组就可以了。
AQS的类结构
private transient volatile Node head;//队列头结点
private transient volatile Node tail;//队列尾结点
private volatile int state;//同步状态
static final long spinForTimeoutThreshold = 1000L;//自旋最大时间
- head和tail都是lazy init,即当有第一个结点入队列的时候采取初始化。且head指向的是一个空结点,head->next指向的才是第一个结点。
- Node是一个等待线程的信息封装。AQS将阻塞等待的线程封装为一个内部类Node,形成队列。
- AQS内部线程的挂起和唤醒使用的是LockSupport的pack/unpack,所以AQS内部会有对pack之后的唤醒操作的check。
- 对state和队列的操作都是通过CAS来实现的。stateCAS失败意思就是获取锁失败,下面的步骤就是等待加入队列了。而入队列和出队列的操作,如果CAS失败,AQS的设计是通过自旋来一直尝试,直到达到spinForTimeoutThreshold。官方上说这种队列叫做CLH队列。
AQS中加锁和解锁的方法
在使用AQS的类中用来加锁和解锁的方法。
- 排他模式
//获取锁,如果获取失败则加入队列
public final void acquire(int arg);
//获取锁,如果获取失败则加入队列;支持中断取消等待的线程
public final void acquireInterruptibly(int arg) throws InterruptedException;
//获取锁,如果获取失败则加入队列;带有超时时间的
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException;
//释放锁
public final boolean release(int arg);
这里我们可以看到“获取锁,如果失败则加入队列”这个行为是由AQS来实现的。而如何判断失败?这个是由子类来决定的。这个决定支持了可重入性、是否公平性等功能。
- 共享模式
public final void acquireShared(int arg);
public final void acquireSharedInterruptibly(int arg) throws InterruptedException;
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException;
public final boolean releaseShared(int arg);
共享模式下的对应的四个方法。
AQS公平性
我们知道公平锁:先来的一定先获取锁。
非公平锁:当多个线程在争取锁,谁先获取锁的顺序是不固定的。
AQS的公平性是由使用者来决定的。
我们知道AQS中的acquire函数是大致这样实现的。
* Acquire:
* while (!tryAcquire(arg)) {
* <em>enqueue thread if it is not already queued</em>;
* <em>possibly block current thread</em>;
* }
因为每次acquire的步骤是:先try再入队列。所以就可以出现这种情况:队列中有两个线程在等待,当锁被释放时,刚好又来了一个线程,则try的时候成功了,这样这个线程就获得锁了。
如果想要实现公平锁:tryAcquire的时候判断一下,如果有线程在等待,这个函数直接返回false。
显然非公平锁要比公平锁的效果要高。
AQS中的Condition
- wait/notify/notifyAll以及Condition.await/signal/signalAll区别
我们知道线程间的通信我们有两种方法:wait/notify/notifyAll以及Condition.await/signal/signalAll
两者的比较(Condition的优点):- Condition的await方法是可以有超时参数的且可以被中断,而wait方法显然不行。
- 多个Condition可以和一个Lock绑定。
注意:await/signal/signalAll必须在lock和unlock代码块内使用。
- Condition的实现者(唯一实现者)--AQS
而Condition的实现者(唯一实现者)就是AQS中的ConditionObject。而ConditionObject的实现也是一个等待队列来完成。await就是往队列中加入线程Node且挂起线程,signal就是从队列中唤醒一个线程。
注意:Condition只在支持排他模式下起作用。 - Object.wait/Condition.await/LockSupport.park三者的关系
Object.wait/Condition.await/LockSupport.park三者的关系是:
Condition.await底层是LockSupport.park来实现的。
Object.wait底层是通过关联Monitor来实现的。
AQS中的几个使用者
AQS的使用者们- ReentrantLock使用了排他模式
- ReentrantReadWriteLock使用了两种模式。
- Semaphore使用了共享模式:
初始化一定数量的permit,acquire会消耗一个,release会释放一个。一般用于控制进入一块代码的最大线程数。
举个例子:
class Pool {
private static final int MAX_AVAILABLE = 100;
private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
public Object getItem() throws InterruptedException {
available.acquire();
return getNextAvailableItem();
}
public void putItem(Object x) {
if (markAsUnused(x))
available.release();
}
}
- CountDownLatch使用了共享模式:
初始化一个数量N,线程调用await方法会被阻塞,线程调用countDown方法会将N-1,当N被减为0时,所有线程同时被唤醒。有点像C++中的栅栏一样。
常见场景:一个线程等待N个线程完成某件事件(join方法类似),或N个线程等待一个线程的某个动作(像是一个线程开枪,其他线程同时跑)。
注意的是CountDownLatch的N不可被reset,即N减到0之后,无法再被设为N。想要使用必须重新new一个对象。 - 与之对应的是CyclicBarrier
初始化一个N,当一个线程执行到await时,会被阻塞。当N个线程都达到await方法时,N个线程同时被唤醒执行。
这里的Cycle的意思是当N个线程同时达到await被唤醒后,这个CyclicBarrier对象可以继续用。
举个例子:多个write同时都写完之后,再同时do other thing。
public class Test {
public static void main(String[] args) {
int N = 4;
CyclicBarrier barrier = new CyclicBarrier(N);
for(int i=0;i<N;i++)
new Writer(barrier).start();
}
static class Writer extends Thread{
private CyclicBarrier cyclicBarrier;
public Writer(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
try {
//write some data
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
// do other thing
}
}
}
网友评论