Exchanger用于线程之间交换数据。
// 建一个多线程共用的exchange对象 /
/ 把exchange对象传给3个线程对象。每个线程在自己的run方法中调用exchange,把自 己的数据作为参数
// 传递进去,返回值是另外一个线程调用exchange传进去的参数
Exchanger<String> exchanger = new Exchanger<>();
new Thread("线程1") {
@Override public void run() {
while (true) {
try {
// 如果没有其他线程调用exchange,线程阻塞,直到有其他线程调用exchange为止。
String otherData = exchanger.exchange("交换数据1");
System.out.println(Thread.currentThread().getName() + "=" +otherData);
Thread.sleep(random.nextInt(2000));
} catch (InterruptedException e) {
e.printStackTrace();
} } } }.start();
new Thread("线程2") {
@Override public void run() {
while (true) {
try {
String otherData = exchanger.exchange("交换数据2");
System.out.println(Thread.currentThread().getName() + "得到<==" + otherData);
Thread.sleep(random.nextInt(2000));
} catch (InterruptedException e) {
e.printStackTrace();
}}}
new Thread("线程3") {
......
}
Exchanger的核心机制和Lock一样,也是CAS+park/unpark。
在Exchanger内部,有两个内部类:Participant和Node,代码如下:
public class Exchanger<V> {
// 添加了Contended注解,表示伪共享与缓存行填充
@jdk.internal.vm.annotation.Contended static final class Node {
int index; // Arena index
int bound; // Last recorded value of Exchanger.bound
int collides; // 本次绑定中,CAS操作失败次数
int hash; // 自旋伪随机
Object item; // 本线程要交换的数据
volatile Object match; // 对方线程交换来的数据
// 当前线程
volatile Thread parked; // 当前线程阻塞的时候设置该属性,不阻塞为null。
}
static final class Participant extends ThreadLocal<Node> {
public Node initialValue() {
return new Node();
} }
// ...
}
每个线程在调用exchange(...)方法交换数据的时候,会先创建一个Node对象。
这个Node对象就是对该线程的包装,里面包含了3个重要字段:第一个是该线程要交互的数据,第二个是对方线程交换来的数据,最后一个是该线程自身。一个Node只能支持2个线程之间交换数据,要实现多个线程并行地交换数据,需要多个Node,因此在Exchanger里面定义了Node数组
private volatile Node[] arena;
网友评论