美文网首页
并发编程-BlockQueue线程容器

并发编程-BlockQueue线程容器

作者: slientopen | 来源:发表于2018-01-23 16:12 被阅读0次

    概述

    blockQueue 作为线程容器、阻塞队列,多用于生产者、消费者的关系模式中,保障并发编程线程同步,线程池中被用于当作存储任务的队列,还可以保证线程执行的有序性。

    常用方法

    生产
    • add(obj):往队列里面增加一个对象,如果队列没有空间抛出异常,反之返回true。
    • offer(obj): 往队列增加一个对象,返回true/false
    • put(obj): 往队列增加一个对象,如果没有空间,则会阻塞改线程,直到有空间.
    消费
    • poll(time):取出排在首位的对象,如果在一定时间内没有返回,则会返回null
    • take():取出排在首位的对象,如果队列中没有数据,则会阻塞该线程直到有数据。
    查询
    • contains(obj):查询是否存在某个元素,返回true/false
    • peek():返回队列头部的元素,无则返回null

    特点

    • 容量有限,可以限定队列的长度,如果没有主动显示队列长度的情况下,默认长度为Integer.MAX_VALUE
    • 内存一致性,遵循happend-before原则,即写操作总是先于后面的读操作。参考资料 happend-before
    • 因为其继承Collection接口,所以可以使用集合的接口,但某些接口并不保证立即执行,因为其内部维护着内部锁(ReentrantLock),所以只有在获取锁的情况下才会执行对应的代码,以remove()源码为例:
    public boolean remove(Object o) {
            if (o == null) return false;
            final Object[] items = this.items;
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                if (count > 0) {
                    final int putIndex = this.putIndex;
                    int i = takeIndex;
                    do {
                        if (o.equals(items[i])) {
                            removeAt(i);
                            return true;
                        }
                        if (++i == items.length)
                            i = 0;
                    } while (i != putIndex);
                }
                return false;
            } finally {
                lock.unlock();
            }
        }
    

    每次操作都会去获取锁,如果锁被其他操作暂用,没有获取到锁,则只能去排队,所以上面代码并不会立即执行。

    常用分类

    前言

    创建队列时,可以添加fair参数,用于声明内部锁是否是公平锁,公平锁用于决定队列里面的任务是否会按照顺讯执行。

    公平锁:

    显式声明为公平锁的任务执行完全按照队列的顺序执行,新的任务进来会存放在队尾。

    非公平锁:

    队列里面的任务可以按照顺序执行,但是新的任务可能会与队列争抢CPU资源,不保证队列外的顺序。

    • ArrayBlockingQueue,创建固定大小的队列,内部维护一个数组,遵循FIFO原则
    • LinkedBlockingQueue,可以自定义队列长度,无指定的情况下默认为Integer.MAX_VALUE,内部维护着一个链表,遵循着FIFO原则
    • PriorityBlockingQueue,类似ArrayBlockingQueue,内部维护一个数组,但并不按照FIFO原则,其内部有个compare属性决定队列任务的执行顺序。
    • SynchronousQueue,特殊的队列,内部无存储空间维护队列,只有当生产者和消费者同时存在时,才会执行,类似与管道。

    例子

    • 生产者与消费者案例,一个生产者和多个消费者。
    public class BlockQueueDemo {
    
    
        /**
         * 生产者
         */
        static class Productor implements Runnable{
    
            private BlockingQueue<Integer> blockingQueue = new LinkedBlockingDeque();
    
            Productor(BlockingQueue blockingQueue){
                this.blockingQueue = blockingQueue;
            }
    
            @Override
            public void run() {
                for(int i=0;i<100;i++){
                    try {
                        Thread.sleep(200);
                        blockingQueue.put(i);
                        System.out.println("生产者产品了产品"+i);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    
        /**
         * 消费者
         */
        static class Consumer implements Runnable{
    
            public BlockingQueue<Integer> blockingQueue = new LinkedBlockingDeque();
    
            Consumer(BlockingQueue blockingQueue){
                this.blockingQueue = blockingQueue;
            }
    
            @Override
            public void run() {
                while(true){
                    try {
                        String name = Thread.currentThread().getName();
                        Integer queueData = blockingQueue.take();
                        System.out.println("消费者"+name+"消费了产品"+queueData);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    
        /**
         * 一个生产者对应多个消费者,采用BlockQueue作为缓冲区
         * @param args
         */
        public static void main(String[] args) {
            BlockingQueue<Integer> blockingQueue = new LinkedBlockingDeque();
            Productor productor = new Productor(blockingQueue);
            Consumer consumer = new Consumer(blockingQueue);
            Consumer consumer2 = new Consumer(blockingQueue);
            new Thread(productor).start();
            new Thread(consumer).start();
            new Thread(consumer2).start();
        }
    }
    执行结果:
    消费者Thread-2消费了产品21
    生产者产品了产品22
    消费者Thread-1消费了产品22
    生产者产品了产品23
    消费者Thread-2消费了产品23
    生产者产品了产品24
    消费者Thread-1消费了产品24
    生产者产品了产品25
    消费者Thread-2消费了产品25
    生产者产品了产品26
    消费者Thread-1消费了产品26
    生产者产品了产品27
    消费者Thread-2消费了产品27
    

    下一篇:并发编程-锁

    相关文章

      网友评论

          本文标题:并发编程-BlockQueue线程容器

          本文链接:https://www.haomeiwen.com/subject/vcilaxtx.html