Exchanger

作者: Pillar_Zhong | 来源:发表于2019-06-12 00:14 被阅读0次
    Exchanger.png

    示例

    public class ExchangerTest {
    
        static class Producer implements Runnable{
    
            //生产者、消费者交换的数据结构
            private List<String> buffer;
    
            //步生产者和消费者的交换对象
            private Exchanger<List<String>> exchanger;
    
            Producer(List<String> buffer,Exchanger<List<String>> exchanger){
                this.buffer = buffer;
                this.exchanger = exchanger;
            }
    
            @Override
            public void run() {
                for(int i = 1 ; i < 5 ; i++){
                    System.out.println("生产者第" + i + "次提供");
                    for(int j = 1 ; j <= 3 ; j++){
                        System.out.println("生产者装入" + i  + "--" + j);
                        buffer.add("buffer:" + i + "--" + j);
                    }
    
                    System.out.println("生产者装满,等待与消费者交换...");
                    try {
                        exchanger.exchange(buffer);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    
        static class Consumer implements Runnable {
            private List<String> buffer;
    
            private final Exchanger<List<String>> exchanger;
    
            public Consumer(List<String> buffer, Exchanger<List<String>> exchanger) {
                this.buffer = buffer;
                this.exchanger = exchanger;
            }
    
            @Override
            public void run() {
                for (int i = 1; i < 5; i++) {
                    //调用exchange()与消费者进行数据交换
                    try {
                        buffer = exchanger.exchange(buffer);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
    
                    System.out.println("消费者第" + i + "次提取");
                    for (int j = 1; j <= 3 ; j++) {
                        System.out.println("消费者 : " + buffer.get(0));
                        buffer.remove(0);
                    }
                }
            }
        }
    
        public static void main(String[] args){
            List<String> buffer1 = new ArrayList<String>();
            List<String> buffer2 = new ArrayList<String>();
    
            Exchanger<List<String>> exchanger = new Exchanger<List<String>>();
    
            Thread producerThread = new Thread(new Producer(buffer1,exchanger));
            Thread consumerThread = new Thread(new Consumer(buffer2,exchanger));
    
            producerThread.start();
            consumerThread.start();
        }
    }
    

    Participant

    static final class Participant extends ThreadLocal<Node> {
        public Node initialValue() { return new Node(); }
    }
    

    Node

    // 避免伪共享
    @sun.misc.Contended static final class Node {
        int index;              // Arena index
        int bound;              // Last recorded value of Exchanger.bound
        int collides;           // Number of CAS failures at current bound
        int hash;               // Pseudo-random for spins
        Object item;            // This thread's current item
        volatile Object match;  // Item provided by releasing thread
        volatile Thread parked; // Set to this thread when parked, else null
    }
    

    slotExchange

    private final Object slotExchange(Object item, boolean timed, long ns) {
        Node p = participant.get();
        Thread t = Thread.currentThread();
        // 中断处理
        if (t.isInterrupted()) // preserve interrupt status so caller can recheck
            return null;
        // 自旋
        for (Node q;;) {
            // 如果slot有值,说明对方的数据到了
            // 因为双方都会在slot交换数据,那么为什么slot不为空,说明是对方的数据到了呢?
            // 因为自己放完数据后会退出自旋,所以只有可能是对方
            if ((q = slot) != null) {
                // 先清理掉slot,如果可能的话
                if (U.compareAndSwapObject(this, SLOT, q, null)) {
                    // 拿出对方发来的数据返回
                    Object v = q.item;
                    // 且将己方需要跟对方交换的数据放入match中,然后唤醒对方,让对方能使用
                    q.match = item;
                    Thread w = q.parked;
                    if (w != null)
                        U.unpark(w);
                    return v;
                }
                // create arena on contention, but continue until slot null
                // 上面slot清理失败的话,说明有竞争,那么直接初始化arena
                if (NCPU > 1 && bound == 0 &&
                    // 这里初始化BOUND为SEQ,0xff+1
                    U.compareAndSwapInt(this, BOUND, 0, SEQ))
                    arena = new Node[(FULL + 2) << ASHIFT];
            }
            // 如果这时候arena不为空,说明是多槽的情况,返回null,以便外部调用arenaExchange
            else if (arena != null)
                return null; // caller must reroute to arenaExchange
            else {
                // 如果slot没有东西,那么将自己需要交换的数据先放到槽中
                p.item = item;
                // 如果成功,结束自旋,去下面等待对方来交换
                if (U.compareAndSwapObject(this, SLOT, null, p))
                    break;
                // 否则,清理现场,继续自旋
                p.item = null;
            }
        }
    
        // await release
        int h = p.hash;
        long end = timed ? System.nanoTime() + ns : 0L;
        int spins = (NCPU > 1) ? SPINS : 1;
        Object v;
        // 等待对方交换的数据
        while ((v = p.match) == null) {
            // 自旋并出让CPU
            if (spins > 0) {
                h ^= h << 1; h ^= h >>> 3; h ^= h << 10;
                if (h == 0)
                    h = SPINS | (int)t.getId();
                else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)
                    Thread.yield();
            }
            // 如果slot不等于p,说明slot被对方填充了,也就是对方交换的数据到了
            // 但是还没有设置到match中,己方还没真正拿到对方交换的数据
            // 到这里还说明,自旋的机会都用完了,既然已经临门一脚了,再重置下自旋次数,好让己方安心等待数据
            else if (slot != p)
                spins = SPINS;
            // 到这里,说明对方的数据没有来,且自旋也耗尽了
            // 如果没有指定超时,或指定了超时时间,但是没有到期的话
            // 那么可以考虑阻塞己方线程,等对方到了,再等他在方法开头的自旋中被对方唤醒
            else if (!t.isInterrupted() && arena == null &&
                     (!timed || (ns = end - System.nanoTime()) > 0L)) {
                U.putObject(t, BLOCKER, this);
                p.parked = t;
                if (slot == p)
                    U.park(false, ns);
                // 唤醒后处理
                p.parked = null;
                U.putObject(t, BLOCKER, null);
            }
            // 到这里,说明已经超时了,将slot清空,且结束交换
            else if (U.compareAndSwapObject(this, SLOT, p, null)) {
                v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;
                break;
            }
        }
        // 清理现场
        U.putOrderedObject(p, MATCH, null);
        p.item = null;
        p.hash = h;
        // 将对方设置到match的value返回出去,也就是需要交换的数据
        return v;
    }
    

    arenaExchange

    private final Object arenaExchange(Object item, boolean timed, long ns) {
        Node[] a = arena;
        Node p = participant.get();
        for (int i = p.index;;) {                      // access slot at i
            int b, m, c; long j;                       // j is raw array offset
            // 首先拿到本次双方交换在数组中的slot
            Node q = (Node)U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE);
            // 如果槽位不为空,说明对方的数据已经到了,拿出对方发来的数据返回
            if (q != null && U.compareAndSwapObject(a, j, q, null)) {
                Object v = q.item;                     // release
                q.match = item;
                Thread w = q.parked;
                // 唤醒对方,将数据交给match,让对方使用
                if (w != null)
                    U.unpark(w);
                return v;
            }
            // 如果槽位为空,且i并没有越界
            else if (i <= (m = (b = bound) & MMASK) && q == null) {
                // 先设置item,准备插入槽位
                p.item = item;                         // offer
                // 如果填充槽位成功
                if (U.compareAndSwapObject(a, j, null, p)) {
                    long end = (timed && m == 0) ? System.nanoTime() + ns : 0L;
                    Thread t = Thread.currentThread(); // wait
                    for (int h = p.hash, spins = SPINS;;) {
                        Object v = p.match;
                        // 如果等待对方的交换数据,那么返回,且清理现场,准备下次复用
                        if (v != null) {
                            U.putOrderedObject(p, MATCH, null);
                            p.item = null;             // clear for next use
                            p.hash = h;
                            return v;
                        }
                        // 如果还可以继续自旋,那么出让CPU
                        else if (spins > 0) {
                            h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // xorshift
                            if (h == 0)                // initialize hash
                                h = SPINS | (int)t.getId();
                            else if (h < 0 &&          // approx 50% true
                                     (--spins & ((SPINS >>> 1) - 1)) == 0)
                                Thread.yield();        // two yields per wait
                        }
                        // 如果对方的数据到了,但还没有match,且自旋也不够了,那么再坚持下。
                        // 重置自旋
                        else if (U.getObjectVolatile(a, j) != p)
                            spins = SPINS;       // releaser hasn't set match yet
                        // 到这里,说明对方的数据没有来,且自旋也耗尽了
                        // 如果没有指定超时,或指定了超时时间,但是没有到期的话
                        // 那么可以考虑阻塞己方线程,等对方到了,再等他在方法开头的自旋中被对方唤醒
                        else if (!t.isInterrupted() && m == 0 &&
                                 (!timed ||
                                  (ns = end - System.nanoTime()) > 0L)) {
                            U.putObject(t, BLOCKER, this); // emulate LockSupport
                            p.parked = t;              // minimize window
                            if (U.getObjectVolatile(a, j) == p)
                                U.park(false, ns);
                            p.parked = null;
                            U.putObject(t, BLOCKER, null);
                        }
                        // 如果超时,那么清理现场
                        else if (U.getObjectVolatile(a, j) == p &&
                                 U.compareAndSwapObject(a, j, p, null)) {
                            // 如果slot置空成功,并且arene不为空  
                            if (m != 0)                // try to shrink
                                // 高8位是版本,左移一位,代表版本有变更
                                // 低8位是计数,减一
                                U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1);
                            p.item = null;
                            p.hash = h;
                            // 索引减半,下面会用到
                            i = p.index >>>= 1;        // descend
                            if (Thread.interrupted())
                                return null;
                            if (timed && m == 0 && ns <= 0L)
                                return TIMED_OUT;
                            break;                     // expired; restart
                        }
                    }
                }
                // 如果更新slot失败,那么清理现场
                else
                    p.item = null;                     // clear offer
            }
            // 如果到了这里还没有交换,上面的CAS设置槽位失败,说明竞争很激烈
            else {
                // 如果bound发生了变化,说明arene有变更
                if (p.bound != b) {                    // stale; reset
                    // 首先更新最新的bound
                    p.bound = b;
                    // collides重置
                    p.collides = 0;
                    // index减一,也就是往左边查找
                    // 这样的目的是换个slot,避免竞争
                    i = (i != m || m == 0) ? m : m - 1;
                }
                // 如果CAS失败,说明有竞争,如果m还没有到达FULL且失败次数还没满
                // collides < m, 因为m是槽位的size,如果小于m,说明还有空余的槽位可供使用,
                //  一直退到槽位用完为止
                // 那么既然,槽位全部失败了,且槽位还可以允许继续扩容,那么CAS扩容失败的话呢,
                //  collides加一,且槽位左移
                else if ((c = p.collides) < m || m == FULL ||
                         !U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)) {
                    p.collides = c + 1;
                    i = (i == 0) ? m : i - 1;          // cyclically traverse
                }
                else
                    // 如果扩容成功,index直接拿到新的槽位
                    i = m + 1;                         // grow
                p.index = i;
            }
        }
    }
    

    相关文章

      网友评论

          本文标题:Exchanger

          本文链接:https://www.haomeiwen.com/subject/kruhfctx.html