Lock接口是jdk1.5以后加入的解决线程安全的除了synchronized关键字以外的另一个选择。相对于synchronized更加灵活。今天根据ReentrantLock的源码来分析一下他的实现原理。
ReentrantLock.png
上图简单画了一下ReentrantLock类图的关系。
下面写一段有线程安全问题的代码,用lock进行解决
public class LockDemo {
public int sum=0;
private Lock lock = new ReentrantLock();
public void add(){
lock.lock();
try {
sum+=1;
}finally {
lock.unlock();
}
}
public static void main(String[] args) {
LockDemo lockDemo = new LockDemo();
for (int i = 0; i < 100000; i++) {
Thread t = new Thread(()->{
lockDemo.add();
});
t.start();
}
System.out.println(lockDemo.sum);
}
}
跟进lock()方法的实现
public void lock() {
sync.lock();
}
直接调用了sync成员变量的lock方法,看看sync的lock方法怎么实现的
有两种实现,公平和非公平,先看看默认的非公平
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
静态内部类NonfairSync继承自Sync,看看Sync是何方妖孽
abstract static class Sync extends AbstractQueuedSynchronizer
继承自AbstractQueuedSynchronizer 也就是大名鼎鼎的aqs。aqs是个同步器,是1.5以后各种同步工具的基本工具。
aqs中有个变量state,可以用来标示当前工具的一个状态。在lock中,他表示重入次数,如果没有线程占用锁,那么state就是0.如果有,就是大于等于1.在抽象类Sync中lock是抽象方法,需要子类去实现。先看看非公平锁的实现。
final void lock() {
//非公平锁线程进入方法以后直接尝试获取一次锁
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
线程进入方法以后直接尝试获取锁。通过compareAndSetState方法来设置state的值。看看compareAndSetState的实现,方法位于aqs类中
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
这里调用了本地方法unsafe的compareAndSwapInt,采用cas的方式设置state。cas就是对比并且交换,三个参数stateOffset是当前锁的state的值锁存放的内存偏移量,expect是期望当前state的值,update是要设置的值。在这个方法执行时,拿传入的expect去和state比较,如果一致,就设置为update。如果不一致,就返回false。在lock方法执行时,compareAndSetState(0, 1),期望state在内存中为0,如果为0,说明当前锁未被其他线程占用,就设置为1.如果不是0,说明当前锁有被其他线程占用,返回false。
这里如果成功获取锁,会将当前线程设置为锁的持有者。如果不成功,进入 acquire(1);方法。
acquire的英文翻译过来是获取。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
这里分为2个步骤,tryAcquire(arg) 和acquireQueued(addWaiter(Node.EXCLUSIVE), arg))。先看第一个步骤
tryAcquire方法在NonfairSync中实现。
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();//获取当前的state
if (c == 0) {//如果没有锁
if (compareAndSetState(0, acquires)) {//尝试获取锁
setExclusiveOwnerThread(current);
return true;
}
}
//如果是自己进入了,增加重入次数
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
//未能获取锁或者啥也没干,返回false
return false;
}
如果tryAcquire方法返回了false,就会继续执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
先看addWaiter方法,由aqs实现
private Node addWaiter(Node mode) {
//新建一个节点,把当前线程包裹进去
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
//如果尾巴有节点
if (pred != null) {
node.prev = pred;
//把当前线程节点设置为尾巴
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//如果走到这一步,说明尾巴没节点,或者设置尾巴节点失败了
enq(node);
return node;
}
进一步看看enq方法,enq,我理解为enter queue
private Node enq(final Node node) {
//一顿自旋,看来不完成一件事不罢休
for (;;) {
Node t = tail;
//如果尾巴节点没有,就初始化链表,进入第二次自旋
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else { //尾巴节点有东西了,有可能是本来就有,有可能是第一次自旋初始化的
node.prev = t;
//这一步如果失败,继续自旋,直到成功把当前节点设置为尾巴节点。
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
至此addWaiter方法就把竞争锁失败的线程包裹成一个node放入了队列中,看看下一步acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
//又自旋 在这自旋是因为要挂起线程,当线程重新被唤醒的时候,还是在这继续执行代码
for (;;) {
//刚刚加入队列节点的前一个节点
final Node p = node.predecessor();
//如果前一个节点是头节点,说明没排队的了,尝试获取一次锁
if (p == head && tryAcquire(arg)) {
//获取锁后,设置当前节点为头节点
setHead(node);
//把原本的头节点移除链表
p.next = null; // help GC
failed = false;
return interrupted;
}
//如果走到这,说明没获取到锁,尝试挂起当前线程
//should方法,判断一下当前节点应不应该挂起来,park
if (shouldParkAfterFailedAcquire(p, node) &&
//调用locksupport挂起当前线程,并且
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
非公平锁的lock方法就此结束了
看看公平锁的lock方法实现
final void lock() {
acquire(1);
}
区别就是直接acquire,进去看看tryAcquire的实现
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//如果前面没有节点了,直接获取,否则返回false
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
后面的都一样。
网友评论