美文网首页
二十 ConcurrentLinkedQueue&LinkedB

二十 ConcurrentLinkedQueue&LinkedB

作者: BeYearn | 来源:发表于2018-11-26 18:12 被阅读0次
图片.png

队列(Queue)是一种常用的数据结构,可以将队列看做是一种特殊的线性表,该结构遵循的先进先出原则(FIFO)
双向队列(Deque),是Queue的一个子接口,双向队列是指该队列两端的元素既能入队(offer)也能出队(poll)(如果将Deque限制为只能从一端入队和出队,则可实现栈的数据结构(先进后出LIFO)) 常见集合中LinkedList就是个Deque
Blocking 意味着其提供了特定的等待性操作,获取时(take)等待元素进队,或者插入时(put)等待队列出现空位。

  1. 有时候我们把并发包下面的所有容器都习惯叫作并发容器,但是严格来讲:
  • 类似 ConcurrentLinkedQueue 这种“Concurrent*”容器,才是真正代表并发, Concurrent类型基于lock-free ,常见多线程访问场景可提供较高吞吐量(但Concurrent 往往提供了较低的遍历一致性也就是弱一致性 : 当利用迭代器遍历时,如果容器发生修改,迭代器仍然可以继续进行遍历; size()也不准确)
  • *****BlockingQueue内部基于锁, 并提供并提供了 BlockingQueue 的等待性方法。
  1. 队列有界性
  • ArrayBlockingQueue 是最典型的的有界队列,其内部以 final 的数组保存数据,数组的大小就决定了队列的边界,所以我们在创建 ArrayBlockingQueue 时,都要指定容量
  • LinkedBlockingQueue,容易被误解为无边界,但其实其行为和内部代码都是基于有界的逻辑实现的,只不过如果我们没有在创建队列时就指定容量,那么其容量限制就自动被设置为 Integer.MAX_VALUE,成为了无界队列。它的并发吞吐量相对ArrayBlockingQueue 要更好一些(下面原理)。
  • SynchronousQueue,奇葩的队列实现,每个删除操作都要等待插入操作,反之每个插入操作也都要等待删除动作。那么这个队列的容量是多少呢?是 1 吗?其实不是的,其内部容量是 0。它是 Executors.newCachedThreadPool() 的默认队列, 这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60秒后会被回收。
  • PriorityBlockingQueue 是无边界的优先队列,虽然严格意义上来讲,其大小总归是要受系统资源影响。
  • DelayedQueue 和 LinkedTransferQueue 同样是无边界的队列。对于无边界的队列,有一个自然的结果,就是 put 操作永远也不会发生其他 BlockingQueue 的那种等待情况。
  1. 两种实现原理
  • LinkedBlockingQueue 则改进了锁操作的粒度,头、尾操作使用不同的锁
    (不同于ArrayBlockingQueue的notEmpty、notFull 都是同一个再入锁的条件变量)
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();

/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();

/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();

/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();

take的实现

public E take() throws InterruptedException {
    final E x;
    final int c;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            notEmpty.await();
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}
  • 类似 ConcurrentLinkedQueue 等,则是基于 CAS 的无锁技术,不需要在每个操作时使用锁,所以扩展性表现要更加优异。
  • 相对比较另类的 SynchronousQueue,在 Java 6 中,其实现发生了非常大的变化,利用 CAS 替换掉了原本基于锁的逻辑,同步开销比较小。
  1. queue的一个实例 Queue 被广泛使用在生产者 - 消费者场景,比如利用 BlockingQueue 来实现,由于其提供的等待机制,我们可以少操心很多协调工作
public class ConsumerProducer {
    public static final String EXIT_MSG  = "Good bye!";
    public static void main(String[] args) {
        // 使用较小的队列,以更好地在输出中展示其影响
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
        Producer producer = new Producer(queue);
        Consumer consumer = new Consumer(queue);
        new Thread(producer).start();
        new Thread(consumer).start();
    }


    static class Producer implements Runnable {
        private BlockingQueue<String> queue;
        public Producer(BlockingQueue<String> q) {
            this.queue = q;
        }

        @Override
        public void run() {
            for (int i = 0; i < 20; i++) {
                try{
                    Thread.sleep(5L);
                    String msg = "Message" + i;
                    System.out.println("Produced new item: " + msg);
                    queue.put(msg);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            try {
                System.out.println("Time to say good bye!");
                queue.put(EXIT_MSG);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    static class Consumer implements Runnable{
        private BlockingQueue<String> queue;
        public Consumer(BlockingQueue<String> q){
            this.queue=q;
        }

        @Override
        public void run() {
            try{
                String msg;
                while(!EXIT_MSG.equalsIgnoreCase( (msg = queue.take()))){
                    System.out.println("Consumed item: " + msg);
                    Thread.sleep(10L);
                }
                System.out.println("Got exit message, bye!");
            }catch(InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
  1. 使用的选择
  • 考虑应用场景中对队列边界的要求。ArrayBlockingQueue 是有明确的容量限制的,而 LinkedBlockingQueue 则取决于我们是否在创建时指定,SynchronousQueue 则干脆不能缓存任何元素。

  • 从空间利用角度,数组结构的 ArrayBlockingQueue 要比 LinkedBlockingQueue 紧凑,因为其不需要创建所谓节点,但是其初始分配阶段就需要一段连续的空间,所以初始内存需求更大。

    • 通用场景中,LinkedBlockingQueue 的吞吐量一般优于 ArrayBlockingQueue,因为它实现了更加细粒度的锁操作。

    • ArrayBlockingQueue 实现比较简单,性能更好预测,属于表现稳定的“选手”。

    • 如果我们需要实现的是两个线程之间接力性(handoff),传递性的场景,你可能会选择 CountDownLatch,但是[SynchronousQueue]也是完美符合这种场景的,而且线程间协调和数据传输统一起来,代码更加规范(虽然功能上完全可以用ArrayBlockingQueue替换之,但是SynchronousQueue是轻量级的,其不包含任何容量, 效率最高的 )。很多时候 SynchronousQueue 的性能表现,往往大大超过其他实现,尤其是在队列元素较小的场景

public class SynchronousQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        final SynchronousQueue<Integer> queue = new SynchronousQueue<Integer>();

        Thread putThread = new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("put thread start");
                try {
                    queue.put(1);
                } catch (InterruptedException e) {
                }
                System.out.println("put thread end");
            }
        });

        Thread takeThread = new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("take thread start");
                try {
                    System.out.println("take from putThread: " + queue.take());
                } catch (InterruptedException e) {
                }
                System.out.println("take thread end");
            }
        });

        putThread.start();
        Thread.sleep(1000);
        takeThread.start();
    }
}
//put thread start
//take thread start
//take from putThread: 1
//take thread end
//put thread end

相关文章

  • 二十 ConcurrentLinkedQueue&LinkedB

    队列(Queue)是一种常用的数据结构,可以将队列看做是一种特殊的线性表,该结构遵循的先进先出原则(FIFO)双向...

  • 顺口溜(7)

    二十三 祭灶关 二十四 扫房子 二十五 磨豆腐 二十六 蒸馒头 二十七 杀公鸡 二十八 贴画画 二十九...

  • 我和你

    一二三四五六七八九十十一十二十三十四 十五十六十七十八十九二十二十一二十二 二十三二十四二十五二十六二十七二十八 ...

  • 年味“失味”,春节档票房“有味”

    二十三、祭灶官, 二十四、扫房子, 二十五、磨豆腐, 二十六、去割肉, 二十七、杀只鸡, 二十八、蒸枣花, 二十九...

  • 过年啦!大口大口得吃肉

    二十三,糖瓜儿粘; 二十四,扫房日; 二十五,炸豆腐; 二十六,炖白肉; 二十七,宰公鸡; 二十八,把面发; 二十...

  • 2018-05-22

    第十八,十九,二十,二十一,二十二,二十三,二十四,二十五,二十六次蛋糕,小饼干。 小饼干已经慢慢做出了心得,不要...

  • 小年

    “二十三,糖瓜粘;二十四,扫房子; 二十五,磨豆腐;二十六,炖羊肉; 二十七,宰公鸡;二十八,把面发; 二十九,蒸...

  • 腊月二十三的饼

    “二十三,放挂鞭。二十四,扫房子。 二十五,磨豆腐。二十六,去割肉。 二十七,杀小鸡。二十八,贴花花。 二十九,快...

  • 白雪却嫌春色晚,故穿庭院作飞花

    有民谣记曰:"二十三,祭灶官;二十四,扫房子;二十五,磨豆腐;二十六,去割肉;二十七,杀只鸡;二十八,蒸枣花;二十...

  • 勤劳动

    二十三,糖瓜粘;二十四,扫房子; 二十五,磨豆腐;二十六,炖羊肉; 二十七,宰公鸡;二十八,把面发; 二十九,蒸馒...

网友评论

      本文标题:二十 ConcurrentLinkedQueue&LinkedB

      本文链接:https://www.haomeiwen.com/subject/isfsqqtx.html