美文网首页
关于阻塞队列你知道多少

关于阻塞队列你知道多少

作者: bearPotMan | 来源:发表于2019-04-25 22:51 被阅读0次

引言:前面多线程文章中讲线程池的时候你应该注意到了阻塞队列,就是我们在 new ThreadPoolExecutor()传参的时候使用了 new LinkedBlockingQueue<Runnable>(4),那篇文章中并没有去过多的介绍 BlockQueue 接口相关的知识,那么这篇文章我们就来重点的介绍一下队列以及阻塞队列相关的知识。

队列

首先要说的就是 队列 这种数据结构它的特点,FIFO(先进先出),先进队的任务先出队,就比如平常我们去排队买早餐,谁先来谁就在前面谁先走,大家时间都着急你也别插队。这是严格意义上的队列解释,也就是说先出队的一定是先进队的,但是也有不严格的,比如 PriorityQueue,使用该队列的 pool 或 peek 方法的时候,返回的都是队列中最小的元素而不是先进入队列的元素,因为它对队列中的元素进行了排序,这个做一了解即可。Java集合框架 Collection 中给我们提供了一个接口 Queue(队列) ,即 Queue implements Collection,同时提供了几个基本的方法来操作队列,比如

  • add(E e):添加元素至队列尾部,如果队列已满 不会 抛出异常;
  • offer(E e):添加元素至队列尾部,如果队列已满 抛出异常;
  • E peek():返回队首元素,不会删除元素,如果队列为空 返回 null
  • E element():返回队首元素,不会删除元素,如果队列为空 抛出异常
  • E poll():返回并删除队首元素,如果队列为空 返回 null
  • E remove():返回并删除队首元素,如果队列为空 抛出异常

当然,大部分该接口的实现类都重写了上面的方法(下面会提到一些)。关于队列的基本知识我们就暂且说到这里,接下来就来说说今天的重点:阻塞队列。

阻塞队列

我们还是结合线程池来讲讲阻塞队列,上一讲我们说了常用的三种线程池创建的方式,也使用 ThreadPoolExecutor 去创建了线程池,为什么一般不建议使用下面这三种呢?本节我们就来一探究竟。

  • newFixedThreadPool(int nThreads):底层实现使用 new LinkedBlockingQueue<Runnable>()。
  • newSingleThreadExecutor():底层实现使用 new LinkedBlockingQueue<Runnable>()。
  • newCachedThreadPool():底层实现使用 new SynchronousQueue<Runnable>()。

分别解释一下这两种阻塞队列(顺带说一下 ArrayBlockingQueue):

  • LinkedBlockingQueue:一个基于链表实现的“有界”阻塞队列(但是这个界限是 Integer.MAX_VALUE = 2147483647,通常也就理解为无界);
public class QueueTest {
    public static void main(String[] args) {
        BlockingQueue linkedBlockingDeque = new LinkedBlockingDeque(3);
        try {
            linkedBlockingDeque.put("1");
            linkedBlockingDeque.put("2");
            linkedBlockingDeque.put("3");
            linkedBlockingDeque.put("4"); // 程序发生阻塞
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
  • SynchronousQueue:一个不存储元素的阻塞队列,每个插入操作会阻塞直到其他线程调用出队操作;
public class QueueTest {
    public static void main(String[] args) {
        BlockingQueue synchronousQueue = new SynchronousQueue();
        try {
            synchronousQueue.put("1");
            synchronousQueue.put("2"); // 程序发生阻塞
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
  • ArrayBlockingQueue:一个基于数组实现的有界阻塞队列。
public class QueueTest {
    public static void main(String[] args) {
        BlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(3);
        System.out.println(arrayBlockingQueue.add("1")); // true
        System.out.println(arrayBlockingQueue.add("2")); // true
        System.out.println(arrayBlockingQueue.add("3")); // true
        System.out.println(arrayBlockingQueue.add("4")); // 会抛出异常
        // 虽然 ArrayBlockingQueue 重写了add 方法,但是却调用了父类的 add 方法, 
        // 它的父类是AbstractQueue,AbstractQueue类实现了 add 方法,代码如下,比较容易理解

        System.out.println(arrayBlockingQueue.offer("4")); // false(队列已满返回 false)
    }
}
public boolean add(E e) {
    // offer 方法在队列满了的时候会返回false
    if (offer(e))
        return true;
    else
        throw new IllegalStateException("Queue full");
}

对于api的使用这里就不做过多说明,相信大家都能看懂源代码的注释(因为我都能看懂),有兴趣可以自己写代码试试,看看具体的效果是什么样的。这里对一些方法做一个分类与小结:

方法类型 抛出异常 特殊值 阻塞 超时
插入 add(e) offer(e) put(e) offer(e, time, unit)
移除 remove() poll() take() poll(time, unit)
检索 element() peek()

抛出异常:
当阻塞队列 时,再往队列里 add 元素会抛出 IllegalStateException:Queue full 异常;
当阻塞队列 时,再从队列里 remove 元素会抛出 NoSuchElementException 异常。

特殊值:
插入方法,成功返回 true, 失败返回 false;
移除元素,成功返回被移除的元素,失败返回 null。

阻塞:
当阻塞队列 时,生产者线程继续往队列里 put 元素,队列会一直阻塞生产线程直到 put 数据或响应终端退出;
当阻塞队列 时,消费者线程试图从往队列里 take 元素,队列会一直阻塞消费者线程直到队列可用。

超时:
当阻塞队列 时,队列会阻塞生产者线程一段时间,超过指定的时间后生产者线程会退出。
当阻塞队列 时,队列会阻塞消费者线程一段时间,超过指定的时间后消费者线程会退出。

下面就来使用阻塞队列的知识写一个简版的生产者消费者demo,结合 SynchronousQueue 的特性,模拟生产者生产一个消息,消费者消费一个消息。
简单版

public class QueueTest {
    public static void main(String[] args) {
        BlockingQueue synchronousQueue = new SynchronousQueue();
        new Thread(() -> {
            try {
                System.out.println(Thread.currentThread().getName() + " put 1");
                synchronousQueue.put("1");
                System.out.println(Thread.currentThread().getName() + " put 2");
                synchronousQueue.put("2");
                System.out.println(Thread.currentThread().getName() + " put 3");
                synchronousQueue.put("3");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "aaa").start();

        new Thread(() -> {
            try {
                try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}
                System.out.println(Thread.currentThread().getName() + " take " + synchronousQueue.take());
                try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}
                System.out.println(Thread.currentThread().getName() + " take " + synchronousQueue.take());
                try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}
                System.out.println(Thread.currentThread().getName() + " take " + synchronousQueue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "bbb").start();
    }
}

改造升级版

public class ProducerAndConsumerTest {
    public static void main(String[] args) {
        ProducerAndConsumer producerAndConsumer = new ProducerAndConsumer(new ArrayBlockingQueue<String>(10));
        new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + " 生产线程启动");
            producerAndConsumer.produce();
        }, "Prod").start();

        new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + " 消费线程启动");
            System.out.println();
            System.out.println();
            producerAndConsumer.consume();
        }, "Consumer").start();

        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println();
        System.out.println();
        System.out.println();
        System.out.println(Thread.currentThread().getName() + " 5s后 模拟故障叫停,结束生产与消费");
        producerAndConsumer.stop();
    }
}

class ProducerAndConsumer {
    private volatile boolean FLAG = true;
    private AtomicInteger atomicInteger = new AtomicInteger();
    BlockingQueue<String> blockingQueue = null;

    public ProducerAndConsumer(BlockingQueue<String> blockingQueue) {
        this.blockingQueue = blockingQueue;
    }
    // 生产消息
    public void produce() {
        String value;
        boolean result;
        while (FLAG) {
            value = atomicInteger.incrementAndGet() + "";
            try {
                result = blockingQueue.offer(value, 2L, TimeUnit.SECONDS);
                if (result) {
                    System.out.println(Thread.currentThread().getName() + " 生产 " + value + " 任务成功");
                } else {
                    System.out.println(Thread.currentThread().getName() + " 生产 " + value + " 任务失败");
                }
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        System.out.println(Thread.currentThread().getName() + " 故障终止,FALG = false");
    }
    // 消费消息
    public void consume() {
        String result;
        while (FLAG) {
            try {
                result = blockingQueue.poll(2L, TimeUnit.SECONDS);
                if (null == result || "".equals(result)) {
                    FLAG = false;
                    System.out.println(Thread.currentThread().getName() + " 超过 2s 没有取到任务");
                    return;
                }
                System.out.println(Thread.currentThread().getName() + " 消费 " + result + " 成功");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    // 模拟故障
    public void stop() {
        this.FLAG = false;
    }
}

改造升级版的涉及到的知识还是很多的,如果有看前面几篇文章的话,相信也不难理解!
好了,本次分享到此结束!

相关文章

  • 关于阻塞队列你知道多少

    引言:前面多线程文章中讲线程池的时候你应该注意到了阻塞队列,就是我们在 new ThreadPoolExecuto...

  • 一切尽在代码中:一文阐述队列的使用JAVA

    阻塞队列如下: 关于java.util 中提供的队列的方法们: 阻塞队列包括了非阻塞队列中的大部分方法,如上五个方...

  • Java阻塞队列四组API介绍

    Java阻塞队列四组API介绍 通过前面几篇文章的学习,我们已经知道了Java中的队列分为阻塞队列和非阻塞队列以及...

  • 并发编程之并发队列

    常见的并发队列有2种:阻塞队列和非阻塞队列。阻塞队列使用锁实现,非阻塞队列使用CAS非阻塞算法实现。这2种队列都是...

  • 探讨阻塞队列和线程池源码

    阻塞队列 非阻塞队列是一个先进先出的单向队列(Queue),而BlockingQueue阻塞队列实际是非阻塞队列的...

  • 阻塞队列(一)(BlockingQueue)

    阻塞队列概要 阻塞队列与我们平常接触的普通队列(list)最大的不同点,在于阻塞队列支持阻塞添加和阻塞删除方法。 ...

  • Java多线程之阻塞队列

    一基本概念:1:什么叫阻塞队列阻塞队列都是相对于非阻塞队列而言的,非阻塞队列就是队列不会对当前线程产生阻塞;例如当...

  • 18.阻塞队列

    [TOC] 阻塞队列 阻塞队列首先是一种队列的数据结构,阻塞表现在此队列提供了操作数据的阻塞方法:阻塞队列提供了可...

  • 以LinkedBlockingQueue为例浅谈阻塞队列的实现

    目录 阻塞队列简介阻塞队列的定义Java中的阻塞队列 LinkedBlockingQueue单链表定义锁和等待队列...

  • Android中的线程与线程池

    阻塞队列BlockingQueue 阻塞队列常用于生产者——消费者模型,生产者往阻塞队列插入数据,消费者往阻塞队列...

网友评论

      本文标题:关于阻塞队列你知道多少

      本文链接:https://www.haomeiwen.com/subject/znxogqtx.html