美文网首页
java.util.concurrent包学习之阻塞队列

java.util.concurrent包学习之阻塞队列

作者: 魂之挽歌w | 来源:发表于2019-09-28 22:45 被阅读0次

前言:

  阻塞队列是jdk5 java.util.concurrent包中五个模块之一,一般用于消费者、生产者的情景,与一般队列的不同就是阻塞队列内部加入了Lock和Condition可以在某个线程试图向队列添加元素而该队列已经满时,或者从队列移出元素而队列为空时,自动阻塞该线程。

阻塞队列的作用和使用场景

  所以在协调多个线程之间的合作时,阻塞队列是一个有用的工具。生产线程可以周期性地将中间结果存储在阻塞队列中,而消费线程移除中间结果并加以逻辑处理。阻塞队列最大的作用就是可以自动平衡负载,因为如果生产者线程集运行得比消费者线程集慢,消费者线程就会发生阻塞,反之也是一样。假设一个场景:我们写一个监控文件变化的程序,开一个线程获取变化了的文件名,拿到之后我们会做解析,查找数据库等等其他操作。如果我们对这两个操作分开统计它们的处理时间,会发现后面的解析需要的时间远远大于前面得到文件名的时间。如果这个线程每一秒轮询一次,而解析查询操作有可能会超出一秒,这样就会让大量的变化文件来不及处理。这个使用就需要用到阻塞线程了,可以使用生产线程来进行文件变化检测,而多个消费线程来进行文件处理操作。

JDK 7 提供了7个阻塞队列

**1、ArrayBlockingQueue **数组结构组成的有界阻塞队列。
此队列按照先进先出(FIFO)的原则对元素进行排序,但是默认情况下不保证线程公平的访问队列,即如果队列满了,那么被阻塞在外面的线程对队列访问的顺序是不能保证线程公平(即先阻塞,先插入)的。

2、LinkedBlockingQueue一个由链表结构组成的有界阻塞队列
此队列按照先出先进的原则对元素进行排序

3、PriorityBlockingQueue 支持优先级的无界阻塞队列

4、DelayQueue 支持延时获取元素的无界阻塞队列,即可以指定多久才能从队列中获取当前元素

5、SynchronousQueue不存储元素的阻塞队列,每一个put必须等待一个take操作,否则不能继续添加元素。并且他支持公平访问队列。

6、LinkedTransferQueue由链表结构组成的无界阻塞TransferQueue队列。相对于其他阻塞队列,多了tryTransfer和transfer方法

transfer方法
  如果当前有消费者正在等待接收元素(take或者待时间限制的poll方法),transfer可以把生产者传入的元素立刻传给消费者。如果没有消费者等待接收元素,则将元素放在队列的tail节点,并等到该元素被消费者消费了才返回。

tryTransfer方法
  用来试探生产者传入的元素能否直接传给消费者。,如果没有消费者在等待,则返回false。和上述方法的区别是该方法无论消费者是否接收,方法立即返回。而transfer方法是必须等到消费者消费了才返回。

7、LinkedBlockingDeque链表结构的双向阻塞队列,优势在于多线程入队时,减少一半的竞争。

阻塞队列的方法

方法 正常动作 特殊情况下动作
add 添加一个元素 如果队列满,则抛出IllegalStateException异常
offer 添加一个元素并返回bollean 如果队列满,返回false
put 添加一个元素 如果队列满,则阻塞该线程
element 返回队列的头元素 如果队列为空,则抛出NoSuchElementException
peek 返回队列的头元素 如果队列为空,则返回null
poll 移除并返回队列的头元素 如果队列为空,返回null
remove 移除并返回队列的头元素 如果队列为空,则抛出NoSuchElementException
take 移除并返回队列的头元素 如果队列为空 ,则阻塞该线程

阻塞队列测试使用

public class BlockQueueTest {

    private static BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<String>(10);

    /**
     * 消费者线程,每一秒消费一次
     */
  static   class Consumer implements  Runnable{

        BlockingQueue queue;
        public  Consumer(BlockingQueue queue){
            this.queue = queue;
        }
        public void run() {
            System.out.println("开始消费----");
            while (true){
                try {
                    Object poll = queue.take();
                    Thread.sleep(500);
                    System.out.println(Thread.currentThread().getName()+"消费"+(String)poll);
                } catch (InterruptedException e) {
                    //被中断,捕获后清除中断状态
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

   static class Producer implements Runnable{

        BlockingQueue queue;

        Producer(BlockingQueue queue){
            this.queue = queue;
        }

        public void run() {
            System.out.println("开始生产-------");
            while (true){
                try {
                    queue.put("test");
                    System.out.println("生产线程生产test");
                } catch (InterruptedException e) {
                    //捕获中断,清除中断状态
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
    public static void main(String[] args) {
        //开启生产线程
        new Thread(new Producer(blockingQueue)).start();
        //开启三个消费线程
        new Thread(new Consumer(blockingQueue)).start();
        new Thread(new Consumer(blockingQueue)).start();
        new Thread(new Consumer(blockingQueue)).start();
    }

参考

  • 《Java核心技术》

相关文章

网友评论

      本文标题:java.util.concurrent包学习之阻塞队列

      本文链接:https://www.haomeiwen.com/subject/rctpuctx.html