美文网首页
并发编程基础小结-生产者-消费者队列(BlockingQueue

并发编程基础小结-生产者-消费者队列(BlockingQueue

作者: r09er | 来源:发表于2018-01-04 16:46 被阅读49次

    wait()和notifyAll()方法以非常低级方式的解决任务互相操作的问题.每次交互时都握手,在更多的情况下,可以使用同步队列来解决协作问题.

    同步队列(BlockingQueue)

    同步队列在任何时刻都只允许一个任务插入或者移除元素.在java.util.concurrent.BlockingQueue提供了这个队列,这个接口有各种标准实现,主要使用有以下三种

    • LinkedBlockingQueue,它是一个阻塞的链队列.通常都可以选用(Java内置的多种线程池都使用了LinkedBlockingQueue)
    • ArrayBlockQueue,具有固定的尺寸,可以在被阻塞之前,向其中添加有限数量的元素.
    • SynchronousQueue内部不维护容器,每一次put都会阻塞直到take取出.

    LinkedBlockingQueue和ArrayBlockQueue插入和移除使用ReentrantLock,保证不会被多个线程同时操作.而SynchronousQueue使用的是CAS
    SynchronousQueue原理

    示例

    有一个生产的线程和一个消费线程,消费线程会一直等待直到有生产线程生产了一个数据.
    示例中生产线程生产一个英文字母放到队列中,消费线程在控制台输出字母

    生产者

    public class SenderQueue implements Runnable{
    
        private BlockingQueue<Character> queue;
    
        private Random random = new Random(47);
    
        public SenderQueue(BlockingQueue<Character> queue) {
            this.queue = queue;
        }
        @Override
        public void run() {
            try {
                System.out.println("开始发送");
                while (true){
                for(char x='A';x<'z';x++){
                        queue.put(x);
                        //模拟耗时操作,睡眠一段时间让消费者等待
                        TimeUnit.MILLISECONDS.sleep(random.nextInt(2000));
    
                }
            } } catch (InterruptedException e) {
                System.out.println("发送中断..");
            }
            System.out.println("结束发送");
        }
    }
    

    消费者

    public class ReceiverQueue implements Runnable {
    
        private BlockingQueue queue;
    
        public ReceiverQueue(BlockingQueue queue) {
            this.queue = queue;
        }
    
        @Override
        public void run() {
            try {
                System.out.println("开始接收");
                while (true) {
    
                    Character c = (Character) queue.take();
                    System.out.println(c + ", ");
    
                }
            } catch (InterruptedException e) {
                System.out.println("接收中断..");
            }
            System.out.println("结束接收");
        }
    }
    
    

    测试类

     public static void main(String[] args) throws InterruptedException {
            BlockingQueue<Character> receiveQueue = new LinkedBlockingQueue<>();
            ExecutorService service = Executors.newCachedThreadPool();
            service.execute(new SenderQueue(receiveQueue));
            service.execute(new ReceiverQueue(receiveQueue));
            TimeUnit.SECONDS.sleep(5);
            service.shutdownNow();
        }
    

    输出

    开始发送
    开始接收
    A, 
    B, 
    C, 
    D, 
    E, 
    接收中断..
    结束接收
    发送中断..
    结束发送
    

    注意每次输出都不一定相同

    相关文章

      网友评论

          本文标题:并发编程基础小结-生产者-消费者队列(BlockingQueue

          本文链接:https://www.haomeiwen.com/subject/atvrnxtx.html