java1.5以前
synchronized <---> 同步
wait / notify <---> 协同
1.5引入
lock <---> 同步(确保共享变量不会被错误访问)
condition <---> 协同(多个线程能以正确的顺序和正确的方式进行交互)
引入原因
condition文档中
{@code Condition} factors out the {@code Object} monitor methods ({@link Object#wait() wait}, {@link Object#notify notify} and {@link Object#notifyAll notifyAll}) into distinct objects to give the effect of having multiple wait-sets per object,by combining them with the use of arbitrary {@link Lock} implementations.
把object对象中自带的wait/notify/notifyAll拆成单独的condition对象;
- Lock/Condition与synchronize/wait/notify机制的差别;
- synchronize不够灵活,方法必须在同一个类中,无法拆解;
- 一个对象只能有一个等待集合;
- 无法实现读写分离;(因为synchronize是一个非常重的悲观锁);
- 无法提供测试方法,是否上锁,而Lock中有tryLock();
- 优先级/公平性;(synchronize的调度完全取决于操作系统)
公平锁/非公平锁
具体实现
java.util.concurrent.locks.ReentrantLock.NonfairSync
java.util.concurrent.locks.ReentrantLock.FairSync
- 重入和不可重入
重入锁: 如果已经持有锁,可以无穷次的进入和退出;
Lock lock = new ReentrantLock();
void fooWithLock() {
try {
lock.lock();
lock.lock();
} finally {
lock.unlock();
lock.unlock();
}
}
synchronize是可重入的;并且由虚拟机保证,无论以什么方式退出,都会解锁;
/**
* 生产者消费者
*/
public class ProducerConsumer {
private static final Integer MAX_QUEUE_SIZE = 100;
private static Lock lock = new ReentrantLock();
private static Condition queueEmpty = lock.newCondition();
private static Condition queueFull = lock.newCondition();
public static void main(String[] args) throws InterruptedException {
Queue<Integer> queue = new LinkedList();
Producer producer = new Producer(queue);
producer.start();
Consumer consumer = new Consumer(queue);
consumer.start();
Producer producer2 = new Producer(queue);
producer2.start();
Consumer consumer2 = new Consumer(queue);
consumer2.start();
producer.join();
consumer.join();
producer2.join();
consumer2.join();
}
public static class Producer extends Thread {
Queue<Integer> queue;
@Override
public void run() {
lock.lock(); // 等价于synchronized(Condition)
try {
while (queue.size() >= MAX_QUEUE_SIZE) {
queueEmpty.await(); //等价于Object的wait,但由于wait()为final,所以重写await()
}
queue.add(new Random().nextInt());
queueFull.signalAll(); // 等价于Object的notify,也为final
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public Producer(Queue<Integer> queue) {
this.queue = queue;
}
}
public static class Consumer extends Thread {
Queue<Integer> queue;
@Override
public void run() {
lock.lock(); // 等价于synchronized(Condition)
try {
while (queue.size() <= 0) {
queueFull.await(); //等价于Object的wait,但由于wait()为final,所以重写await()
}
Integer removeNum = queue.remove();
System.out.println("取出" + removeNum);
queueEmpty.signalAll(); // 等价于Object的notify,也为final
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public Consumer(Queue<Integer> queue) {
this.queue = queue;
}
}
}
/**
* 线程协同机制
*/
public class ThreadCoordination {
public static void main(String[] args) {
ConcurrentHashMap<Integer, Integer> results = new ConcurrentHashMap<>();
Lock lock = new ReentrantLock();
Condition allThreadFinished = lock.newCondition();
AtomicInteger howManyThreadRunning = new AtomicInteger(10);
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
int value = new Random().nextInt();
results.put(value, value);
lock.lock();
try {
howManyThreadRunning.decrementAndGet();
allThreadFinished.signal();
} finally {
lock.unlock();
}
}).start();
}
lock.lock();
try {
while (howManyThreadRunning.get() > 0) {
allThreadFinished.await();
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
System.out.println(results);
}
}
网友评论