今天给大家介绍一个并发包中的线程工具Exchanger,他的主要作用是用来进行线程之间的数据交换的,一起来看看吧。
Exchanger
背书中:Exchanger是一个用来进行线程之间的数据交换的工具类,它提供了一个同步点,在这个同步点,两个线程可以互相交换数据,当一个线程执行exchange()方法时,会等待第二个线程执行exchange()方法,这个时刻就应该是书中所说的同步点,这两个线程就可以互相交换数据。
举个栗子
就拿今天晚上吃饭做例子吧,a去买鸡翅包饭,b去买了汉堡,然后互相替换了一个,上代码吧
package thread.exchanger;
import java.util.concurrent.Exchanger;
/**
* @author ZhaoWeinan
* @date 2018/10/16
* @description
*/
public class ExchangerDemo {
private static Exchanger<String> exchanger = new Exchanger<>();
public static void main(String[] args){
new Thread(new Runnable() {
@Override
public void run() {
String a = "我买了鸡翅包饭!";
System.out.println(Thread.currentThread().getName() + "说:" + a);
try {
System.out.println(Thread.currentThread().getName() + "说:等着b买汉堡!");
exchanger.exchange(a);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
String b = "我买了汉堡!";
System.out.println(Thread.currentThread().getName() + "说:" + b);
try {
System.out.println(Thread.currentThread().getName() + "说:现在我们来交换吧!");
String a = exchanger.exchange(b);
//把第一个线程中 a 变量拿了过来
System.out.println(Thread.currentThread().getName() + "说我拿到了鸡翅包饭!");
System.out.println(Thread.currentThread().getName() + "说:" + a);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
这段代码很好的描述了这个场景,创建了两个线程,在线程一中定义了一个变量a,a的内容是“我买了鸡翅包饭!”,在线程二中定义了一个变量b,b的内容是“我买了汉堡!”,然后线程一调用了exchanger.exchange(a)方法,进行了阻塞,也就是书中所说的等待同步点,在线程二中,调用exchanger.exchange(b)方法,也就是说,这两个线程都到达了同步点,所以线程一获取到了线程二中的变量“我买了汉堡!”,线程二获取到了线程一中的变量“我买了鸡翅包饭!”,代码运行结果如下:
运行结果
栗子就先说到这,来看一下源码吧
看下源码
Exchanger类的结构简单说一下,他主要有两个个内部类Node,Participant
Node类
@sun.misc.Contended static final class Node {
// Arena的索引
int index;
//最后记录的Exchanger的bound属性值
//Exchanger类中有个bound属性,用volatile修饰的
int bound;
//当前bound中原子操作的失败次数
//可以推断 Exchanger是用volatile、原子操作来保证线程安全的
int collides;
//用于自旋的伪随机数
int hash;
//这个线程的数据对象吧,是用来交换的对象
Object item;
//用来释放线程的对象
volatile Object match;
//当挂起,设置为次线程,否则为空
volatile Thread parked;
}
这个类看来主要是用来进行数据交换的类,Exchanger类中,有一个Node类型的slot属性,一个Node[]类型的arena属性,这两个就是所谓的单槽、多槽的两种模式,是在这两模式中用来交换的槽位,来看看Participant类
Participant类
static final class Participant extends ThreadLocal<Node> {
public Node initialValue() { return new Node(); }
}
Participant类继承了ThreadLocal,主要作用就是初始化一个node对象
再来看看核心的交换方法
slotExchange单槽交换
为了方便解说,整段代码就不贴了,一段一段的说吧
Node p = participant.get();
Thread t = Thread.currentThread();
创建一个Node对象p,获取了当前线程t
for (Node q;;) {
//先判断slot属性,
//不为空,证明有线程在等待着数据交换
if ((q = slot) != null) {
.........
}
}
先判断slot属性,不为空,证明有线程在等待着数据交换
if (U.compareAndSwapObject(this, SLOT, q, null)) {
Object v = q.item;
q.match = item;
Thread w = q.parked;
if (w != null)
U.unpark(w);
return v;
}
说说这段代码,compareAndSwapObject方法是sun.misc.Unsafe类的一个方法
/* 在obj的offset位置比较object field和期望的值,如果相同则更新。这个方法
* 的操作应该是原子的,因此提供了一种不可中断的方式更新object field。
* @param var1 包含要修改field的对象
* @param var2 object中field的偏移量
* @param var4 希望field中存在的值
* @param var5 如果期望值var4与var1的field当前值相同,设置var1的field值为这个新值
* @return true 如果field的值被更改,则返回true
*/
public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);
有机会给大家细讲,在这主要作用是,用于比较this对象,this对象就是Exchanger的实例,this对象的SLOT偏移量上的属性与是否与q相等,如果相等则把this对象的这个属性更新为null,返回true,反之不更新,返回false,看一眼SLOT怎么定义的
private static final long SLOT;
....
SLOT = U.objectFieldOffset
(ek.getDeclaredField("slot"));
这个SLOT对应的是“slot”字段,也就是上面那个方法比较的是Exchanger实例中的slot属性
if (U.compareAndSwapObject(this, SLOT, q, null)) {
Object v = q.item;
q.match = item;
Thread w = q.parked;
if (w != null)
U.unpark(w);
return v;
}
如果compareAndSwapObject执行成功,就把该线程挂起,然后唤起等待的线程,返回交换的结果,如果没有执行成功,那么我们来看看下面else块中的代码:
for (Node q;;) {
//先判断slot属性,
//不为空,证明有线程在等待着数据交换
if ((q = slot) != null) {
if (U.compareAndSwapObject(this, SLOT, q, null)) {
Object v = q.item;
q.match = item;
Thread w = q.parked;
if (w != null)
U.unpark(w);
return v;
}
// 存在线程间cpu的竞争,构建一个多槽位arena为解决问题
if (NCPU > 1 && bound == 0 &&
U.compareAndSwapInt(this, BOUND, 0, SEQ))
arena = new Node[(FULL + 2) << ASHIFT];
}
else if (arena != null)
//如果构建多槽位,返回,使用多槽位模式解决问题
return null;
else {
//继续使用compareAndSwapObject就行比较交换
//这边入参变了,大家注意一下
//总结下这段代码的意思就是用当前线程来占领槽位
p.item = item;
if (U.compareAndSwapObject(this, SLOT, null, p))
break;
p.item = null;
}
}
如果存在线程间cpu的竞争,构建一个多槽位arena为解决问题,否则,判断当前操作是否为空,如果为空就跳出循环,否则,无线循环来走上面的流程
//当前线程占领了槽位,等待其它线程来交换数据
int h = p.hash;
long end = timed ? System.nanoTime() + ns : 0L;
int spins = (NCPU > 1) ? SPINS : 1;
Object v;
//循环,直到match为空
while ((v = p.match) == null) {
if (spins > 0) {
//自旋
h ^= h << 1; h ^= h >>> 3; h ^= h << 10;
if (h == 0)
h = SPINS | (int)t.getId();
else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)
//线程自旋等待中,为了避免别的线程都在等待,先让出cpu执行权
Thread.yield();
}
//其它线程来交换数据了,修改槽位
else if (slot != p)
spins = SPINS;
//线程没有发生中断
//还是单槽模式
//而且还没有超时
else if (!t.isInterrupted() && arena == null &&
(!timed || (ns = end - System.nanoTime()) > 0L)) {
//设置BLOCKER
U.putObject(t, BLOCKER, this);
p.parked = t;
if (slot == p)
//阻塞
U.park(false, ns);
//清空 BLOCKER
p.parked = null;
U.putObject(t, BLOCKER, null);
}
else if (U.compareAndSwapObject(this, SLOT, p, null)) {
//使用compareAndSwapObject进行比较交换,如果成功跳出循环
//如果是超时了,或者线程被中断,则返回null
v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;
break;
}
}
U.putOrderedObject(p, MATCH, null);
p.item = null;
p.hash = h;
//交换完毕,返回v
//成功的话就是要交换的数据,如果超时、线程中断等情况返回null
return v;
这段代码的主要意思是,当前线程占领了槽位,等待其它线程来交换数据,自旋进行等待,如果有别的线程来交换数据了,那么使用compareAndSwapObject进行比较交换,交换到数据,唤起被挂起的线程,返回交换结果,如果是超时或者线程被中断时,返回null。
这就是单槽位模式,多槽位,没有怎么研究过,相信原理差不多,文章有点冗长,就不说多槽位模式了。
应用场景
说说应用场景吧,书上说可以用用于同步任务队列,遗传算法(表示没有听过),数据校对(这个还感觉比较靠谱),说说我第一次见到他吧,是我继承别人的代码的时候,一个定时任务用到的,有很多张mysql表,需要计算汇总结果到一张表里面,使用了Exchanger,但是感觉实现方式很多,这一种并不是很惊艳,不知道大家有没有好的使用场景,可以说一下。
Exchanger就为大家简单的说到这,欢迎大家来交流,指出文中一些说错的地方,让我加深认识,愿大家没有bug,谢谢!
网友评论