在java 1.5中新增加了ReentrantLock类,所谓ReentrantLock的再入特性,是指当一个线程试图获取一个它已经获取的锁时,这个获取动作就自动成功,这是对锁获取粒度的一个概念,也就是锁的持有是以线程为单位而不是基于调用次数.Java 锁实现强调再入性是为了和 pthread 的行为进行区分。
ReentrantLock的语义和synchronized基本相同,与synchronized相比,ReentrantLock使用上更加灵活。
- 可通过Condition类实现多路通知功能
- 提供多个便利的方法,如判断是否有线程在排队等待锁
- 可设置所为公平锁或非公平锁
- 可以响应中断请求
- 带超时的获取锁尝试
案例1.使用condition实现线程按顺序执行
public class LockTask {
private Lock lock = new ReentrantLock();
private Condition aCondition = lock.newCondition();
private Condition bCondition = lock.newCondition();
private Condition cCondition = lock.newCondition();
private static int controlNum = 0;
public void printA(){
lock.lock();
try{
while(controlNum != 0){
aCondition.await();
}
System.out.print("A");
Thread.sleep(1000);
controlNum = (++controlNum) % 3;
bCondition.signalAll();
}catch (Exception e){
System.out.print(e.getMessage());
}finally{
lock.unlock();
}
}
public void printB(){
lock.lock();
try{
while(controlNum != 1){
bCondition.await();
}
System.out.print("B");
Thread.sleep(1000);
controlNum = (++controlNum) % 3;
cCondition.signalAll();
}catch (Exception e){
System.out.println(e.getMessage());
}finally{
lock.unlock();
}
}
public void printC(){
lock.lock();
try{
while(controlNum != 2){
cCondition.await();
}
System.out.print("C");
Thread.sleep(1000);
controlNum = (++controlNum) % 3;
aCondition.signalAll();
}catch (Exception e){
System.out.println(e.getMessage());
}finally{
lock.unlock();
}
}
}
public class ThreadA extends Thread {
private LockTask lockTask;
public ThreadA(LockTask lockTask){
this.lockTask = lockTask;
}
@Override
public void run() {
for(int i = 0; i < 10; i++) {
lockTask.printA();
}
}
}
public class ThreadB extends Thread {
private LockTask lockTask;
public ThreadB(LockTask lockTask){
this.lockTask = lockTask;
}
@Override
public void run() {
for(int i = 0; i < 10; i++) {
lockTask.printB();
}
}
}
public class ThreadC extends Thread {
private LockTask lockTask;
public ThreadC(LockTask lockTask){
this.lockTask = lockTask;
}
@Override
public void run() {
for(int i = 0; i < 10; i++) {
lockTask.printC();
}
}
}
public class LockMain {
public static void main(String[] args) {
LockTask lockTask = new LockTask();
ThreadA a1 = new ThreadA(lockTask);
ThreadA a2 = new ThreadA(lockTask);
ThreadB b1 = new ThreadB(lockTask);
ThreadB b2 = new ThreadB(lockTask);
ThreadC c1 = new ThreadC(lockTask);
ThreadC c2 = new ThreadC(lockTask);
a1.start();
a2.start();
b1.start();
b2.start();
c1.start();
c2.start();
}
}
线程按顺序调用
案例2 BlockingQueue中的ReentrantLock+Condition。
ArrayBlockingQueue,LinkedBlockingQueue等阻塞队列通过ReentrantLock+Condition实现向队列插入数据和从队列中取数据的语义,下面详细讲解:
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
- lockInterruptibly()的作用是如果线程未被中断则获取锁定,如果线程已被中断,则抛出异常。
1.当队列为空时,试图take的线程的正确行为应该是等待入队发生,而不是直接返回,通过notEmpty.await就可以实现。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
在enquene操作中可以通过notEmpty.signal()唤醒等待take的线程
/**
* Inserts element at current put position, advances, and signals.
* Call only when holding lock.
*/
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}
2.当队列为满时,试图put的线程的正确行为应该是等待出队发生,而不是直接返回,通过notFull.await就可以实现。
/**
* Inserts the specified element at the tail of this queue, waiting
* for space to become available if the queue is full.
*
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
在dequene操作中可以通过notFull.signal()唤醒等待put的线程
/**
* Extracts element at current take position, advances, and signals.
* Call only when holding lock.
*/
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
}
网友评论