美文网首页
并发容器之BlockingQueue(阻塞队列)

并发容器之BlockingQueue(阻塞队列)

作者: 盼旺 | 来源:发表于2019-09-14 10:34 被阅读0次

简介:

阻塞队列BlockingQueue被广泛使用在生产者-消费者问题中,其原因是BlockingQueue提供了可阻塞的插入和移除的方法。当队列容器已满,生产者线程会被阻塞,直到队列未满;当队列容器为空时,消费者线程会被阻塞,直至队列非空时为止。

基本操作

BlockingQueue继承于Queue接口
有Queue得基本操作

  • 插入元素
add(E e) :往队列插入数据,当队列满时,插入元素时会抛出IllegalStateException异常;
offer(E e):当往队列插入数据时,插入成功返回true,否则则返回false。当队列满时不会抛出异常;
  • 删除元素
remove(Object o):从队列中删除数据,成功则返回true,否则为false
poll:删除数据,当队列为空时,返回null;
  • 查看元素
element:获取队头元素,如果队列为空时则抛出NoSuchElementException异常;
peek:获取队头元素,如果队列为空则抛出NoSuchElementException异常

BlockingQueue具有的特殊操作

  • 插入数据
put:当阻塞队列容量已经满时,往阻塞队列插入数据的线程会被阻塞,直至阻塞队列已经有空余的容量可供使用;
offer(E e, long timeout, TimeUnit unit):若阻塞队列已经满时,同样会阻塞插入数据的线程,
直至阻塞队列已经有空余的地方,与put方法不同的是,
该方法会有一个超时时间,若超过当前给定的超时时间,插入数据的线程会退出;。
  • 删除数据
take():当阻塞队列为空时,获取队头数据的线程会被阻塞;
poll(long timeout, TimeUnit unit):当阻塞队列为空时,获取数据的线程会被阻塞
另外,如果被阻塞的线程超过了给定的时长,该线程会退出

常用的BlockingQueue

实现BlockingQueue接口的有
ArrayBlockingQueue
DelayQueue
LinkedBlockingDeque
LinkedBlockingQueue
LinkedTransferQueue
PriorityBlockingQueue
SynchronousQueue
下面对这几种常见的阻塞队列进行学习:

ArrayBlockingQueue

ArrayBlockingQueue是一个带有长度的阻塞队列,初始化的时候必须要指定队列长度,且指定长度之后不允许进行修改
ArrayBlockingQueue默认情况下不能保证线程访问队列的公平性,所谓公平性是指严格按照线程等待的绝对时间顺序,即最先等待的线程能够最先访问到ArrayBlockingQueue而非公平性则是指访问ArrayBlockingQueue的顺序不是遵守严格的时间顺序,有可能存在,一旦ArrayBlockingQueue可以被访问时,长时间阻塞的线程依然无法访问到ArrayBlockingQueue。如果保证公平性,通常会降低吞吐量。如果需要获得公平性的ArrayBlockingQueue,可采用如下代码

private static ArrayBlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(10,true);

ArrayBlockingQueue的添加数据方法
add,put,offer这3个方法,总结如下:
add方法内部调用offer方法,如果队列满了,抛出IllegalStateException异常,否则返回true
offer方法如果队列满了,返回false,否则返回true
add方法和offer方法不会阻塞线程,put方法如果队列满了会阻塞线程,直到有线程消费了队列里的数据才有可能被唤醒。
这3个方法内部都会使用可重入锁保证原子性。

ArrayBlockingQueue的删除数据方法
poll,take,remove这3个方法,总结如下:
poll方法对于队列为空的情况,返回null,否则返回队列头部元素。
remove方法取的元素是基于对象的下标值,删除成功返回true,否则返回false。
poll方法和remove方法不会阻塞线程。
take方法对于队列为空的情况,会阻塞并挂起当前线程,直到有数据加入到队列中。
这3个方法内部都会调用notFull.signal方法通知正在等待队列满情况下的阻塞线程。

例子
BlockingQueueExample类在不同的线程中启动生产者和消费者。生产者将字符串插入共享BlockingQueue中,消费者将它们取出。

package wg;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class BlockingQueueExample {
    public static void main(String[] args) {
        BlockingQueue queue = new ArrayBlockingQueue(10);
        Producer producer = new Producer(queue);//生产者
        Consumer consumer = new Consumer(queue);//消费者
        new Thread(producer).start();
        new Thread(consumer).start();
        try {
            Thread.sleep(4000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
/*********************************************************************/
package wg;
import java.util.concurrent.BlockingQueue;
public class Producer implements Runnable {
    protected BlockingQueue queue = null;
    public Producer(BlockingQueue queue) {
        this.queue = queue;
    }
    @Override
    public void run() {
        try {
            queue.put("1");
            Thread.sleep(1000);
            queue.put("2");
            Thread.sleep(1000);
            queue.put("3");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
/**********************************************************************/
package wg;
import java.util.concurrent.BlockingQueue;
public class Consumer implements Runnable{
    protected BlockingQueue queue = null;
    public Consumer(BlockingQueue queue) {
        this.queue = queue;
    }
    @Override
    public void run() {
        try {
            System.out.println(queue.take());
            System.out.println(queue.take());
            System.out.println(queue.take());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
/*结果
1
2
3
*/

LinkedBlockingQueue

LinkedBlockingQueue是一个使用链表完成队列操作的阻塞队列。链表是单向链表,而不是双向链表。
内部使用放锁和拿锁,这两个锁实现阻塞
为了防止LinkedBlockingQueue容量迅速增,损耗大量内存。通常在创建LinkedBlockingQueue对象时,会指定其大小,如果未指定,容量等于Integer.MAX_VALUE(2的32次方-1)

LinkedBlockingQueue的添加数据方法
add,put,offer跟ArrayBlockingQueue一样,不同的是它们的底层实现不一样。
ArrayBlockingQueue中放入数据阻塞的时候,需要消费数据才能唤醒。
而LinkedBlockingQueue中放入数据阻塞的时候,因为它内部有2个锁,可以并行执行放入数据和消费数据,不仅在消费数据的时候进行唤醒插入阻塞的线程,同时在插入的时候如果容量还没满,也会唤醒插入阻塞的线程。

LinkedBlockingQueue的删除数据方法
LinkedBlockingQueue的take方法对于没数据的情况下会阻塞,poll方法删除链表头结点,remove方法删除指定的对象。
需要注意的是remove方法由于要删除的数据的位置不确定,需要2个锁同时加锁。

一个LinkedBlockingQueue线程安全的例子

package wg;
import java.util.*;
import java.util.concurrent.LinkedBlockingQueue;
public class BlockingQueueExample {
    public static void main(String[] args) throws Exception {
        final Queue<Integer> q1 = new LinkedBlockingQueue<>();
        final Queue<Integer> q2 = new LinkedBlockingQueue<>();
        final int n = 1000000;
        final int m = 100;
        for (int i = 0; i < n; i++) {
            q1.add(i);
        }
        List<Thread> ts = new ArrayList<>();
        for (int i = 0; i < m; i++) {
            ts.add(new Thread(new Runnable() {
                public void run() {
                    int i = 0;
                    while (q2.size() < n && i++ < n / m) { // q2.size() 非线程安全,所以设置每个线程添加平均个数,防止poll出null报错
                        q2.add(q1.poll());
                    }
                }
            }));
        }
        for (Thread t : ts) {
            t.start();
        }
        System.out.println("启动了 " + m + " 个线程,每个线程处理 " + n / m + " 个操作");

        for (Thread t : ts) {
            while (t.isAlive()) {
                Thread.sleep(1);
            }
        }
        System.out.println("q1.size():" + q1.size());
        System.out.println("q2.size():" + q2.size());
        Set<Integer> set = new HashSet<>();
        Integer i;
        while ((i = q2.poll()) != null) {
            set.add(i);
        }
        System.out.println("q2.size():" + q2.size());
        System.out.println("set.size():" + set.size());
    }
}
/*
启动了 100 个线程,每个线程处理 10000 个操作
q1.size():0
q2.size():1000000
q2.size():0
set.size():1000000
*/

SynchronousQueue

SynchronousQueue是这样一种阻塞队列,其中每个put/offer(插入)必须等待一个take(移除),反之亦然。同步队列没有任何内部容量,甚至连一个队列的容量都没有。
除非另一个线程试图移除某个元素,否则也不能(使用任何方法)添加元素;也不能迭代队列,因为其中没有元素可用于迭代
它是线程安全的,是阻塞的
此队列不允许 null 元素。
它非常适合于传递性设计,在这种设计中,在一个线程中运行的对象要将某些信息、事件或任务传递给在另一个线程中运行的对象,它就必须与该对象同步。
peek() 永远返回null
isEmpty()永远是true
put() 往queue放进去一个element以后就一直wait直到有其他thread进来把这个element取走。
offer() 往queue里放一个element后立即返回,如果碰巧这个element被另一个thread取走了,offer方法返回true,认为offer成功;否则返回false。
take() 取出并且remove掉queue里的element(认为是在queue里的。。。),取不到东西他会一直等。
poll() 取出并且remove掉queue里的element(认为是在queue里的。。。),只有到碰巧另外一个线程正在往queue里offer数据或者put数据的时候,该方法才会取到东西。否则立即返回null

它模拟的功能类似于生活中一手交钱一手交货这种情形,像那种货到付款或者先付款后发货模型不适合使用SynchronousQueue

  • 下面使用SynchronousQueue模拟只能生产一个产品的生产者-消费者模型。
package wg;

import java.util.Random;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

public class SynchronousQueueTest {
    public static void main(String[] args) throws InterruptedException {
        SynchronousQueue<Integer> queue = new SynchronousQueue<Integer>();

        new Product(queue).start();
        new Customer(queue).start();
    }
    static class Product extends Thread{
        SynchronousQueue<Integer> queue;
        public Product(SynchronousQueue<Integer> queue){
            this.queue = queue;
        }
        @Override
        public void run(){
            while(true){
                int rand = new Random().nextInt(1000);
                System.out.println("生产了一个产品:"+rand);
                System.out.println("等待三秒后运送出去...");
                try {
                    TimeUnit.SECONDS.sleep(3);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                try {
                    queue.put(rand);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                System.out.println(queue.isEmpty());
            }
        }
    }
    static class Customer extends Thread{
        SynchronousQueue<Integer> queue;
        public Customer(SynchronousQueue<Integer> queue){
            this.queue = queue;
        }
        @Override
        public void run(){
            while(true){
                try {
                    System.out.println("消费了一个产品:"+queue.take());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("------------------------------------------");
            }
        }
    }
}

参考文章https://juejin.im/post/5aeebd02518825672f19c546

相关文章

网友评论

      本文标题:并发容器之BlockingQueue(阻塞队列)

      本文链接:https://www.haomeiwen.com/subject/exmzectx.html