前言
首先说说LockSupport吧,它的作用是提供一组直接block或unblock线程的方法,其底层实现利用了Unsafe(前面文章有讲过Unsafe)。LockSupport是一个非常底层的API,我们利用其可以做很多事情,本文将利用LockSupport实现互斥锁和共享锁。
Lock
在JDK中已经提供了很多种锁的实现,原生的synchronized(优先推荐使用),juc中的ReentrantLock等,本文不纠结synchronized和ReentrantLock的实现,本文只从Lock的语义出发实现两种锁。
Lock的语义
juc中对于Lock接口的定义如下:
void lock();
void lockInterruptibly() throws InterruptedException;
boolean tryLock();
boolean tryLock(long var1, TimeUnit var3) throws InterruptedException;
void unlock();
Condition newCondition();
- void lock():获取锁的语义,如果没有获取到锁一直阻塞当前线程(不响应中断interrupt)
- void lockInterruptibly() throws InterruptedException; 获取锁,但是当前线程在阻塞期间可以响应中断(后面稍微会扯一下InterruptedException)
- boolean tryLock(); 尝试获取锁,不阻塞;获取到锁返回true,没有获取到返回false
- boolean tryLock(long var1, TimeUnit var3) throws InterruptedException; 尝试获取锁,并尝试阻塞等待一定时间,阻塞期间可以响应中断
- void unlock(); 释放锁;
- Condition newCondition();在锁上新建Condition
以上的关于锁的语义稍微复杂了点,特别是相应中断部分和newCondition部分,所以这次实现上简化了Lock的语义如下:
void lock();
void unLock();
boolean tryLock();
boolean tryLock(long maxWaitInMills);
基本功能和上面保持一致,但是都不响应中断
分析锁的实现
- Lock有可重入的语义,一个线程拥有锁之后再次调用lock应该完全没有任何问题,所以锁的实现中需要维护一个已经获取锁的线程队列;
- Lock未成功需要阻塞当前线程,所以需要底层阻塞原语(LockSupport)等的支持,并且在有线程释放锁之后需要唤起阻塞线程进行锁的竞争,所以需要维护等待锁的线程队列
- Lock需要维护当前锁的状态(是否可以被获取等)
互斥锁
public class MutexLock implements Lock {
private volatile Thread threadOwnsTheLock;
private final AtomicInteger state = new AtomicInteger(0);
private final ConcurrentLinkedQueue<Thread> waitThreadsQueue = new ConcurrentLinkedQueue<Thread>();
//一直等待
public void lock() {
tryLock(-1L);
}
//invoke all的语义,也可以做invokeNext
public void unLock() {
tryRelease(-1);
threadOwnsTheLock = null;
if (!waitThreadsQueue.isEmpty()) {
for (Thread thread : waitThreadsQueue) {
LockSupport.unpark(thread);
}
}
}
public boolean tryLock() {
if (threadOwnsTheLock != null && (threadOwnsTheLock == Thread.currentThread())) {
return true;
}
if (tryAcquire(1)) {
threadOwnsTheLock = Thread.currentThread();
return true;
}
return false;
}
//没有实现interrupt的语义,不能打断
public boolean tryLock(long maxWaitInMills) {
Thread currentThread = Thread.currentThread();
try {
waitThreadsQueue.add(currentThread);
if (maxWaitInMills > 0) {
boolean acquired = false;
long left = maxWaitInMills * 1000L * 1000L;
long cost = 0;
while (true) {
//需要判断一次interrupt
if (tryAcquire(1)) {
threadOwnsTheLock = currentThread;
acquired = true;
break;
}
left = left - cost;
long mark = System.nanoTime();
if (left <= 0) {
break;
}
LockSupport.parkNanos(left);
cost = mark - System.nanoTime();
}
return acquired;
}else {
while (true) {
if (tryAcquire(1)) {
threadOwnsTheLock = currentThread;
break;
}
LockSupport.park();
}
return true;
}
} finally {
waitThreadsQueue.remove(currentThread);
}
}
protected boolean tryAcquire(int acquire) {
return state.compareAndSet(0, 1);
}
protected void tryRelease(int release) {
if (threadOwnsTheLock == null || (threadOwnsTheLock != Thread.currentThread())) {
System.out.println("Wrong state, this thread don't own this lock.");
}
while (true) {
if (state.compareAndSet(1, 0)) {
return;
}
}
}
}
以上互斥锁使用了一个AtomicInteger,利用了CAS来维持锁的状态
共享锁
public class ShareLock implements Lock {
private volatile Set<Thread> threadsOwnsLock = Sets.newConcurrentHashSet();
private final AtomicInteger state;
private final ConcurrentLinkedQueue<Thread> waitThreadsQueue = new ConcurrentLinkedQueue<Thread>();
public ShareLock(int shareNum) {
this.state = new AtomicInteger(shareNum);
}
//一直等待
public void lock() {
tryLock(-1L);
}
public void unLock() {
tryRelease(-1);
threadsOwnsLock.remove(Thread.currentThread());
if (!waitThreadsQueue.isEmpty()) {
for (Thread thread : waitThreadsQueue) {
LockSupport.unpark(thread);
}
}
}
public boolean tryLock() {
if ( !(threadsOwnsLock.contains(Thread.currentThread()))) {
return true;
}
if (tryAcquire(1)) {
threadsOwnsLock.add(Thread.currentThread());
return true;
}
return false;
}
public boolean tryLock(long maxWaitInMills) {
Thread currentThread = Thread.currentThread();
try {
waitThreadsQueue.add(currentThread);
if (maxWaitInMills > 0) {
boolean acquired = false;
long left = TimeUnit.MILLISECONDS.toNanos(maxWaitInMills);
long cost = 0;
while (true) {
if (tryAcquire(1)) {
threadsOwnsLock.add(Thread.currentThread());
acquired = true;
break;
}
left = left - cost;
long mark = System.nanoTime();
if (left <= 0) {
break;
}
LockSupport.parkNanos(left);
cost = mark - System.nanoTime(); //有可能是被唤醒重新去获取锁,没获取到还得继续等待剩下的时间(并不精确)
}
return acquired;
}else {
while (true) {
if (tryAcquire(1)) {
threadsOwnsLock.add(Thread.currentThread());
break;
}
LockSupport.park();
}
return true;
}
} finally {
waitThreadsQueue.remove(currentThread);
}
}
protected boolean tryAcquire(int acquire) {
if (state.getAndDecrement() > 0) {
return true;
} else {
state.getAndIncrement();//恢复回来
return false;
}
}
protected void tryRelease(int release) {
if (!(threadsOwnsLock.contains(Thread.currentThread()))) {
System.out.println("Wrong state, this thread don't own this lock.");
}
state.getAndIncrement();
}
}
总结
以上利用了LockSupport来实现了互斥锁和共享锁,但是实现中并没有完成中断响应。后面应该会有文章单独说明关于InterruptedException的注意点。下篇文章将讲述如何利用LockSupport实现Future语义
网友评论