美文网首页
并发容器BlockingQueue - LinkedTransf

并发容器BlockingQueue - LinkedTransf

作者: 王侦 | 来源:发表于2019-07-13 14:12 被阅读0次

    1.测试代码

        public static void main(String[] args) {
            final LinkedTransferQueue<Long> queue = new LinkedTransferQueue<Long>();
            Runnable offerTask = new Runnable(){
                @Override
                public void run(){
                    queue.offer(8L);
                    System.out.println("offerTask thread has gone!");
                }
            };
            Runnable takeTask = new Runnable(){
                @Override
                public void run(){
                    try {
                        System.out.println(Thread.currentThread().getId() + " " +queue.take());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
    
            Runnable takeTaskInterrupted = new Runnable(){
                @Override
                public void run(){
                    Thread.currentThread().interrupt();
                    try {
                        System.out.println(Thread.currentThread().getId() + " " +queue.take());
                    } catch (InterruptedException e) {
                        System.out.println(e + " "+Thread.currentThread().getId());
                    }
                }
            };
    
    
            new Thread(takeTask).start();
            new Thread(takeTaskInterrupted).start();
            new Thread(offerTask).start();
            new Thread(takeTask).start();
        }
    

    调试步骤:

    • step1.四个线程同时运行到xfer()方法的Node pred = tryAppend(s, haveData);处


    • step2.Thread-0 take()运行,Thread-1 takeTaskInterrupted运行,Thread-2 offer()运行(Intellij IDEA debug F9),结果如下:
    java.lang.InterruptedException 14
    offerTask thread has gone!
    

    • step3.Thread-3 take()运行
      找到队列的last结点,发现与last结点模式不一致且last结点未匹配,则会重试。
    Node pred = tryAppend(s, haveData);
    if (pred == null)
         continue retry;    
    
     else if (p.cannotPrecede(haveData))
                    return null;  
    
            /**
             * Returns true if a node with the given mode cannot be
             * appended to this node because this node is unmatched and
             * has opposite data mode.
             */
            final boolean cannotPrecede(boolean haveData) {
                boolean d = isData;
                Object x;
                return d != haveData && (x = item) != this && (x != null) == d;
            }
    

    重试的情况,发现与头结点的模式一致,则尝试添加到链表尾,又会重现上一幕,结果就是死循环。

            retry:
            for (;;) {                            // restart on append race
    
                for (Node h = head, p = h; p != null;) { // find & match first node
                    boolean isData = p.isData;
                    Object item = p.item;
                    if (item != p && (item != null) == isData) { // unmatched
                        if (isData == haveData)   // can't match
                            break;
    

    2.为什么put会入队?

    这里有一个前提,所有线程都停留在tryAppend(s, haveData);处,所以当Thread-0 take()和Thread-1 takeTaskInterrupted运行完毕后,Thread-2 offer()是从tryAppend开始运行的,跳过了xfer()方法前面对于队头的匹配操作。

    但是offer和take模式不一致,为什么会入队?

            final boolean cannotPrecede(boolean haveData) {
                boolean d = isData;
                Object x;
                return d != haveData && (x = item) != this && (x != null) == d;
            }
    

    还是p.cannotPrecede(haveData)这个地方检测的问题,其中的(x = item) != this,由于上一个take()被中断了,该结点是处于CANCEL状态,其item指向自己。 导致这里即使模式不一致,也会入队,出现错误。

    那么本质原因就是:这里只有cannotPrecede()判别是不正确的,需要再加一个判断,即遇到CANCEL结点,应该retry。

    相关文章

      网友评论

          本文标题:并发容器BlockingQueue - LinkedTransf

          本文链接:https://www.haomeiwen.com/subject/ybgpkctx.html