美文网首页
大飞老师带你看线程(并发容器-SynchronousQueue)

大飞老师带你看线程(并发容器-SynchronousQueue)

作者: 叩丁狼教育 | 来源:发表于2019-04-19 11:51 被阅读0次

    本文作者:王一飞,叩丁狼高级讲师。原创文章,转载请注明出处。

    接上一篇, 本篇讲SynchronousQueue队列非公平策略put与take操作

    源码分析

    2:非公平锁策略- put / take

        public void put(E e) throws InterruptedException {
            if (e == null) throw new NullPointerException();
            if (transferer.transfer(e, false, 0) == null) {
                Thread.interrupted();
                throw new InterruptedException();
            }
        }
    
        public E take() throws InterruptedException {
            E e = transferer.transfer(null, false, 0);
            if (e != null)
                return e;
            Thread.interrupted();
            throw new InterruptedException();
        }
    

    put 跟 take 方法有都调用:
    调用transfer 方法:
    put : transferer.transfer(e, false, 0)
    take: transferer.transfer(null, false, 0);
    从前面内部结构看: SynchronousQueue 非公平策略底层实际上委托给TransferStack 栈实现, 而内部存储数据使用SNode 栈节点 来看下节点源码:

    static final class SNode {
        volatile SNode next;        //下一个节点    
        volatile SNode match;       //相匹配的节点
        volatile Thread waiter;     //线程挂起与唤醒控制: park /unpark      
        Object item;                //节点内容    
        //模式控制变量
        //非公平策略模式有3种:
        //REQUEST:表示消费数据-take操作
        //DATA:表示生产数据-put操作
        //FULFILLING:介于上2个状态间
            //多一个状态原因:TransferStack 配对操作原理是:每个线程配对时,都会先加入到栈顶中, 不管是take还是put,如果此时发现入栈的线程跟栈内线程可以配对,那么此时线程可以使用FULFILLING状态类标记:将要执行配对逻辑线程, 提示其他配对线程,配对时跳过这个节点,匹配其他的线程。
        int mode;
    
        //cas原子操作, 设置next节点
        boolean casNext(SNode cmp, SNode val) {...}
        //节点匹配,匹配成功,则unpark等待线程
        boolean tryMatch(SNode s) {...}
        //取消节点, 将节点内容设置为自己
        void tryCancel() {...}
        //判断是否操作结束
        boolean isCancelled() {...}
    }
    

    此处研究的是公平锁策略, 所以, 此时的transfer变量执项的是: TransferStack 类的实例

    E transfer(E e, boolean timed, long nanos) {
        SNode s = null; 
        //根据transfer方法参数e判断当前操作模式
        //有值DATA 反之为REQUEST 
        int mode = (e == null) ? REQUEST : DATA;
        for (;;) {
            SNode h = head;  //初始时head为null
            //第一次put或take h==null成立, 如果已经存在挂起线程,入栈, 不管take或put mode必须一致才可以进入,此分支是入栈分支,不会进行配对
            if (h == null || h.mode == mode) {  
                if (timed && nanos <= 0) {  //超时操作   
                    //队列中有配对线程,但是配对线程被取消了
                    if (h != null && h.isCancelled()) 
                        casHead(h, h.next);   //下移动栈顶节点,出栈
                    else
                        return null;
                 //满足并进入此分支, 创建新节点,当前线程入栈成为栈顶节点
                } else if (casHead(h, s = snode(s, e, h, mode))) {
                 //一旦栈顶位置移动成功, 挂起当前线程,等待配对线程
                    SNode m = awaitFulfill(s, timed, nanos);
                    if (m == s) {             
                        clean(s);  //一旦配对成功, 对应的线程出栈(被清除)
                        return null;
                    }
                    if ((h = head) != null && h.next == s)
                        casHead(h, s.next);     
                    return (E) ((mode == REQUEST) ? m.item : s.item); //返回结果
                }
            //出现mode不同操作时,那么就需要配对了,此时需要满足单前栈顶节点不是正在配对节点, 所以做了排除其他也企图对栈顶节点配对的竞争线程操作
            } else if (!isFulfilling(h.mode)) { 
                if (h.isCancelled())  //检查,如果栈顶节点配其他线程配对成功,栈顶节点下移
                    casHead(h, h.next);   
                //如果没有竞争线程, 线程入栈成为栈顶节点, 同时加正在配对状态
                else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                    for (;;) {
                        //获取目标配对的节点
                        SNode m = s.next;    
                        //如果目标配对节点为null,  则表示目标配对节点被其他线程抢走了,
                        if (m == null) {      
                            casHead(s, null);   //从新移动栈顶节点为null
                            s = null;     //配对失败, 跳出循环, 从新循环判断     
                            break;              
                        }
                        //如果不为null, 获取目标配对节点下一个节点
                        SNode mn = m.next;
                        if (m.tryMatch(s)) { //尝试配对
                            casHead(s, mn);  //配对成功,移动栈顶节点到mn, 同时配对的2个节点同时出栈, 返回结果  
                            return (E) ((mode == REQUEST) ? m.item : s.item);
                        } else                
                            //如果配对失败,寻找原目标节点下一个节点,重新循环配对
                            s.casNext(m, mn);  
                    }
                }
            } else {  //进入这个分支,表示目标配对的节点是一个正在配对的节点
                //直接跳过,配对下一个节点 , 接下逻辑跟上面配对一样                     
                SNode m = h.next;            
                if (m == null)               
                    casHead(h, null);         
                else {
                    SNode mn = m.next;
                    if (m.tryMatch(h))         
                        casHead(h, mn);        
                    else                        
                        h.casNext(m, mn);       
                }
            }
        }
    }
    

    非公平策略操作流程图:


    添加元素
    获取元素

    这里需要说明, 如果元素过多,并发时,配对的线程具体要配对谁,那么就随机啦,无法按照图中理想的配对顺序。这也是为什么说是非公平策略啦。

    总结:
    结合代码: transferer执行流程可归纳为以下:
    1: transferer调用transfer方法实现SynchronousQueue 公平队列的take跟put操作
    2:不管是take或者put 线程只要跟栈顶节点模式一样,都要入栈,然后通过自旋方式寻找匹配,找不到配对挂起。如果时间消耗完毕,或者被取消,直接出列。
    3:如果找到配对的节点,将当前线程打上FULFILLING标记,尝试配对。如果是此时有竞争配对线程,检查到该节点有FULFILLING标记,直接跳过,寻找下一个配对节点。
    4:如果配对成功, 弹出当前配对成功2个节点。如果配对失败,从头循环。

    想获取更多技术干货,请前往叩丁狼官网:http://www.wolfcode.cn/all_article.html

    相关文章

      网友评论

          本文标题:大飞老师带你看线程(并发容器-SynchronousQueue)

          本文链接:https://www.haomeiwen.com/subject/lbkiwqtx.html