美文网首页
阻塞队列 — LinkedTransferQueue源码分析

阻塞队列 — LinkedTransferQueue源码分析

作者: 一角钱技术 | 来源:发表于2020-11-29 10:52 被阅读0次

点赞再看,养成习惯,公众号搜一搜【一角钱技术】关注更多原创技术文章。本文 GitHub org_hejianhui/JavaStudy 已收录,有我的系列文章。

前言

LinkedTransferQueue 是一个由链表结构组成的无界阻塞传输队列,它是一个很多队列的结合体(ConcurrentLinkedQueue,LinkedBlockingQueueSynchronousQueue),在除了有基本阻塞队列的功能(但是这个阻塞队列没有使用锁)之外;队列实现了TransferQueue接口重写了transfer 和 tryTransfer 方法,这组方法和SynchronousQueue公平模式的队列类似,具有匹配的功能。

LinkedTransferQueue是LinkedBlockingQueueSynchronousQueue(公平模式)、ConcurrentLinkedQueue三者的集合体,它综合了这三者的方法,并且提供了更加高效的实现方式。

队列创建

TransferQueue<String> queue = new LinkedTransferQueue<String>();

应用场景

LinkedTransferQueue采用的一种预占模式。意思就是消费者线程取元素时,如果队列为空,那就生成一个节点(节点元素为null)入队,然后消费者线程park住,后面生产者线程入队时发现有一个元素为null的节点,生产者线程就不入队了,直接就将元素填充到该节点,唤醒该节点上park住线程,被唤醒的消费者线程拿货走人。这就是预占的意思:有就拿货走人,没有就占个位置等着,等到或超时。

我们来看一个例子:

package com.niuh.queue.transfer;


import java.util.Random;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TransferQueue;

public class TestLinkedTransferQueue {
    public static void main(String[] args) {
        TransferQueue<String> queue = new LinkedTransferQueue<String>();
        Thread producer = new Thread(new Producer(queue));
        producer.setDaemon(true); // 设置为守护进程使得线程执行结束后程序自动结束运行
        producer.start();
        for (int i = 0; i < 10; i++) {
            Thread consumer = new Thread(new Consumer(queue));
            consumer.setDaemon(true);
            consumer.start();
            try {
                // 消费者进程休眠一秒钟,以便以便生产者获得CPU,从而生产产品
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

/**
 * 模拟生产者
 */
class Producer implements Runnable {
    private final TransferQueue<String> queue;

    public Producer(TransferQueue<String> queue) {
        this.queue = queue;
    }

    private String produce() {
        return " number " + (new Random().nextInt(100));
    }

    @Override
    public void run() {
        try {
            while (true) {
                if (queue.hasWaitingConsumer()) {
                    queue.transfer(produce());
                }
                TimeUnit.SECONDS.sleep(1);// 生产者睡眠一秒钟,这样可以看出程序的执行过程
            }
        } catch (InterruptedException e) {
        }
    }
}

/**
 * 模拟消费者
 */
class Consumer implements Runnable {
    private final TransferQueue<String> queue;

    public Consumer(TransferQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            System.out.println(" Consumer " + Thread.currentThread().getName() + queue.take());
        } catch (InterruptedException e) {
        }
    }
}

工作原理

LinkedTransferQueue 使用了一个叫做 dual data structure 的数据结构,或者叫做 dual queue ,翻译为双重数据结构或者双重队列。

双重队列是指:放取元素使用同一个队列,队列中的节点具有两种模式,一种是数据节点,一种是非数据节点。

  • 放元素时先跟队列头节点对比,如果头节点是非数据节点,就让它们匹配,如果头节点是数据节点,就生产一个数据节点放在队列尾端(入队)。
  • 取元素时也是先跟队列头节点对比,如果头节点是数据节点,就让它们匹配,如果头节点是非数据节点,就生产一个非数据节点放在队列尾端(入队)。

用图来来表示如下:



不管是放元素还是取元素,都先跟头节点对比,如果二者模式不一样就匹配它们,如果二者模式一样,就入队。

源码分析

定义

LinkedTransferQueue的类继承关系如下:



LinkedTransferQueue实现了TransferQueue接口,而TransferQueue接口是继承自BlockingQueue的,所以LinkedTransferQueue也是一个阻塞队列。

其包含的方法定义如下:


TransferQueue接口

对比前面的阻塞队列,会发现LinkedTransferQueue 的继承体系有特殊之处。前面的阻塞队列都直接实现的BlockingQueue接口,在LinkedTransferQueue 却多了一个TransferQueue 接口,而该接口继承至BlockingQueue。


BlockingQueue 接口代表的是普通的阻塞队列,TransferQueue 则代表的是另一种特殊阻塞队列,它是指这样的一个队列:当生产者向队列添加元素但队列已满时,生产者会被阻塞;当消费者从队列移除元素但队列为空时,消费者会被阻塞

前面我们分析的SynchronousQueue 不就是有这种特性吗,但是SynchronousQueue 并没有实现TransferQueue 接口,原因就在于TransferQueue 接口也是在jdk 1.7才出现的,应该是为了和前面的阻塞队列进行区分,同时为了后面扩充这种特殊的阻塞队列,才加入了TransferQueue ,这样功能才不至于混乱(单一职能原则)。

public interface TransferQueue<E> extends BlockingQueue<E> {
    //立即转交一个元素给消费者,如果没有等待的消费者,则返回false(元素不入队)
    boolean tryTransfer(E e);

    //转交一个元素给消费者,如果没有等待的消费者,则阻塞直到消费者到来,或者发生异常
    void transfer(E e) throws InterruptedException;

    //转交一个元素给消费者,如果没有等待的消费者,则阻塞直到超时
    boolean tryTransfer(E e, long timeout, TimeUnit unit)throws InterruptedException;

    //是否存在等待的消费者
    boolean hasWaitingConsumer();

    //返回等待的消费者的个数
    int getWaitingConsumerCount();
}

在SynchronousQueue 中也有类似的方法,当然没有这么多,只是以内部类的形式存在(TransferQueue、TransferStack),而在LinkedTransferQueue 则把这种阻塞操作抽成了接口。

成员属性

// 判断是否为多核
private static final boolean MP =Runtime.getRuntime().availableProcessors() > 1;
// 自旋次数
private static final int FRONT_SPINS   = 1 << 7;
// 前驱节点正在处理,当前节点需要自旋的次数
private static final int CHAINED_SPINS = FRONT_SPINS >>> 1;
// 容忍清除节点失败次数的阈值
static final int SWEEP_THRESHOLD = 32;

/** 头节点 */
transient volatile Node head;
/** 尾节点 */
private transient volatile Node tail;

/*
*  放取元素的几种方式,调用xfer()方法时需要传入,区分不同处理,
* xfer()方法是LinkedTransferQueue的最核心的方法
*/
// 立即返回,用于非超时的 poll() 和 tryTransfer() 方法中
private static final int NOW   = 0; // for untimed poll, tryTransfer
// 异步,不会阻塞,用于放元素时,因为内部使用无界单链表存储元素,不会阻塞放元素的过程
private static final int ASYNC = 1; // for offer, put, add
// 同步,调用的时候如果没有匹配到会阻塞直到匹配为止
private static final int SYNC  = 2; // for transfer, take
// 超时,用于有超时的poll() 和 tryTransfer() 方法中
private static final int TIMED = 3; // for timed poll, tryTransfer

注意xfer 者几个参数很重要。

  • NOW :表示的是立即,不需要等待的意思,用于poll和tryTransfer方法,poll 队列为空返回,tryTransfer队列没有消费者,构造一个空的节点直接返回,都是不等待的。
  • ASYNC :异步,offer, put, add等入队方法,由于是无界队列,所以不会阻塞。
  • SYNC :同步表示会阻塞,take一个元素,没有就会阻塞,transfer传输,必须等待消费者来消费。
  • TIMED :带超时时间的now,会等待一定的时间后返回。

主要内部类

static final class Node {
    // 是否是数据节点(也就标识了是生产者还是消费者)
    final boolean isData;   // false if this is a request node
    // 元素的值
    volatile Object item;   // initially non-null if isData; CASed to match
    // 下一个节点
    volatile Node next;
    // 持有元素的线程
    volatile Thread waiter; // null until waiting
}

典型的单链表结构,内部除了存储元素的值和下一个节点的指针外,还包含了是否为数据节点和持有元素的线程。内部通过 isData 区分是生产者还是消费者。

构造函数

public LinkedTransferQueue() {
}

public LinkedTransferQueue(Collection<? extends E> c) {
    this();
    addAll(c);
}

只有这两个构造方法,且没有初始容量,所以是无界的一个阻塞队列。

入队方法

LinkedTransferQueue提供了add、put、offer三类方法,用于将元素放到队列中。其中三类(4个)方法都是一样的,使用异步的方式调用 xfer() 方法,传入的参数都一模一样。

注意:我们这里所说的入队操作是指add,put,offer这几个方法,而不是指真正的把节点入队的操作,因为LinkedTransferQueue 中针对的不是数据,而是操作,操作可能需要入队,而这个操作可能是放数据操作,也可能是取数据操作,这里注意区分一下,不要搞混了。

public void put(E e) {
    // 异步模式,不会阻塞,不会超时
    // 因为是放元素,单链表存储,会一直往后加
    xfer(e, true, ASYNC, 0);
}

public boolean offer(E e, long timeout, TimeUnit unit) {
    xfer(e, true, ASYNC, 0);
    return true;
}

public boolean offer(E e) {
    xfer(e, true, ASYNC, 0);
    return true;
}

public boolean add(E e) {
    xfer(e, true, ASYNC, 0);
    return true;
}

LinkedTransferQueue 是一个由链表组成的无界队列,因此不会有容量限制(一定范围内),因此这里入队的操作都不会阻塞(因此超时入队方法实际也没有用),也就是说,入队后线程会立即返回,这个是参数ASYNC的作用。

xfer(E e, boolean haveData, int how, long nanos)的参数分别是:

  1. e表示元素;
  2. haveData表示是否是数据节点,
  3. how表示放取元素的方式,上面提到的四种,NOW、ASYNC、SYNC、TIMED;
  4. nanos表示超时时间;

xfer()方法

在看 xfer 方法之前,我们先来了解以下大致流程,来帮助我们理解。

LinkedTransferQueue 和 SynchronousQueue 是一样的,队列中主要的不是针对数据,而是操作(put或take,注意这里put、take指的是放入数据和取数据),队列中即可以存储入队操作,也可以存储出队操作,当队列为空时,如果有线程进行出队操作,那么这个时候队列是没有数据的,那么这个操作就会被入队,同时线程也会阻塞,直到数据的到来(或出现异常),如果最开始队列为空,放入数据的操作到来,那么数据就会被放到队列中,此后如果取数据操作到来,那么就会从队列中取出数据,因此可以知道队列中存放的都是一系列相同的操作(put-放数据操作 或 take-取数据操作)。

接下来我们先说 放数据操作,那么如果队列为空,那么直接将数据入队即可,同时因为是无界队列,线程不会阻塞,直接返回,如果队列不为空,那么队列里面可能有两种情况:

  1. 存放的都是数据。那么本次操作的和队列中的节点操作是一样的,因此直接把数据放到队列末尾,线程返回。
  2. 存放的都是取数据操作。那么本次操作和队列中的节点操作是不一样的(也就是匹配的,放入数据操作和取数据操作是匹配的,也就是不同的操作是匹配的,相同的操作是不匹配的),那么就把对头的节点出队,就把本次的数据给对头节点,同时唤醒该节点的线程。
private E xfer(E e, boolean haveData, int how, long nanos) {
    // 不允许放入空元素
    if (haveData && (e == null))
        throw new NullPointerException();
    Node s = null;                        // the node to append, if needed
    // 外层循环,自旋,失败就重试
    retry:
    for (;;) {                            // restart on append race

        /**
         * 下面这个for循环用于控制匹配的过程
         * 同一时刻队列中只会存储一种类型的节点
         * 从头节点开始尝试匹配,如果头节点被其它线程先一步匹配了,
         * 就再尝试其下一个,直到匹配为止,或者到队列中没有元素为止
         */
        for (Node h = head, p = h; p != null;) { // find & match first node
            // p节点的模式
            boolean isData = p.isData;
            // p节点的值
            Object item = p.item;
            // p没有被匹配到
            if (item != p && (item != null) == isData) { // unmatched
                // 如果两者模式一样,则不能匹配,跳出循环后尝试入队
                if (isData == haveData)   // can't match
                    break;
                // 如果两者模式不一样,则尝试匹配
                // 把p的值设置为e(如果是取元素则e是null,如果是放元素则e是元素值)
                if (p.casItem(item, e)) { // match
                    // 匹配成功
                    // for里面的逻辑比较复杂,用于控制多线程同时放取元素时出现竞争的情况的
                    // 看不懂可以直接跳过
                    for (Node q = p; q != h;) {
                        // 进入到这里可能是头节点已经被匹配,然后p会变成h的下一个节点 
                        Node n = q.next;  // update by 2 unless singleton
                        // 如果head还没变,就把它更新成新的节点
                        // 并把它删除(forgetNext()会把它的next设为自己,也就是从单链表中删除了)
                        // 这时为什么要把head设为n呢?因为到这里了,肯定head本身已经被匹配掉了
                        // 而上面的p.casItem()又成功了,说明p也被当前这个元素给匹配掉了
                        // 所以需要把它们俩都出队列,让其它线程可以从真正的头开始,不用重复检查了
                        if (head == h && casHead(h, n == null ? q : n)) {
                            h.forgetNext();
                            break;
                        }                 // advance and retry
                        // 如果新的头节点为空,或者其next为空,或者其next未匹配,就重试
                        if ((h = head)   == null ||
                            (q = h.next) == null || !q.isMatched())
                            break;        // unless slack < 2
                    }
                    // 唤醒p中等待的线程
                    LockSupport.unpark(p.waiter);
                    // 并返回匹配到的元素
                    return LinkedTransferQueue.<E>cast(item);
                }
            }
            // p已经被匹配了或者尝试匹配的时候失败了
            // 也就是其它线程先一步匹配了p
            // 这时候又分两种情况,p的next还没来得及修改,p的next指向了自己
            // 如果p的next已经指向了自己,就重新取head重试,否则就取其next重试
            Node n = p.next;
            p = (p != n) ? n : (h = head); // Use head if p offlist
        }

        // 到这里肯定是队列中存储的节点类型和自己一样 或者 队列中没有元素了,就入队(不管放元素还是取元素都得入队)
        // 入队又分成四种情况:
        // NOW,立即返回,没有匹配到立即返回,不做入队操作
        // ASYNC,异步,元素入队但当前线程不会阻塞(相当于无界LinkedBlockingQueue的元素入队)
        // SYNC,同步,元素入队后当前线程阻塞,等待被匹配到
        // TIMED,有超时,元素入队后等待一段时间被匹配,时间到了还没匹配到就返回元素本身

        // 如果不是立即返回
        if (how != NOW) {                 // No matches available
            // 新建s节点
            if (s == null)
                s = new Node(e, haveData);
            // 尝试入队
            Node pred = tryAppend(s, haveData);
            // 入队失败,重试
            if (pred == null)
                continue retry;           // lost race vs opposite mode
            // 如果不是异步(同步或者有超时)
            // 就等待被匹配
            if (how != ASYNC)
                return awaitMatch(s, pred, e, (how == TIMED), nanos);
        }
        return e; // not waiting
    }
}

对于这里的操作 (放入数据) ,寻找匹配节点:

  • 如果找到了,就设置item值,然后unpark匹配节点的waiter线程,返回(其实就是看看队列里面的操作是不是取数据操作),否则就入队(NOW直接返回);
  • 如果没有找到匹配节点,则根据传入的how来处理,NOW直接返回,其余三种先入队,入队后如果是ASYNC则返回,SYNC和TIMED则会阻塞等待匹配。

我们再看看里面的部分方法:

 /**
  * Returns true if this node has been matched, including the
  * case of artificial matches due to cancellation.
  */
 final boolean isMatched() {
     Object x = item;
     return (x == this) || ((x == null) == isData);
 }

如果操作节点已经被匹配了,那么item会被改变,对于取数据操作,那么item会被设置成数据,如果操作被取消了,那么会设置item为this。

/**
 * Links node to itself to avoid garbage retention.  Called
 * only after CASing head field, so uses relaxed write.
 */
final void forgetNext() {
    UNSAFE.putObject(this, nextOffset, this);
}

forgetNext 设置为 next 为自身,也就是脱离链表,同时方便gc回收自己。

接下来我们再看看入队调用的 tryAppend 方法:

private Node tryAppend(Node s, boolean haveData) {
    // 从tail开始遍历,把s放到链表尾端
    for (Node t = tail, p = t;;) {        // move p to last node and append
        Node n, u;                        // temps for reads of next & tail
        // 如果首尾都是null,说明链表中还没有元素
        if (p == null && (p = head) == null) {
            // 就让首节点指向s
            // 注意,这里插入第一个元素的时候tail指针并没有指向s
            if (casHead(null, s))
                return s;                 // initialize
        }
        else if (p.cannotPrecede(haveData))
            // 如果p无法处理,则返回null
            // 这里无法处理的意思是,p和s节点的类型不一样,不允许s入队
            // 比如,其它线程先入队了一个数据节点,这时候要入队一个非数据节点,就不允许,
            // 队列中所有的元素都要保证是同一种类型的节点
            // 返回null后外面的方法会重新尝试匹配重新入队等
            return null;                  // lost race vs opposite mode
        else if ((n = p.next) != null)    // not last; keep traversing
            // 如果p的next不为空,说明不是最后一个节点
            // 则让p重新指向最后一个节点
            p = p != t && t != (u = tail) ? (t = u) : // stale tail
                (p != n) ? n : null;      // restart if off list
        else if (!p.casNext(null, s))
            // 如果CAS更新s为p的next失败
            // 则说明有其它线程先一步更新到p的next了
            // 就让p指向p的next,重新尝试让s入队
            p = p.next;                   // re-read on CAS failure
        else {
            // 到这里说明s成功入队了
            // 如果p不等于t,就更新tail指针
            // 还记得上面插入第一个元素时tail指针并没有指向新元素吗?
            // 这里就是用来更新tail指针的
            if (p != t) {                 // update if slack now >= 2
                while ((tail != t || !casTail(t, s)) &&
                       (t = tail)   != null &&
                       (s = t.next) != null && // advance and retry
                       (s = s.next) != null && s != t);
            }
            // 返回p,即s的前一个元素
            return p;
        }
    }
}

这个操作入队(把节点链接接到链表末尾)看上去有点复杂,主要原因还是没有使用锁,存在很多并发情况下,有可能自己在添加节点入队的时候,其它线程已经把队列改变了,那么这个时候就需要重新找到队尾,进行提那件操作,添加成功后,也需要设置队尾指针,这个时候队尾指针可能也被其它线程设置了,那么这个时候自己也要保证队尾指针是正确的(遍历验证)。

在上面看到有个方法:p.cannotPrecede(haveData),如果数据不符合要求,那么是不会入队的。

/**
 * Returns true if a node with the given mode cannot be
 * appended to this node because this node is unmatched and
 * has opposite data mode.
 */
final boolean cannotPrecede(boolean haveData) {
    boolean d = isData;
    Object x;
    return d != haveData && (x = item) != this && (x != null) == d;
}

这个就是验证操作和其它数据节点的数据是否吻合的。

awaitMatch 这个我们在出队来分析,因为放数据的过程是不会阻塞的,当然也更不会执行该方法。

出队方法

LinkedTransferQueue提供了poll、take、remove方法用于出列元素,出队的三类(4个)方法也是直接或间接的调用xfer()方法,放取元素的方式和超时规则略微不同,本质没有大的区别。

注意:同上,这里的出队操作,指的是poll、take方法,而不是真正指的是出队操作,因为poll、take操作也可能会入队(队列针对的是操作,不是数据)。

public E remove() {
    E x = poll();
    if (x != null)
        return x;
    else
        throw new NoSuchElementException();
}
public E take() throws InterruptedException {
    // 同步模式,会阻塞直到取到元素
    E e = xfer(null, false, SYNC, 0);
    if (e != null)
        return e;
    Thread.interrupted();
    throw new InterruptedException();
}

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    // 有超时时间
    E e = xfer(null, false, TIMED, unit.toNanos(timeout));
    if (e != null || !Thread.interrupted())
        return e;
    throw new InterruptedException();
}

public E poll() {
    // 立即返回,没取到元素返回null
    return xfer(null, false, NOW, 0);
}

通过上面一系列的方法,我们看到,其实poll、take内部调用的仍然是xfer 方法,因为是取数据,因此参数部分发生了变化,这个注意一下。

xfer()方法

private E xfer(E e, boolean haveData, int how, long nanos) {
     if (haveData && (e == null))
         throw new NullPointerException();
     Node s = null;                        // the node to append, if needed
     retry:
     for (;;) {                            // restart on append race
         //这里是取数据操作,那么遍历队列看看有没有匹配的操作(即放数据操作)
         for (Node h = head, p = h; p != null;) { // find & match first node
             boolean isData = p.isData;
             Object item = p.item; 
             if (item != p && (item != null) == isData) { // unmatched
                 if (isData == haveData)   // can't match
                     break;
                 /**
                 * 队列里面确实都是放数据的操作,则和当前操作是匹配的
                 * 设置匹配操作节点的item域为null (e为null,原本item 域是数据)
                 */
                 if (p.casItem(item, e)) { // match
                     // 协助推进head,这个和上面是一样的
                     for (Node q = p; q != h;) {
                         Node n = q.next;  // update by 2 unless singleton
                         if (head == h && casHead(h, n == null ? q : n)) {
                             h.forgetNext();
                             break;
                         }                 // advance and retry
                         if ((h = head)   == null ||
                             (q = h.next) == null || !q.isMatched())
                             break;        // unless slack < 2
                     }
                     // 唤醒阻塞线程(实际这里p.waiter是为null的,因为放数据操作是非阻塞的)
                     LockSupport.unpark(p.waiter);
                     // item线程是数据,本次操作是取数据操作,因此返回数据
                     return LinkedTransferQueue.<E>cast(item);
                 }
             }
             Node n = p.next;
             p = (p != n) ? n : (h = head); // Use head if p offlist
         }
         // 如果参数指定为NOW,那么就算没有被匹配,那么还是不入队,直接返回
         if (how != NOW) {                 // No matches available
             if (s == null)
                 s = new Node(e, haveData);
             // 添加节点    
             Node pred = tryAppend(s, haveData);
             if (pred == null)
                 continue retry;           // lost race vs opposite mode
             /**
             * 如果参数不是ASYC的这种,这可能需要阻塞等待
             * 取数据操作其参数都不是ASYNC,因此如果没有取到数据(被匹配),那么就可能进行阻塞等待
             */    
             if (how != ASYNC)
                 return awaitMatch(s, pred, e, (how == TIMED), nanos);
         }
         return e; // not waiting
     }
 }

在这里我们分析的是 取数据操作 ,因为有了前面 放数据操作 的分析,这里应该还是很好理解,取数据和放数据都是差不多的,都是和队列里面的操作进行匹配,如果队列里面的操作是取数据操作,本次操作是取数据操作,那么此时是不匹配的,需要把本次操作入队(参数:NOW、ASYNC、SYNC、TIMED 不一样),如果队列的操作都是放数据操作,本次操作是取数据操作,那么这个是匹配的,就把对头的数据取出来,返回即可。

下面我们来看看 awaitMatch 方法:

private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
    // 如果是有超时的,计算其超时时间
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    // 当前线程
    Thread w = Thread.currentThread();
    // 自旋次数
    int spins = -1; // initialized after first item and cancel checks
    // 随机数,随机让一些自旋的线程让出CPU
    ThreadLocalRandom randomYields = null; // bound if needed

    for (;;) {
        Object item = s.item;
        // 如果s元素的值不等于e,说明它被匹配到了
        if (item != e) {                  // matched
            // assert item != s;
            // 把s的item更新为s本身
            // 并把s中的waiter置为空
            s.forgetContents();           // avoid garbage
            // 返回匹配到的元素
            return LinkedTransferQueue.<E>cast(item);
        }
        // 如果当前线程中断了,或者有超时的到期了
        // 就更新s的元素值指向s本身
        if ((w.isInterrupted() || (timed && nanos <= 0)) &&
                s.casItem(e, s)) {        // cancel
            // 尝试解除s与其前一个节点的关系
            // 也就是删除s节点
            unsplice(pred, s);
            // 返回元素的值本身,说明没匹配到
            return e;
        }
        
        // 如果自旋次数小于0,就计算自旋次数
        if (spins < 0) {                  // establish spins at/near front
            // spinsFor()计算自旋次数
            // 如果前面有节点未被匹配就返回0
            // 如果前面有节点且正在匹配中就返回一定的次数,等待
            if ((spins = spinsFor(pred, s.isData)) > 0)
                // 初始化随机数
                randomYields = ThreadLocalRandom.current();
        }
        else if (spins > 0) {             // spin
            // 还有自旋次数就减1
            --spins;
            // 并随机让出CPU
            if (randomYields.nextInt(CHAINED_SPINS) == 0)
                Thread.yield();           // occasionally yield
        }
        else if (s.waiter == null) {
            // 更新s的waiter为当前线程
            s.waiter = w;                 // request unpark then recheck
        }
        else if (timed) {
            // 如果有超时,计算超时时间,并阻塞一定时间
            nanos = deadline - System.nanoTime();
            if (nanos > 0L)
                LockSupport.parkNanos(this, nanos);
        }
        else {
            // 不是超时的,直接阻塞,等待被唤醒
            // 唤醒后进入下一次循环,走第一个if的逻辑就返回匹配的元素了
            LockSupport.park(this);
        }
    }
}

这里和 SynchronousQueue 的 awaitFulfill 差不多,主要进行了自旋,如果自旋后,仍然没有被匹配或者取消,则进行阻塞(如果设置了超时阻塞,则进行一段时间的阻塞),如果发生了中断异常,会取消该操作,改变item的值,匹配成功后也会更改item的值,因此如果item和原来的值不想等时,则说明发生了改变,返回即可。

在 awaitMatch 过程中,如果线程被中断了,或者超时了则会调用 unsplice() 方法去除该节点。

final void unsplice(Node pred, Node s) {
    //清除s的部分数据
    s.forgetContents(); // forget unneeded fields

    if (pred != null && pred != s && pred.next == s) {
        Node n = s.next;
        if (n == null ||
            (n != s && pred.casNext(s, n) && pred.isMatched())) {
            /**
            *这个for循环,用于推进head,如果head已经被匹配了,则需要更新head
            */
            for (;;) {               // check if at, or could be, head
                Node h = head;
                if (h == pred || h == s || h == null)
                    return;          // at head or list empty
                 //h 没有被匹配,跳出循环,否则可能需要更新head  
                if (!h.isMatched())
                    break;
                Node hn = h.next;
                //遍历结束了,退出循环
                if (hn == null)
                    return;          // now empty
                //head 被匹配了,重新设置head    
                if (hn != h && casHead(h, hn))
                    h.forgetNext();  // advance head
            }
            //s节点被移除后,需要记录删除的操作次数,如果超过阀值,则需要清理队列
            if (pred.next != pred && s.next != s) { // recheck if offlist
                for (;;) {           // sweep now if enough votes
                    int v = sweepVotes;
                    //没超过阀值,则递增记录值
                    if (v < SWEEP_THRESHOLD) {
                        if (casSweepVotes(v, v + 1))
                            break;
                    }
                    else if (casSweepVotes(v, 0)) {
                        //重新设置记录数,并清理队列
                        sweep();
                        break;
                    }
                }
            }
        }
    }
}
private void sweep() {
    for (Node p = head, s, n; p != null && (s = p.next) != null; ) {
        if (!s.isMatched()) // s节点未被匹配,则继续向后遍历
            // Unmatched nodes are never self-linked
            p = s;
        else if ((n = s.next) == null) //s节点被匹配,但是是尾节点,则退出循环
            //s为尾结点,则可能其它线程刚好匹配完,所有这里不移除s,让其它匹配线程操作
            break;
        else if (s == n)    // stale s节点已经脱离了队列了,重头开始遍历
            // No need to also check for p == s, since that implies s == n
            p = head;
        else
            p.casNext(s, n); //移除s节点
    }
}

看看这个移除操作也挺复杂的,这里并没有简单的就将节点移除就ok,同时还检查了队列 head 的有效性,如果 head 被匹配了,则会推荐 head,保持队列 head 是有效的。如果移除节点的前驱节点也失效了,说明其它线程在操作,这里就不操作了,当移除了节点后,需要记录移除节点的操作次数 sweepVotes,如果这个值超过了阀值,则会对队列进行清理(移除那些失效的节点)。

移交元素的方法

请注意第二个参数,都是true,也就是这三个方法其实也是放元素的方法

// 立即转交一个元素给消费者,如果此时队列没有消费者,那就false
public boolean tryTransfer(E e) {
    // 立即返回
    return xfer(e, true, NOW, 0) == null;
}

// 转交一个元素给消费者,如果此时队列没有消费者,那就阻塞
public void transfer(E e) throws InterruptedException {
    // 同步模式
    if (xfer(e, true, SYNC, 0) != null) {
        Thread.interrupted(); // failure possible only due to interrupt
        throw new InterruptedException();
    }
}

public boolean tryTransfer(E e, long timeout, TimeUnit unit)
    throws InterruptedException {
    // 有超时时间
    if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null)
        return true;
    if (!Thread.interrupted())
        return false;
    throw new InterruptedException();
}

获取队列首个有效操作

在SynchronousQueue 中其队列是无法遍历的,而且也无法获取对头信息,但是在 LinkedTransferQueue 却不一样,LinkedTransferQueue 可以获取队头,也可以进行遍历 。

peek() 方法

public E peek() {
    return firstDataItem();
}
private E firstDataItem() {
    //遍历队列,查找第一个有效的操作节点
    for (Node p = head; p != null; p = succ(p)) {
        Object item = p.item;
        //如果该节点是数据节点,同时没有被取消,则返回数据
        if (p.isData) {
            if (item != null && item != p)
                return LinkedTransferQueue.<E>cast(item);
        }
        else if (item == null)// 非数据节点返回null,这里注意
            return null;
    }
    return null;
}
final Node succ(Node p) {  //如果节点p 失效则返回head,否则返回p的后继
    Node next = p.next;
    return (p == next) ? head : next;
}

这个 peek() 方法返回的是队列的第一个有效的节点,而这个节点可能是数据节点,也可能是取数据的操作节点,那么peek可能返回数据,也可能返回null,但是返回null,并不一定是队列为空,也可能是队列里面都是取数据的操作节点,这个需要注意一下。

总结

  1. LinkedTransferQueue 可以看在是 LinkedBlockingQueue、SynchronousQueue(公平模式)、ConcurrentLinkedQeueue 三者的集合体;
  2. LinkedTransferQueue 的实现方式使用一种叫做 “双重队列” 的数据结构;
  3. 不管是取元素还是放元素都会入队;
  4. 先尝试跟头节点比较,如果二者模式不一样,就匹配它们,组成CP,然后返回对方的值;
  5. 如果二者模式一样,就入队,并自旋或阻塞等待被唤醒;
  6. 至于是否入队及阻塞有四种方式:NOW、ASYNC、SYNC、TIMED;
  7. LinkedTransferQueue 全程都没有使用 synchronized、重入锁等比较中的锁,基本是通过 自旋 + CAS 实现;
  8. 对于入队后,先自旋一定次数后再调用 LockSupport.park() 或 LockSupport.parkNanos() 阻塞。

LinkedTransferQueue和SynchronousQueue(公平模式)区别

  • LinkedTransferQueue 和SynchronousQueue 其实基本是差不多的,两者都是无锁带阻塞功能的队列,都是使用的双重队列;
  • SynchronousQueue 通过内部类Transferer 来实现公平和非公平队列,在LinkedTransferQueue 中没有公平与非公平的区分;
  • LinkedTransferQueue 实现了TransferQueue接口,该接口定义的是带阻塞操作的操作,相比SynchronousQueue 中的Transferer 功能更丰富。
  • SynchronousQueue 中放数据操作和取数据操作都是阻塞的,当队列中的操作和本次操作不匹配时,线程会阻塞,直到匹配的操作到来。LinkedTransferQueue 是无界队列,放数据操作不会阻塞,取数据操作如果没有匹配操作可能会阻塞,通过参数决定是否阻塞(ASYNC,SYNC,NOW,TIMED)。

PS:以上代码提交在 Githubhttps://github.com/Niuh-Study/niuh-juc-final.git

文章持续更新,可以公众号搜一搜「 一角钱技术 」第一时间阅读, 本文 GitHub org_hejianhui/JavaStudy 已经收录,欢迎 Star。

相关文章

网友评论

      本文标题:阻塞队列 — LinkedTransferQueue源码分析

      本文链接:https://www.haomeiwen.com/subject/nrsmiktx.html