线程间交换数据的Exchanger

作者: 小草莓子桑 | 来源:发表于2018-10-25 23:53 被阅读80次

    今天给大家介绍一个并发包中的线程工具Exchanger,他的主要作用是用来进行线程之间的数据交换的,一起来看看吧。

    Exchanger

    背书中:Exchanger是一个用来进行线程之间的数据交换的工具类,它提供了一个同步点,在这个同步点,两个线程可以互相交换数据,当一个线程执行exchange()方法时,会等待第二个线程执行exchange()方法,这个时刻就应该是书中所说的同步点,这两个线程就可以互相交换数据。

    举个栗子

    就拿今天晚上吃饭做例子吧,a去买鸡翅包饭,b去买了汉堡,然后互相替换了一个,上代码吧

    package thread.exchanger;
    
    import java.util.concurrent.Exchanger;
    
    /**
     * @author ZhaoWeinan
     * @date 2018/10/16
     * @description
     */
    public class ExchangerDemo {
    
        private static Exchanger<String> exchanger = new Exchanger<>();
    
        public static void main(String[] args){
            new Thread(new Runnable() {
                @Override
                public void run() {
                    String a = "我买了鸡翅包饭!";
                    System.out.println(Thread.currentThread().getName() + "说:" + a);
                    try {
                        System.out.println(Thread.currentThread().getName() + "说:等着b买汉堡!");
                        exchanger.exchange(a);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
    
            new Thread(new Runnable() {
                @Override
                public void run() {
                    String b = "我买了汉堡!";
                    System.out.println(Thread.currentThread().getName() + "说:" + b);
    
                    try {
                        System.out.println(Thread.currentThread().getName() + "说:现在我们来交换吧!");
                        String a = exchanger.exchange(b);
                        //把第一个线程中 a 变量拿了过来
                        System.out.println(Thread.currentThread().getName() + "说我拿到了鸡翅包饭!");
                        System.out.println(Thread.currentThread().getName() + "说:" + a);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
    }
    

    这段代码很好的描述了这个场景,创建了两个线程,在线程一中定义了一个变量a,a的内容是“我买了鸡翅包饭!”,在线程二中定义了一个变量b,b的内容是“我买了汉堡!”,然后线程一调用了exchanger.exchange(a)方法,进行了阻塞,也就是书中所说的等待同步点,在线程二中,调用exchanger.exchange(b)方法,也就是说,这两个线程都到达了同步点,所以线程一获取到了线程二中的变量“我买了汉堡!”,线程二获取到了线程一中的变量“我买了鸡翅包饭!”,代码运行结果如下:


    运行结果

    栗子就先说到这,来看一下源码吧

    看下源码

    Exchanger类的结构

    简单说一下,他主要有两个个内部类Node,Participant

    Node类
    @sun.misc.Contended static final class Node {
        // Arena的索引
        int index;
        //最后记录的Exchanger的bound属性值
        //Exchanger类中有个bound属性,用volatile修饰的
        int bound;
        //当前bound中原子操作的失败次数
        //可以推断 Exchanger是用volatile、原子操作来保证线程安全的
        int collides;
        //用于自旋的伪随机数
        int hash;
        //这个线程的数据对象吧,是用来交换的对象
        Object item;
        //用来释放线程的对象
        volatile Object match;
        //当挂起,设置为次线程,否则为空
        volatile Thread parked;
    }
    

    这个类看来主要是用来进行数据交换的类,Exchanger类中,有一个Node类型的slot属性,一个Node[]类型的arena属性,这两个就是所谓的单槽、多槽的两种模式,是在这两模式中用来交换的槽位,来看看Participant类

    Participant类
     static final class Participant extends ThreadLocal<Node> {
            public Node initialValue() { return new Node(); }
        }
    

    Participant类继承了ThreadLocal,主要作用就是初始化一个node对象
    再来看看核心的交换方法

    slotExchange单槽交换

    为了方便解说,整段代码就不贴了,一段一段的说吧

     Node p = participant.get();
     Thread t = Thread.currentThread();
    

    创建一个Node对象p,获取了当前线程t

    for (Node q;;) {
                //先判断slot属性,
                //不为空,证明有线程在等待着数据交换
                if ((q = slot) != null) {
              .........
                  }
            }
    

    先判断slot属性,不为空,证明有线程在等待着数据交换

    if (U.compareAndSwapObject(this, SLOT, q, null)) {
                        Object v = q.item;
                        q.match = item;
                        Thread w = q.parked;
                        if (w != null)
                            U.unpark(w);
                        return v;
                    }
    

    说说这段代码,compareAndSwapObject方法是sun.misc.Unsafe类的一个方法

    /* 在obj的offset位置比较object field和期望的值,如果相同则更新。这个方法
         * 的操作应该是原子的,因此提供了一种不可中断的方式更新object field。
         * @param var1 包含要修改field的对象
         * @param var2 object中field的偏移量
         * @param var4 希望field中存在的值
         * @param var5 如果期望值var4与var1的field当前值相同,设置var1的field值为这个新值
         * @return true 如果field的值被更改,则返回true             
         */
        public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);
    

    有机会给大家细讲,在这主要作用是,用于比较this对象,this对象就是Exchanger的实例,this对象的SLOT偏移量上的属性与是否与q相等,如果相等则把this对象的这个属性更新为null,返回true,反之不更新,返回false,看一眼SLOT怎么定义的

    private static final long SLOT;
    ....
    SLOT = U.objectFieldOffset
                    (ek.getDeclaredField("slot"));
    

    这个SLOT对应的是“slot”字段,也就是上面那个方法比较的是Exchanger实例中的slot属性

    if (U.compareAndSwapObject(this, SLOT, q, null)) {
                        Object v = q.item;
                        q.match = item;
                        Thread w = q.parked;
                        if (w != null)
                            U.unpark(w);
                        return v;
                    }
    

    如果compareAndSwapObject执行成功,就把该线程挂起,然后唤起等待的线程,返回交换的结果,如果没有执行成功,那么我们来看看下面else块中的代码:

        for (Node q;;) {
                //先判断slot属性,
                //不为空,证明有线程在等待着数据交换
                if ((q = slot) != null) {
                    if (U.compareAndSwapObject(this, SLOT, q, null)) {
                        Object v = q.item;
                        q.match = item;
                        Thread w = q.parked;
                        if (w != null)
                            U.unpark(w);
                        return v;
                    }
                    // 存在线程间cpu的竞争,构建一个多槽位arena为解决问题
                    if (NCPU > 1 && bound == 0 &&
                            U.compareAndSwapInt(this, BOUND, 0, SEQ))
                        arena = new Node[(FULL + 2) << ASHIFT];
                }
                else if (arena != null)
                    //如果构建多槽位,返回,使用多槽位模式解决问题
                    return null;
                else {
                    //继续使用compareAndSwapObject就行比较交换
                   //这边入参变了,大家注意一下
                  //总结下这段代码的意思就是用当前线程来占领槽位
                    p.item = item;
                    if (U.compareAndSwapObject(this, SLOT, null, p))
                        break;
                    p.item = null;
                }
            }
    

    如果存在线程间cpu的竞争,构建一个多槽位arena为解决问题,否则,判断当前操作是否为空,如果为空就跳出循环,否则,无线循环来走上面的流程

           //当前线程占领了槽位,等待其它线程来交换数据
            int h = p.hash;
            long end = timed ? System.nanoTime() + ns : 0L;
            int spins = (NCPU > 1) ? SPINS : 1;
            Object v;
            //循环,直到match为空
            while ((v = p.match) == null) {
                if (spins > 0) {
                    //自旋
                    h ^= h << 1; h ^= h >>> 3; h ^= h << 10;
                    if (h == 0)
                        h = SPINS | (int)t.getId();
                    else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)
                          //线程自旋等待中,为了避免别的线程都在等待,先让出cpu执行权
                          Thread.yield();
                }
               //其它线程来交换数据了,修改槽位
                else if (slot != p)
                    spins = SPINS;
               //线程没有发生中断
               //还是单槽模式
               //而且还没有超时
                else if (!t.isInterrupted() && arena == null &&
                        (!timed || (ns = end - System.nanoTime()) > 0L)) {
                    //设置BLOCKER
                    U.putObject(t, BLOCKER, this);
                    p.parked = t;
                    if (slot == p)
                        //阻塞
                        U.park(false, ns);
                    //清空 BLOCKER
                    p.parked = null;
                    U.putObject(t, BLOCKER, null);
                }
                else if (U.compareAndSwapObject(this, SLOT, p, null)) {
                    //使用compareAndSwapObject进行比较交换,如果成功跳出循环
                   //如果是超时了,或者线程被中断,则返回null
                    v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;
                    break;
                }
            }
            U.putOrderedObject(p, MATCH, null);
            p.item = null;
            p.hash = h;
            //交换完毕,返回v
            //成功的话就是要交换的数据,如果超时、线程中断等情况返回null
            return v;
    

    这段代码的主要意思是,当前线程占领了槽位,等待其它线程来交换数据,自旋进行等待,如果有别的线程来交换数据了,那么使用compareAndSwapObject进行比较交换,交换到数据,唤起被挂起的线程,返回交换结果,如果是超时或者线程被中断时,返回null。
    这就是单槽位模式,多槽位,没有怎么研究过,相信原理差不多,文章有点冗长,就不说多槽位模式了。

    应用场景

    说说应用场景吧,书上说可以用用于同步任务队列,遗传算法(表示没有听过),数据校对(这个还感觉比较靠谱),说说我第一次见到他吧,是我继承别人的代码的时候,一个定时任务用到的,有很多张mysql表,需要计算汇总结果到一张表里面,使用了Exchanger,但是感觉实现方式很多,这一种并不是很惊艳,不知道大家有没有好的使用场景,可以说一下。

    Exchanger就为大家简单的说到这,欢迎大家来交流,指出文中一些说错的地方,让我加深认识,愿大家没有bug,谢谢!

    相关文章

      网友评论

      本文标题:线程间交换数据的Exchanger

      本文链接:https://www.haomeiwen.com/subject/ygzqzftx.html