在研究 synchronized
和 ReentrantLock
时产生一个疑问, synchronized
通过 monitor 机制保证线程间可见性,但是 ReentrantLock
是如何保证的呢。
ReentrantLock 如何保证线程间可见性
ReentrantLock 基于 AQS 实现,AQS 的核心之一就是 state 管理。
state 被 volatile 修饰保证此变量在线程间可见,但是在查阅相关框架和库时发现某些作者竟然可以通过 ReentrantLock 实现和 synchronized 一致的线程可见性。
原来是我理解错了,volatile 并不仅仅保证被修饰的变量在线程间可见性,而是保证在写入 volatile 变量后,其他线程读取此 volatile 变量后的所有操作都保证可见性。
此特性正是由于 Happens-before
原则,ReentrantLock
受此原则约束。
在 java.util.concurrent.locks.Lock 接口的Javadoc中有这样一段话:
All Lock implementations must enforce the same memory synchronization semantics as provided by the built-in monitor lock :
- A successful lock operation acts like a successful monitorEnter action
- A successful unlock operation acts like a successful monitorExit action
Unsuccessful locking and unlocking operations, and reentrant locking/unlocking operations, do not require any memory synchronization effects.
这段话的核心是j.u.c.locks.Lock接口的实现类具有和synchronized内置锁一样的内存同步语义。
文字表达些许干燥,上示例代码
Volatile Happens-before 示例
import java.util.concurrent.TimeUnit;
public class HappensBefore {
public volatile int a = 1;
public boolean run = false;
public static void main(String[] args) {
HappensBefore happensBefore = new HappensBefore();
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
happensBefore.run = true;
happensBefore.a = 2;
}).start();
new Thread(() -> {
while (true) {
while (happensBefore.run) {
System.out.println("check running");
}
}
}).start();
new Thread(() -> {
while (true) {
while (happensBefore.a == 2) {
if (happensBefore.run) {
System.out.println("is running");
return;
}
}
}
}).start();
}
}
本段代码会输出 is running
但程序不会停止运行,因为 run
变量无法保证在线程2的可见性。
但是线程3正常会正常输出,验证了 Happens-before
原则。
反向验证
如果对一个变量的读操作先行发生于后面对这个变量的读操作,是否成立 Happens-before 呢
public class HappensBefore {
public volatile int a = 1;
public boolean run = false;
public static void main(String[] args) {
HappensBefore happensBefore = new HappensBefore();
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
happensBefore.run = true;
System.out.println(happensBefore.a);
// happensBefore.a = 2;
}).start();
new Thread(() -> {
while (true) {
while (happensBefore.run) {
System.out.println("check running");
}
}
}).start();
new Thread(() -> {
while (true) {
while (happensBefore.a == 2) {
if (happensBefore.run) {
System.out.println("is running");
return;
}
}
}
}).start();
}
}
程序仅输出了 1 并没有输出 check running
说明遵守:
volatile变量规则:对一个变量的写操作先行发生于后面对这个变量的读操作;
什么是 Happens-before
- 程序次序规则:一个线程内,按照代码顺序,书写在前面的操作先行发生于书写在后面的操作;
- 锁定规则:一个unLock操作先行发生于后面对同一个锁额lock操作;
- volatile变量规则:对一个变量的写操作先行发生于后面对这个变量的读操作;
- 传递规则:如果操作A先行发生于操作B,而操作B又先行发生于操作C,则可以得出操作A先行发生于操作C;
- 线程启动规则:Thread对象的start()方法先行发生于此线程的每个一个动作;
- 线程中断规则:对线程interrupt()方法的调用先行发生于被中断线程的代码检测到中断事件的发生;
- 线程终结规则:线程中所有的操作都先行发生于线程的终止检测,我们可以通过Thread.join()方法结束、
- Thread.isAlive()的返回值手段检测到线程已经终止执行;
- 对象终结规则:一个对象的初始化完成先行发生于他的finalize()方法的开始;
利用 ReentranLock 保证可见性生产示例
package com.baidu.brpc.utils;
import lombok.Getter;
import lombok.Setter;
import java.util.ArrayList;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* Since {@linkplain java.util.concurrent.ExecutorService ExecutorService} is a
* bit slow, we have to invent some wheels here.
* <p>
* NOT implement the {@linkplain java.util.concurrent.Executor Executor}
* interface unless we see the necessity
* @author Zhangyi Chen (chenzhangyi01@baidu.com)
* @update Wenwei hu (huwenwei@baidu.com)
*/
public final class ThreadPool {
private static final int DEFAULT_QUEUE_SIZE = 1024;
// Stand alone lock
private BoundedQueue<Runnable> produced;
private BoundedQueue<Runnable> toConsume;
private Lock consumerLock;
private Lock producerLock;
private Condition isProducerNotFullCondition;
private Condition isProducerNotEmptyCondition;
private ArrayList<Thread> threads;
private volatile boolean stopped;
public ThreadPool(int initialThreadNum, ThreadFactory threadFactory) {
this(initialThreadNum, threadFactory, 0);
}
public ThreadPool(int initialThreadNum, ThreadFactory threadFactory, int queueSize) {
if (initialThreadNum <= 0) {
throw new IllegalArgumentException(
"initialThreadNum=" + initialThreadNum
+ " should be positive" );
}
threads = new ArrayList<Thread>(initialThreadNum);
stopped = false;
if (queueSize <= 0) {
queueSize = DEFAULT_QUEUE_SIZE;
}
produced = new BoundedQueue<Runnable>(queueSize);
toConsume = new BoundedQueue<Runnable>(queueSize);
consumerLock = new ReentrantLock();
producerLock = new ReentrantLock();
isProducerNotEmptyCondition = producerLock.newCondition();
isProducerNotFullCondition = producerLock.newCondition();
// Start working threads at last, don't put any code after, or there
// will be race condition
for (int i = 0; i < initialThreadNum; ++i) {
Thread tr = threadFactory.newThread(new Runnable() {
@Override
public void run() {
consume();
}
});
tr.start();
threads.add(tr);
}
}
private void consume() {
while (true) {
Runnable task = null;
while (true) {
consumerLock.lock();
try {
if (!toConsume.isEmpty()) {
task = toConsume.pop();
break;
}
} finally {
consumerLock.unlock();
}
producerLock.lock();
try {
while (!stopped && produced.isEmpty()) {
try {
isProducerNotEmptyCondition.await();
} catch (InterruptedException ex) {
// ignore
}
}
if (!produced.isEmpty()) {
if (produced.isFull()) {
isProducerNotFullCondition.signalAll();
}
consumerLock.lock();
try {
BoundedQueue<Runnable> tmp = produced;
produced = toConsume;
toConsume = tmp;
} finally {
consumerLock.unlock();
}
} else {
// stopped must be true
break;
}
} finally {
producerLock.unlock();
}
}
if (task != null) {
task.run();
} else {
// The thread pool was shut down
break;
}
}
}
public void stop() {
stopped = true;
producerLock.lock();
try {
isProducerNotEmptyCondition.signalAll();
isProducerNotFullCondition.signalAll();
} finally {
producerLock.unlock();
}
}
public void join() {
synchronized (threads) {
for (Thread tr : threads) {
try {
tr.join();
} catch (InterruptedException e) {
// pass
}
}
threads.clear();
}
}
public boolean submit(Runnable task) {
Runnable[] tasks = { task };
return submit(tasks, 0, 1) == 1;
}
public long submit(Runnable []tasks, int offset, int len) {
int cur = offset;
int end = offset + len;
while (!stopped && cur < end) {
producerLock.lock();
try {
while (produced.isFull()) {
try {
isProducerNotFullCondition.await();
} catch (InterruptedException ex) {
// ignore
}
}
int toProduce = Math.min(produced.remainingCapacity(),
end - cur);
if (toProduce > 0) {
boolean wasEmpty = produced.isEmpty();
produced.addAll(tasks, cur, toProduce);
if (wasEmpty) {
isProducerNotEmptyCondition.signalAll();
}
}
cur += toProduce;
} finally {
producerLock.unlock();
}
}
return cur - offset;
}
public StatInfo getStatInfo() {
StatInfo statInfo = new StatInfo();
statInfo.setThreadNum(threads.size());
statInfo.setDefaultQueueCapacity(DEFAULT_QUEUE_SIZE);
statInfo.setProducerQueueSize(produced.size());
statInfo.setConsumerQueueSize(toConsume.size());
return statInfo;
}
@Setter
@Getter
public static class StatInfo {
private int threadNum;
private int defaultQueueCapacity;
private int producerQueueSize;
private int consumerQueueSize;
}
public boolean isStopped() {
return stopped;
}
}
相关参考
http://ifeve.com/java%E9%94%81%E6%98%AF%E5%A6%82%E4%BD%95%E4%BF%9D%E8%AF%81%E6%95%B0%E6%8D%AE%E5%8F%AF%E8%A7%81%E6%80%A7%E7%9A%84/
https://www.cnblogs.com/chenssy/p/6393321.html
https://github.com/baidu/brpc-java/blob/master/brpc-java-communication/src/main/java/com/baidu/brpc/utils/ThreadPool.java
网友评论