部分源码分析
内部类部分
Sync类
/**
* Base synchronizer for the lock, built on AbstractQueuedSynchronizer.
* The AQS state value serves as the hold count: 0 means unlocked, and every
* acquisition by the owning thread adds to it (see nonfairTryAcquire).
*/
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
/**
* Performs {@link Lock#lock}. The main reason for subclassing
* is to allow fast path for nonfair version.
*/
abstract void lock();
/**
* Performs non-fair tryLock. tryAcquire is
* implemented in subclasses, but both need nonfair
* try for trylock method.
*
* @param acquires amount to add to the state on success
* @return true if the lock was taken or re-entered by the current thread,
*         false if another thread owns it or the CAS was lost
* @throws Error if the hold count would overflow an int
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// Lock looks free: race for it with a single CAS; the wait queue is not consulted here.
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
// Re-entrant acquisition by the owner: only the owner reaches this branch,
// so a plain setState is used instead of a CAS.
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
/**
* Releases {@code releases} holds on the lock.
*
* @param releases amount to subtract from the state
* @return true only when the state drops to 0, i.e. the lock is fully released
* @throws IllegalMonitorStateException if the calling thread is not the owner
*/
protected final boolean tryRelease(int releases) {
// Compute the state value that will remain after this release.
int c = getState() - releases;
// Only the thread that currently owns the lock exclusively may release it.
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// Once every hold has been released, clear the owner and report the lock as free.
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
// Reports whether the current thread holds the lock exclusively.
protected final boolean isHeldExclusively() {
// While we must in general read state before owner,
// we don't need to do so to check if current thread is owner
return getExclusiveOwnerThread() == Thread.currentThread();
}
// Creates a new ConditionObject for this synchronizer.
final ConditionObject newCondition() {
return new ConditionObject();
}
// Methods relayed from outer class
// Returns the owning thread, or null when the state is 0 (unlocked).
final Thread getOwner() {
return getState() == 0 ? null : getExclusiveOwnerThread();
}
// Returns how many times the current thread has acquired the lock (0 if it is not the owner).
final int getHoldCount() {
return isHeldExclusively() ? getState() : 0;
}
// Reports whether the lock is held by any thread (state != 0).
final boolean isLocked() {
return getState() != 0;
}
/**
* Reconstitutes this lock instance from a stream.
* @param s the stream
*/
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); // reset to unlocked state
}
}
NonfairSync
/**
* Sync variant for the non-fair lock.
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
* In other words: first try to jump the queue with a single CAS from 0 to 1;
* only if that fails fall back to acquire(1).
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
// Delegates to the shared non-fair attempt defined in Sync.
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
FairSync
/**
* Sync variant for the fair lock.
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
// Acquires via acquire(1) only; unlike NonfairSync.lock() there is no up-front barging CAS.
final void lock() {
acquire(1);
}
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*
* @param acquires amount to add to the state on success
* @return true if the lock was taken or re-entered by the current thread
* @throws Error if the hold count would overflow an int
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// Lock is free, but only compete for it when hasQueuedPredecessors() is false.
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
// Re-entrant acquisition by the owner; each one must later be matched
// by a release. This is what makes the lock reentrant.
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
ReentrantLock类本类
构造函数
/**
* No-argument constructor.
* The sync defaults to the non-fair implementation,
* which is the same as calling ReentrantLock(false).
*/
public ReentrantLock() {
sync = new NonfairSync();
}
/**
* Creates a lock whose fairness policy is chosen by the argument.
*
* @param fair true to use FairSync, false to use NonfairSync
*/
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
lock()
获取锁的方式看sync为fair还是nonfair
NonfairSync中调用lock()时,线程会先插队去竞争锁(直接CAS),竞争失败后才会通过acquire(1)进入阻塞队列。而FairSync中,线程调用lock()时不会插队:只有在锁空闲且队列中没有更早等待的线程(hasQueuedPredecessors()为false)时才能直接获取锁,否则进入阻塞队列等待。
/**
* Acquires the lock by delegating to sync.lock(); how the lock is obtained
* depends on whether sync is the fair or the non-fair implementation.
* <p> If the lock is free or already held by the current thread, the hold count is increased.
* <p> If the lock is held by another thread, the current thread waits until it
* obtains the lock (that waiting is implemented by AbstractQueuedSynchronizer's
* acquire, which is not part of this excerpt).
*/
public void lock() {
sync.lock();
}
-
FairSync
final void lock() { acquire(1); //acquire()函数的内容见AbstractQueuedSynchronizer源码。 }
-
NonfairSync
final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1);//acquire()函数的内容见AbstractQueuedSynchronizer源码。同上 }
lockInterruptibly()
/**
* Acquires in exclusive mode, aborting if interrupted.
* First checks the interrupt status, then invokes {@link #tryAcquire} at least once
* and returns if it succeeds. Otherwise hands off to doAcquireInterruptibly, which
* (per the AbstractQueuedSynchronizer source, not shown in this excerpt) enqueues the
* thread and keeps retrying until it succeeds or the thread is interrupted.
*
* @param arg the acquire argument, passed through to tryAcquire
* @throws InterruptedException if the current thread is interrupted
*/
public final void acquireInterruptibly(int arg)
throws InterruptedException {
// Thread.interrupted() also clears the flag, so the exception is the only signal left.
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
场景应用
如何配合Condition
// TODO
中断响应
import java.util.concurrent.locks.ReentrantLock;
/**
 * Demonstrates breaking a deadlock with {@code lockInterruptibly()}.
 *
 * <p>Two workers take {@link #lock1} and {@link #lock2} in opposite order, so each
 * ends up waiting for the lock the other one holds. Interrupting one worker makes
 * it give up, release what it holds, and lets the other worker finish.
 */
public class InterruptThread implements Runnable {
    public static ReentrantLock lock1 = new ReentrantLock();
    public static ReentrantLock lock2 = new ReentrantLock();
    /** Selects the acquisition order: 1 = lock1 then lock2, 2 = lock2 then lock1. */
    int lock;

    public InterruptThread(int lock) {
        this.lock = lock;
    }

    /**
     * Acquires both locks in the order selected by {@link #lock}, then releases them.
     * If the thread is interrupted while sleeping or while waiting for a lock, the
     * stack trace is printed and every lock this thread holds is still released.
     */
    @Override
    public void run() {
        try {
            try {
                if (lock == 1) {
                    lock1.lockInterruptibly();
                    // Give the other worker time to grab its first lock. An interrupt
                    // here is no longer swallowed; it aborts the run like any other.
                    Thread.sleep(500);
                    lock2.lockInterruptibly();
                } else if (lock == 2) {
                    lock2.lockInterruptibly();
                    Thread.sleep(500);
                    lock1.lockInterruptibly();
                }
            } finally {
                // Release in finally: an interrupted worker must not exit while still
                // holding its first lock, or the other worker would block forever.
                if (lock1.isHeldByCurrentThread()) {
                    lock1.unlock();
                }
                if (lock2.isHeldByCurrentThread()) {
                    lock2.unlock();
                }
            }
            System.out.println(Thread.currentThread().getId() + "退出");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * Starts the two workers, waits until they are deadlocked, then interrupts the first.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while waiting
     */
    public static void main(String[] args) throws InterruptedException {
        InterruptThread r1 = new InterruptThread(1);
        InterruptThread r2 = new InterruptThread(2);
        Thread t1 = new Thread(r1);
        Thread t2 = new Thread(r2);
        t1.start();
        t2.start();
        // Wait past the workers' 500 ms sleep so both are blocked on their second
        // lock before the interrupt is delivered.
        Thread.sleep(1000);
        t1.interrupt();
    }
}
实现阻塞队列
- BoundedQueue.java
package com.yrls.config;
import java.util.Arrays;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
 * A fixed-capacity FIFO queue of {@link Integer} values backed by a circular array.
 *
 * <p>All access is guarded by a single {@link ReentrantLock}. Producers wait on
 * {@code notFull} while the array is full and consumers wait on {@code notEmpty}
 * while it is empty. The extra console output is deliberate: it makes the locking
 * sequence visible when the demo runs.
 */
public class BoundedQueue {
    /** Backing array; its length is the capacity fixed at construction time. */
    private final Integer[] items;
    private final Lock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Condition notFull = lock.newCondition();
    /** Number of elements currently stored. */
    private int count;
    /** Next slot to write and next slot to read, both wrapping at the array length. */
    private int addIndex, removeIndex;

    /**
     * Creates an empty queue.
     *
     * @param size the capacity; must be positive
     * @throws IllegalArgumentException if {@code size} is zero or negative
     *         (a zero-capacity queue would make every {@code add} wait forever)
     */
    public BoundedQueue(int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive: " + size);
        }
        items = new Integer[size];
    }

    /**
     * Appends an element, waiting while the queue is full.
     *
     * @param object the value to store
     * @throws InterruptedException if interrupted while waiting for free space
     */
    public void add(Integer object) throws InterruptedException {
        lock.lock();
        try {
            System.out.println(Thread.currentThread().getName() + "lock ");
            String name = Thread.currentThread().getName();
            if (name.equals("1") || name.equals("10")) {
                printDemoDigits();
            }
            // Loop rather than 'if': the condition must be re-checked after every wake-up.
            while (count == items.length) {
                notFull.await();
            }
            items[addIndex] = object;
            if (++addIndex == items.length) {
                addIndex = 0;
            }
            count++;
            System.out.println(Thread.currentThread() + " 插入一个元素,数组为:" + Arrays.toString(items));
            notEmpty.signal();
        } finally {
            System.out.println(Thread.currentThread().getName() + " unlock");
            lock.unlock();
        }
    }

    /**
     * Removes and returns the oldest element, waiting while the queue is empty.
     *
     * @return the removed value
     * @throws InterruptedException if interrupted while waiting for an element
     */
    public Integer remove() throws InterruptedException {
        lock.lock();
        try {
            System.out.println(Thread.currentThread().getName() + " remove-lock");
            if (Thread.currentThread().getName().equals("1")) {
                printDemoDigits();
            }
            while (count == 0) {
                System.out.println("----start wait----");
                notEmpty.await();
            }
            Integer temp = items[removeIndex];
            // Clear the slot so the printed array shows which positions are free.
            items[removeIndex] = null;
            System.out.println(Thread.currentThread() + " 移除一个元素,数组为:" + Arrays.toString(items));
            if (++removeIndex == items.length) {
                removeIndex = 0;
            }
            count--;
            notFull.signal();
            return temp;
        } finally {
            System.out.println(Thread.currentThread().getName() + " remove-unlock");
            lock.unlock();
        }
    }

    /** Prints 0..9 while the lock is held, to show that no other thread interleaves output. */
    private void printDemoDigits() {
        for (int i = 0; i < 10; ++i) {
            System.out.println(i);
        }
    }
}
- New1.java(测试类)
/**
 * Driver for the BoundedQueue demo: starts 6 producer threads and 12 consumer
 * threads against a queue of capacity 5.
 *
 * <p>There are more consumers than produced elements, so the surplus consumers
 * stay blocked in {@code BoundedQueue.remove()} and the program does not exit on
 * its own. That is the situation the printed output is meant to show.
 */
public class New1 {
    private static final Random random = new Random(System.currentTimeMillis());

    /**
     * Starts the producer and consumer threads. Threads are named "1".."6" and
     * "1".."12" respectively; BoundedQueue keys its extra debug output on those names.
     *
     * @param args unused
     * @throws InterruptedException declared for compatibility; not thrown by this body
     */
    public static void main(String[] args) throws InterruptedException {
        BoundedQueue queue = new BoundedQueue(5);
        for (int i = 1; i <= 6; i++) {
            Thread thread = new Thread(new Producter(queue), String.valueOf(i));
            thread.start();
        }
        for (int i = 1; i <= 12; i++) {
            Thread thread = new Thread(new Consumer(queue), String.valueOf(i));
            thread.start();
        }
    }

    /** Task that inserts one random value in [0, 100) into the queue. */
    static class Producter implements Runnable {
        private final BoundedQueue queue;

        public Producter(BoundedQueue queue) {
            this.queue = queue;
        }

        /**
         * Adds one random value to the queue.
         *
         * @throws InterruptedException if interrupted while the queue is full
         */
        public void produce() throws InterruptedException {
            // Integer.valueOf replaces the deprecated Integer(int) constructor.
            queue.add(Integer.valueOf(random.nextInt(100)));
        }

        @Override
        public void run() {
            try {
                produce();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /** Task that removes one value from the queue. */
    static class Consumer implements Runnable {
        private final BoundedQueue queue;

        public Consumer(BoundedQueue queue) {
            this.queue = queue;
        }

        /**
         * Removes one value from the queue.
         *
         * @return the removed value
         * @throws InterruptedException if interrupted while the queue is empty
         */
        public Integer remove() throws InterruptedException {
            return queue.remove();
        }

        @Override
        public void run() {
            try {
                remove();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
- 结果
为了便于了解对ReentrantLock加锁过程中其它线程的状态,打印内容增加了一些无聊的输出,便于理解。
- 可以看到第一个线程占有reentrantlock时,其它线程无法获取该reentrantlock。(在thread1中打印数字的这段时间,输出未被其它线程打断)
- 可重入锁是针对同线程而言,不同线程不适用。
- 可以看到输出的最后部分有线程6\7\9\10\11\12(不一定是这几个)加锁但没有解锁:线程6获取锁后,由于队列已空(count==0),触发notEmpty.await()。await()会释放锁并使线程6挂起等待signal(),因此后续线程的lock()仍然能够成功(打印remove-lock),随后同样因为队列为空而进入await()等待。由于已经没有生产者再调用signal(),这些线程不会被唤醒,也就不会打印remove-unlock。
1lock
0
1
2
3
4
5
6
7
8
9
Thread[1,5,main] 插入一个元素,数组为:[49, null, null, null, null]
1 unlock
2lock
Thread[2,5,main] 插入一个元素,数组为:[49, 22, null, null, null]
2 unlock
3lock
Thread[3,5,main] 插入一个元素,数组为:[49, 22, 60, null, null]
3 unlock
4lock
Thread[4,5,main] 插入一个元素,数组为:[49, 22, 60, 70, null]
4 unlock
3 remove-lock
Thread[3,5,main] 移除一个元素,数组为:[null, 22, 60, 70, null]
3 remove-unlock
5lock
Thread[5,5,main] 插入一个元素,数组为:[null, 22, 60, 70, 66]
5 unlock
8 remove-lock
Thread[8,5,main] 移除一个元素,数组为:[null, null, 60, 70, 66]
8 remove-unlock
6lock
Thread[6,5,main] 插入一个元素,数组为:[60, null, 60, 70, 66]
6 unlock
1 remove-lock
Thread[1,5,main] 移除一个元素,数组为:[60, null, null, 70, 66]
1 remove-unlock
2 remove-lock
Thread[2,5,main] 移除一个元素,数组为:[60, null, null, null, 66]
2 remove-unlock
4 remove-lock
Thread[4,5,main] 移除一个元素,数组为:[60, null, null, null, null]
4 remove-unlock
5 remove-lock
Thread[5,5,main] 移除一个元素,数组为:[null, null, null, null, null]
5 remove-unlock
6 remove-lock
----start wait----
7 remove-lock
9 remove-lock
10 remove-lock
11 remove-lock
12 remove-lock
网友评论