美文网首页
JUC-阻塞队列

JUC-阻塞队列

作者: GIT提交不上 | 来源:发表于2020-03-05 20:33 被阅读0次

      阻塞队列数据结构示意图如下所示:

    • 当阻塞队列是空时,从队列中获取元素的操作将会被阻塞;
    • 当阻塞队列是满时,往队列里添加元素的操作将会被阻塞。
    图1-1 阻塞队列.png

      阻塞:在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会自动被唤醒。什么时候阻塞/什么时候唤醒,阻塞队列包办。

    图1-2 阻塞队列接口实现类.png

      BlockingQueue常见API如下所示:


    BlockingQueue常见API.png

      SynchronousQueue:不存储元素的阻塞队列,每一个put操作必须要等待take操作,反之同理。(生产一个消费一个)

    /**
     * Demonstrates the hand-off behaviour of {@link SynchronousQueue}.
     *
     * <p>Thread "AAA" announces and puts three items in a row; thread "BBB"
     * waits five seconds before each of its three takes. Because a
     * SynchronousQueue stores no elements, every {@code put} has to wait for
     * the matching {@code take}, so the producer's messages appear paced by
     * the consumer.
     *
     * @author luffy
     */
    public class BlockQueueDemo {
        public static void main(String[] args) {
            final BlockingQueue<String> handoff = new SynchronousQueue<>();

            // Producer: announce each item, then block in put() until it is taken.
            Runnable producer = () -> {
                try {
                    for (String item : new String[] {"AAA", "BBB", "CCC"}) {
                        System.out.println(Thread.currentThread().getName() + ":PUT " + item);
                        handoff.put(item);
                    }
                } catch (InterruptedException ex) {
                    ex.printStackTrace();
                }
            };

            // Consumer: three rounds of "pause five seconds, then take one item".
            Runnable consumer = () -> {
                try {
                    for (int round = 0; round < 3; round++) {
                        try {
                            TimeUnit.SECONDS.sleep(5);
                        } catch (InterruptedException ex) {
                            // An interrupted pause is only reported; the take still happens.
                            ex.printStackTrace();
                        }
                        System.out.println(Thread.currentThread().getName() + handoff.take());
                    }
                } catch (InterruptedException ex) {
                    ex.printStackTrace();
                }
            };

            new Thread(producer, "AAA").start();
            new Thread(consumer, "BBB").start();
        }
    }
    

      防止虚假唤醒:await()必须放在while循环中,被唤醒后重新检查条件(不能用if);生产者消费者模型Lock实现:

    /**
     * Drives the Lock-based producer/consumer example.
     *
     * <p>Starts ten threads named "0".."9" that each call {@code put()} once,
     * then ten threads named "10".."19" that each call {@code take()} once,
     * all against a single shared {@code ShareData} instance.
     *
     * @author luffy
     */
    public class BlockQueueDemo {
        public static void main(String[] args) {
            final ShareData shared = new ShareData();

            // One put per producer thread; failures are only reported.
            Runnable producer = () -> {
                try {
                    shared.put();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            };

            // One take per consumer thread; failures are only reported.
            Runnable consumer = () -> {
                try {
                    shared.take();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            };

            for (int id = 0; id < 10; id++) {
                new Thread(producer, String.valueOf(id)).start();
            }

            for (int id = 10; id < 20; id++) {
                new Thread(consumer, String.valueOf(id)).start();
            }
        }
    }
    
    /**
     * A counter bounded to the range 0..MAX, guarded by one lock and one
     * condition.
     *
     * <p>{@link #put()} raises the counter and waits while it equals MAX;
     * {@link #take()} lowers it and waits while it equals 0. Both operations
     * print the thread name, an operation tag and the new counter value, then
     * wake every waiter on the shared condition.
     */
    class ShareData {
        private int num = 0;
        private final static int MAX = 10;
        Lock lock = new ReentrantLock();
        Condition condition = lock.newCondition();

        /**
         * Increments the counter, waiting while it is at MAX.
         *
         * @throws Exception if the calling thread is interrupted while waiting
         */
        public void put() throws Exception {
            shift(MAX, 1, ":put");
        }

        /**
         * Decrements the counter, waiting while it is at 0.
         *
         * @throws Exception if the calling thread is interrupted while waiting
         */
        public void take() throws Exception {
            shift(0, -1, ":take");
        }

        /**
         * Shared body of put/take: waits while the counter equals
         * {@code blockedAt}, applies {@code delta}, reports, and signals.
         */
        private void shift(int blockedAt, int delta, String tag) throws InterruptedException {
            lock.lock();
            try {
                // Loop, not if: the condition is re-checked after every wakeup.
                while (num == blockedAt) {
                    condition.await();
                }
                num += delta;
                System.out.println(Thread.currentThread().getName() + tag + num);
                condition.signalAll();
            } finally {
                lock.unlock();
            }
        }
    }
    

      生产者消费者模型阻塞队列实现:

    /**
     * Drives the BlockingQueue-based producer/consumer example.
     *
     * <p>Creates a {@code ShareData} around an {@link ArrayBlockingQueue} of
     * capacity 3, runs its {@code put()} loop on thread "Put" and its
     * {@code take()} loop on thread "Take", lets them work for five seconds,
     * and then calls {@code flagSet()} to stop them.
     *
     * @author luffy
     */
    public class BlockQueueDemo {
        public static void main(String[] args) {
            final ShareData shared = new ShareData(new ArrayBlockingQueue<String>(3));

            Runnable produceLoop = () -> {
                try {
                    shared.put();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            };

            Runnable consumeLoop = () -> {
                try {
                    shared.take();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            };

            new Thread(produceLoop, "Put").start();
            new Thread(consumeLoop, "Take").start();

            // Let both workers run for a while before stopping them.
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }

            shared.flagSet();
        }
    }
    
    /**
     * Producer/consumer helper built on a caller-supplied {@link BlockingQueue}.
     *
     * <p>{@link #put()} and {@link #take()} are loops meant to run on separate
     * threads. Both loops run while the shared flag is {@code true}; the flag
     * is cleared by {@link #flagSet()} or by the consumer when a poll times out.
     */
    class ShareData {
        /** Run flag read by both loops; volatile because it is written from another thread. */
        private volatile boolean flag = true;
        private final BlockingQueue<String> blockingQueue;
        private final AtomicInteger atomicInteger = new AtomicInteger();

        /**
         * Wraps the given queue and prints its implementation class name.
         *
         * @param blockingQueue the queue shared by producer and consumer
         */
        public ShareData(BlockingQueue<String> blockingQueue) {
            this.blockingQueue = blockingQueue;
            System.out.println(blockingQueue.getClass().getName());
        }

        /**
         * Produces one value roughly every second until the flag is cleared.
         * Each value is offered with a two-second timeout and the outcome is
         * printed.
         *
         * @throws Exception if the thread is interrupted while offering or
         *     while pausing between rounds
         */
        public void put() throws Exception {
            while (flag) {
                String value = String.valueOf(atomicInteger.incrementAndGet());
                if (blockingQueue.offer(value, 2, TimeUnit.SECONDS)) {
                    System.out.println(Thread.currentThread().getName() + "已生产!");
                } else {
                    System.out.println(Thread.currentThread().getName() + "生产失败!");
                }
                // Interruption during the pause propagates, exactly like an
                // interruption during offer(), so an interrupted worker stops
                // instead of silently continuing to loop.
                TimeUnit.SECONDS.sleep(1);
            }
            System.out.println("被叫停,生产结束!");
        }

        /**
         * Consumes one value roughly every second until the flag is cleared.
         * If nothing arrives within two seconds, the flag is cleared (which
         * also ends the producer loop) and the method returns.
         *
         * @throws Exception if the thread is interrupted while polling or
         *     while pausing between rounds
         */
        public void take() throws Exception {
            while (flag) {
                // NOTE(review): this decrements the same counter the producer
                // uses to generate values, so produced values can repeat.
                // Kept as-is; confirm whether it is intended.
                atomicInteger.decrementAndGet();
                String res = blockingQueue.poll(2, TimeUnit.SECONDS);
                if (res == null || res.isEmpty()) {
                    this.flag = false;
                    System.out.println(Thread.currentThread().getName() + "消费失败!");
                    return;
                }
                System.out.println(Thread.currentThread().getName() + "消费成功!");
                // See put(): interruption here ends the loop via the exception.
                TimeUnit.SECONDS.sleep(1);
            }
        }

        /** Clears the run flag so that both loops finish after their current round. */
        public void flagSet() {
            this.flag = false;
        }
    }
    

    相关文章

      网友评论

          本文标题:JUC-阻塞队列

          本文链接:https://www.haomeiwen.com/subject/pjbtrhtx.html