美文网首页IT@程序员猿媛Java 杂谈程序员
阻塞队列LinkedTransferQueue的初窥

阻塞队列LinkedTransferQueue的初窥

作者: WANGGGGG | 来源:发表于2019-04-27 17:29 被阅读9次

    前言

    当我们身在分布式开发中时经常会碰到突然大量的消息造访,而我们的消费者无法及时处理,最终导致消息丢失,甚至服务崩溃。这个时候我们就需要暂时将这些不速之客请到“休息室”去坐一下。
    阻塞队列BlockingQueue就是我们经常使用的“休息室”。阻塞队列可以有效的阻止大量的消息冲击我们的服务,设置队列大小可以将无法处理的消息阻止在外。本文将学习jdk1.7中加入的阻塞队列--LinkedTransferQueue

    一、LinkedTransferQueue简介

    1.1 命名

    线性传输队列 网上查不到它的合适的翻译,我就给它取了一个名字--线性传输队列。线性传输队列的类继承结构如图所示

    image.png

    1.2 原理简介

    从图中我们可以看到它底层是Collection和Queue,因此它是具有集合特性的,同时还具有Queue的基本功能。名字中还带了Linked,说明他是链表形式的,然后它还引入了一个新的TransferQueue接口特性,有如下接口,接口的功能在下面讲述
    (1)boolean tryTransfer(E e);
    (2)void transfer(E e) throws InterruptedException;
    (3)boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException;
    (4)boolean hasWaitingConsumer();
    (5)int getWaitingConsumerCount();

    1.3 LinkedTransferQueue核心方法

    以我目前的能力只能分析到这里了,光分析这个就有点勉强了,awaitMatch就不分析了

        /**
         * tryTransfer相关的方法底层实现都是通过xfer实现的
         * 根据它的表现,这是一个拉和吃一体的方法,非常全能
         *
         * @param e        需要插入的信息
         * @param haveData true表示数据插入,false表示数据请求
         * @param how      操作类型,有四种类型:NOW(即时), ASYNC(异步), SYNC(同步), or TIMED(超时模式)
         * @param nanos    超时时间,单位纳秒
         */
        private E xfer(E e, boolean haveData, int how, long nanos) {
            // 如果选择了有数据要插入,但是数据又是空的,就直接抛出异常
            if (haveData && (e == null))
                throw new NullPointerException();
            // 非常复杂的一个逻辑,又没有注释,所以这里的注释都是我以为,如果有问题请不吝赐喷
            // 最外圈的for循环是没有限制条件的,通过循环里面的continue restart跳转
            restart: for (Node s = null, t = null, h = null; ; ) {
                // 这个for节点的条件有点小复杂,但是没关系,一步步来
                // 这里利用了java从左往右的特性
                // t != (t = tail)这句话是,先拿到t的值,此时t=null,然后再将t赋值为tail
                // 最后做比较,null != tail,当然条件成立了,此时t已经是tail了,初始化时的tail.isData是true
                // 最后结论当haveData时(存值) p=tail,当不是haveData时(取值)p=head,符合队列特性
                for (Node p = (t != (t = tail) && t.isData == haveData) ? t
                        : (h = head); ; ) {
                    final Node q;
                    final Object item;
                    // 如果haveData为true(存数据)
                    // p不是数据节点且item是null,条件成立,反之不成立
                    // 如果haveData为false(取数据)
                    // p是数据节点且item不是null,条件成立,反之不成立
                    // 从上述说明看出:
                    // 存数据时,p=tail不是数据节点进入;
                    // 取数据时,p=head是数据节点进入
                    // 但isData和item是一致的,所以取数据才进入这个条件,除非数据被取走了
                    // 下个请求进来就直接从新的head取数据了
                    if (p.isData != haveData
                            && haveData == ((item = p.item) == null)) {
                        if (h == null) h = head;
                        // 尝试匹配,匹配成功更新p节点
                        if (p.tryMatch(item, e)) {
                            // 取数据时,一般情况下(h=head)!=(p=head)是false,
                            // 可能别的线程在这个时候有操作,h和p之间已经有被取走的节点了
                            // 可能Node还在,但是里面的item已经是空的了,所以要将p矫正成新的头节点
                            if (h != p) skipDeadNodesNearHead(h, p);
                            return (E) item;
                        }
                    }
    
                    // 存数据时,p是尾节点,尾节点后没有数据了,说明这就是个尾节点,能进入条件
                    // 取数据时,p是头节点,头结点后面如果是空的,也能进入条件
                    if ((q = p.next) == null) {
                        // 如果操作方式是NOW,那么直接返回输入的数据
                        if (how == NOW) return e;
                        // 给s创建一个空节点,e是空的就创建请求节点,否则创建数据节点
                        if (s == null) s = new Node(e);
                        // 更新next节点成功的话,到下个循环
                        if (!p.casNext(null, s)) continue;
                        // p不是尾节点,把s更新成最新的尾节点
                        if (p != t) casTail(t, s);
                        // 这个时候ASYNC就也需要返回了
                        if (how == ASYNC) return e;
                        // 旋转等待数据
                        return awaitMatch(s, p, e, (how == TIMED), nanos);
                    }
                    // 退到最外面的for循环重新开始
                    if (p == (p = q)) continue restart;
                }
            }
        }
    

    判断是否有已经等待的消费者,通过是否有空节点来判断

       public boolean hasWaitingConsumer() {
            restartFromHead: for (;;) {
                for (Node p = head; p != null;) {
                    Object item = p.item;
                    if (p.isData) {
                        // 如果队列里面还是有数据的
                        // 直接break
                        if (item != null)
                            break;
                    }
                    // 如果数据已经被人取走了,只剩下空节点了,
                    // 这个时候当前线程就是等待者,然后就返回true
                    else if (item == null)
                        return true;
                    if (p == (p = p.next))
                        continue restartFromHead;
                }
                return false;
            }
        }
    

    通过当前空节点数目来判断等待的消费者数目

     /**
         * 当前等待的消费者数目
         * @return
         */
        public int getWaitingConsumerCount() {
            return countOfMode(false);
        }
        private int countOfMode(boolean data) {
            restartFromHead: for (;;) {
                int count = 0;
                for (Node p = head; p != null;) {
                    // p节点是否被匹配了,当数据节点里面的item是空的,匹配成功
                    if (!p.isMatched()) {
                        // 因为p.isData是true,所以是数据请求时,
                        if (p.isData != data)
                            return 0;
                        // 等待累加,其实就是在循环内查看被其他线程取空的数据节点有多少个
                        if (++count == Integer.MAX_VALUE)
                            break;  // @see Collection.size()
                    }
                    if (p == (p = p.next))
                        continue restartFromHead;
                }
                return count;
            }
        }
    

    二、重要方法功能

    通过上面的分析,我大致了解了LinkedTransferQueue类中几个方法的功能

    2.1 put 方法

    顾名思义,这是一个存数据的方法

    public void put(E e) {
            xfer(e, true, ASYNC, 0);
        }
    

    异步存放插入数据,在tail后面插入新的节点,因为整个数据结构是链表,所以是无界的,所以不会阻塞

    2.2 offer 方法

    offer有两种方式,一种带超时的,一直不带超时的(表面上的)

     public boolean offer(E e, long timeout, TimeUnit unit) {
            xfer(e, true, ASYNC, 0);
            return true;
        }
     public boolean offer(E e) {
            xfer(e, true, ASYNC, 0);
            return true;
        }
    从源码中很容易判断出,这是个障眼法,因为本身就是不会阻塞的,所以超时时间就是个摆设,设置了没用
    

    2.3 add 方法

    和上面的offer效果是一模一样的

     public boolean add(E e) {
            xfer(e, true, ASYNC, 0);
            return true;
        }
    

    2.4 tryTransfer 方法

    有两种,一种直接返回,一种超时返回
    从上面的源码看NOW操作方式都没有创建新的节点,也就是不会把数据放到队列中,直接给等待中的消费者,如果没有等待中的,直接返回false,并且不会入队

    public boolean tryTransfer(E e) {
            return xfer(e, true, NOW, 0) == null;
        }
     public boolean tryTransfer(E e, long timeout, TimeUnit unit)
            throws InterruptedException {
            if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null)
                return true;
            if (!Thread.interrupted())
                return false;
            throw new InterruptedException();
        }
    

    2.5 transfer方法

    同步插入数据,如果没有取数据的消费者,一直等待。中间不支持中断线程,否则抛出异常

      public void transfer(E e) throws InterruptedException {
            if (xfer(e, true, SYNC, 0) != null) {
                Thread.interrupted(); // failure possible only due to interrupt
                throw new InterruptedException();
            }
        }
    

    2.6 take 方法

    操作方式是SYNC,一直等待,直到取到数据

     public E take() throws InterruptedException {
            E e = xfer(null, false, SYNC, 0);
            if (e != null)
                return e;
            Thread.interrupted();
            throw new InterruptedException();
        }
    

    2.7 poll 方法

    有两种方式,一种直接返回,一种带超时时间的
    直接返回的话可能就是空的数据,超时会阻塞线程,直到获取到数据或者超时

    
        public E poll(long timeout, TimeUnit unit) throws InterruptedException {
            E e = xfer(null, false, TIMED, unit.toNanos(timeout));
            if (e != null || !Thread.interrupted())
                return e;
            throw new InterruptedException();
        }
    
        public E poll() {
            return xfer(null, false, NOW, 0);
        }
    

    2.8 getWaitingConsumerCount和hasWaitingConsumer

    这两个方法在上面已经做过源码分析了,一个是获取当前等待的线程数,一个是判断当前有没有在等待的

    结语

    xfer()这个方法里面还有很多逻辑没有弄懂,等下次完全读懂了后再更新一下

    相关文章

      网友评论

        本文标题:阻塞队列LinkedTransferQueue的初窥

        本文链接:https://www.haomeiwen.com/subject/oortnqtx.html