美文网首页
Java之BlockingQueue

Java之BlockingQueue

作者: yellow_han | 来源:发表于2018-11-20 22:51 被阅读0次

    1、核心方法

    public interface BlockingQueue<E> extends Queue<E> {
    
        //将给定元素设置到队列中,如果设置成功返回true, 否则返回false。如果是往限定了长度的队列中设置值,推荐使用offer()方法。
        boolean add(E e);
    
        //将给定的元素设置到队列中,如果设置成功返回true, 否则返回false. e的值不能为空,否则抛出空指针异常。
        boolean offer(E e);
    
        //将元素设置到队列中,如果队列中没有多余的空间,该方法会一直阻塞,直到队列中有多余的空间。
        void put(E e) throws InterruptedException;
    
        //将给定元素在给定的时间内设置到队列中,如果设置成功返回true, 否则返回false.
        boolean offer(E e, long timeout, TimeUnit unit)
            throws InterruptedException;
    
        //从队列中获取值,如果队列中没有值,线程会一直阻塞,直到队列中有值,并且该方法取得了该值。
        E take() throws InterruptedException;
    
        //在给定的时间里,从队列中获取值,时间到了直接调用普通的poll方法,为null则直接返回null。
        E poll(long timeout, TimeUnit unit)
            throws InterruptedException;
    
        //获取队列中剩余的空间。
        int remainingCapacity();
    
        //从队列中移除指定的值。
        boolean remove(Object o);
    
        //判断队列中是否拥有该值。
        public boolean contains(Object o);
    
        //将队列中值,全部移除,并发设置到给定的集合中。
        int drainTo(Collection<? super E> c);
    
        //指定最多数量限制将队列中值,全部移除,并发设置到给定的集合中。
        int drainTo(Collection<? super E> c, int maxElements);
    }
    

    2、阻塞队列的成员

    image.png

    3、成员详解

    ArrayBlockingQueue

           基于数组的阻塞队列实现,在ArrayBlockingQueue内部,维护了一个定长数组,以便缓存队列中的数据对象,这是一个常用的阻塞队列,除了一个定长数组外,ArrayBlockingQueue内部还保存着两个整形变量,分别标识着队列的头部和尾部在数组中的位置。
      ArrayBlockingQueue在生产者放入数据和消费者获取数据,都是共用同一个锁对象,由此也意味着两者无法真正并行运行,这点尤其不同于LinkedBlockingQueue;按照实现原理来分析,ArrayBlockingQueue完全可以采用分离锁,从而实现生产者和消费者操作的完全并行运行。Doug Lea之所以没这样去做,也许是因为ArrayBlockingQueue的数据写入和获取操作已经足够轻巧,以至于引入独立的锁机制,除了给代码带来额外的复杂性外,其在性能上完全占不到任何便宜。 ArrayBlockingQueue和LinkedBlockingQueue间还有一个明显的不同之处在于,前者在插入或删除元素时不会产生或销毁任何额外的对象实例,而后者则会生成一个额外的Node对象。这在长时间内需要高效并发地处理大批量数据的系统中,其对于GC的影响还是存在一定的区别。而在创建ArrayBlockingQueue时,我们还可以控制对象的内部锁是否采用公平锁,默认采用非公平锁。

    LinkedBlockingQueue

           基于链表的阻塞队列,同ArrayListBlockingQueue类似,其内部也维持着一个数据缓冲队列(该队列由一个链表构成),当生产者往队列中放入一个数据时,队列会从生产者手中获取数据,并缓存在队列内部,而生产者立即返回;只有当队列缓冲区达到最大值缓存容量时(LinkedBlockingQueue可以通过构造函数指定该值),才会阻塞生产者队列,直到消费者从队列中消费掉一份数据,生产者线程会被唤醒,反之对于消费者这端的处理也基于同样的原理。而LinkedBlockingQueue之所以能够高效的处理并发数据,还因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。
    作为开发者,我们需要注意的是,如果构造一个LinkedBlockingQueue对象,而没有指定其容量大小,LinkedBlockingQueue会默认一个类似无限大小的容量(Integer.MAX_VALUE),这样的话,如果生产者的速度一旦大于消费者的速度,也许还没有等到队列满阻塞产生,系统内存就有可能已被消耗殆尽了。

    DelayQueue

           DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue是一个没有大小限制的队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。
    使用场景:
           DelayQueue使用场景较少,但都相当巧妙,常见的例子比如使用一个DelayQueue来管理一个超时未响应的连接队列。

    PriorityBlockingQueue

           基于优先级的阻塞队列(优先级的判断通过构造函数传入的Compator对象来决定),但需要注意的是PriorityBlockingQueue并不会阻塞数据生产者,而只会在没有可消费的数据时,阻塞数据的消费者。因此使用的时候要特别注意,生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则时间一长,会最终耗尽所有的可用堆内存空间。在实现PriorityBlockingQueue时,内部控制线程同步的锁采用的是公平锁。

    SynchronousQueue

           一种无缓冲的等待队列,类似于无中介的直接交易,有点像原始社会中的生产者和消费者,生产者拿着产品去集市销售给产品的最终消费者,而消费者必须亲自去集市找到所要商品的直接生产者,如果一方没有找到合适的目标,那么对不起,大家都在集市等待。相对于有缓冲的BlockingQueue来说,少了一个中间经销商的环节(缓冲区),如果有经销商,生产者直接把产品批发给经销商,而无需在意经销商最终会将这些产品卖给那些消费者,由于经销商可以库存一部分商品,因此相对于直接交易模式,总体来说采用中间经销商的模式会吞吐量高一些(可以批量买卖);但另一方面,又因为经销商的引入,使得产品从生产者到消费者中间增加了额外的交易环节,单个产品的及时响应性能可能会降低。
      声明一个SynchronousQueue有两种不同的方式,它们之间有着不太一样的行为。公平模式和非公平模式的区别:
      如果采用公平模式:SynchronousQueue会采用公平锁,并配合一个FIFO队列来阻塞多余的生产者和消费者,从而体系整体的公平策略;
      但如果是非公平模式(SynchronousQueue默认):SynchronousQueue采用非公平锁,同时配合一个LIFO队列来管理多余的生产者和消费者,而后一种模式,如果生产者和消费者的处理速度有差距,则很容易出现饥渴的情况,即可能有某些生产者或者是消费者的数据永远都得不到处理。

    4、秒杀demo

    秒杀对列

    package com.hsshy.beam.queue.jvm;
    
    
    import com.hsshy.beam.queue.entity.SuccessKilled;
    
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    
    /**
     * 秒杀队列(固定长度为100)
     * @author 科帮网 By https://blog.52itstyle.com
     * 创建时间   2018年5月10日
     */
    public class SeckillQueue {
         //队列大小
        static final int QUEUE_MAX_SIZE   = 100;
        /** 用于多线程间下单的队列 */
        static BlockingQueue<SuccessKilled> blockingQueue = new LinkedBlockingQueue<SuccessKilled>(QUEUE_MAX_SIZE);
        
        /**
         * 私有的默认构造子,保证外界无法直接实例化
         */
        private SeckillQueue(){};
        /**
         * 类级的内部类,也就是静态的成员式内部类,该内部类的实例与外部类的实例
         * 没有绑定关系,而且只有被调用到才会装载,从而实现了延迟加载
         */
        private static class SingletonHolder{
            /**
             * 静态初始化器,由JVM来保证线程安全
             */
            private  static SeckillQueue queue = new SeckillQueue();
        }
        //单例队列
        public static SeckillQueue getMailQueue(){
            return SingletonHolder.queue;
        }
        /**
         * 生产入队
         * @param kill
         * @throws InterruptedException
         * add(e) 队列未满时,返回true;队列满则抛出IllegalStateException(“Queue full”)异常——AbstractQueue 
         * put(e) 队列未满时,直接插入没有返回值;队列满时会阻塞等待,一直等到队列未满时再插入。
         * offer(e) 队列未满时,返回true;队列满时返回false。非阻塞立即返回。
         * offer(e, time, unit) 设定等待的时间,如果在指定时间内还不能往队列中插入数据则返回false,插入成功返回true。 
         */
        public  Boolean  produce(SuccessKilled kill) throws InterruptedException {
            return blockingQueue.offer(kill);
        }
        /**
         * 消费出队
         * poll() 获取并移除队首元素,在指定的时间内去轮询队列看有没有首元素有则返回,否者超时后返回null
         * take() 与带超时时间的poll类似不同在于take时候如果当前队列空了它会一直等待其他线程调用notEmpty.signal()才会被唤醒
         */
        public  SuccessKilled consume() throws InterruptedException {
            return blockingQueue.take();
        }
        // 获取队列大小
        public int size() {
            return blockingQueue.size();
        }
    }
    

    springboot启动时执行类

    package com.hsshy.beam.queue.jvm;
    
    import com.hsshy.beam.queue.entity.SuccessKilled;
    import com.hsshy.beam.queue.service.ISeckillService;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.ApplicationArguments;
    import org.springframework.boot.ApplicationRunner;
    import org.springframework.stereotype.Component;
    
    /**
     * 消费秒杀队列
     * 创建者 科帮网
     * 创建时间 2018年4月3日
     */
    @Component
    public class TaskRunner implements ApplicationRunner {
        
        @Autowired
        private ISeckillService seckillService;
        
        @Override
        public void run(ApplicationArguments var) throws Exception{
            while(true){
                //进程内队列
                SuccessKilled kill = SeckillQueue.getMailQueue().consume();
                if(kill!=null){
                    seckillService.startSeckil(kill.getId(), kill.getUserId());
                }
            }
        }
    }
    

    controller层

     @ApiOperation(value="秒杀柒(进程内队列)",nickname="科帮网")
        @PostMapping("/startQueue")
        public R startQueue(long seckillId){
            seckillService.deleteSeckill(seckillId);
            final long killId =  seckillId;
            LOGGER.info("开始秒杀柒(正常)");
            for(int i=0;i<1000;i++){
                final long userId = i;
                Runnable task = new Runnable() {
                    @Override
                    public void run() {
                        SuccessKilled kill = new SuccessKilled();
                        kill.setId(killId);
                        kill.setUserId(userId);
                        try {
                            Boolean flag = SeckillQueue.getMailQueue().produce(kill);
                            if(flag){
                                LOGGER.info("用户:{}{}",kill.getUserId(),"秒杀成功");
                            }else{
                                LOGGER.info("用户:{}{}",userId,"秒杀失败");
                            }
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                            LOGGER.info("用户:{}{}",userId,"秒杀失败");
                        }
                    }
                };
                executor.execute(task);
            }
            try {
                Thread.sleep(10000);
                Long  seckillCount = seckillService.getSeckillCount(seckillId);
                LOGGER.info("一共秒杀出{}件商品",seckillCount);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return R.ok();
        }
    

    5、延时队列demo

    消息体

    package com.hsshy.beam.queue.delay;
      
    import java.util.concurrent.Delayed;  
    import java.util.concurrent.TimeUnit;  
      
    /** 
     * 消息体定义 实现Delayed接口就是实现两个方法即compareTo 和 getDelay最重要的就是getDelay方法,这个方法用来判断是否到期…… 
     *  
     */  
    public class Message implements Delayed {  
        private int id;  
        private String body; // 消息内容  
        private long excuteTime;// 延迟时长,这个是必须的属性因为要按照这个判断延时时长。  
      
        public int getId() {  
            return id;  
        }  
      
        public String getBody() {  
            return body;  
        }  
      
        public long getExcuteTime() {  
            return excuteTime;  
        }  
      
        public Message(int id, String body, long delayTime) {  
            this.id = id;  
            this.body = body;  
            this.excuteTime = TimeUnit.NANOSECONDS.convert(delayTime, TimeUnit.MILLISECONDS) + System.nanoTime();  
        }  
      
        // 自定义实现比较方法返回 1 0 -1三个参数  
        @Override  
        public int compareTo(Delayed delayed) {  
            Message msg = (Message) delayed;  
            return Integer.valueOf(this.id) > Integer.valueOf(msg.id) ? 1  
                    : (Integer.valueOf(this.id) < Integer.valueOf(msg.id) ? -1 : 0);  
        }  
      
        // 延迟任务是否到时就是按照这个方法判断如果返回的是负数则说明到期否则还没到期  
        @Override  
        public long getDelay(TimeUnit unit) {  
            return unit.convert(this.excuteTime - System.nanoTime(), TimeUnit.NANOSECONDS);  
        }  
    }
    

    消费者

    package com.hsshy.beam.queue.delay;
      
    import java.util.concurrent.DelayQueue;  
      
    public class Consumer implements Runnable {  
        // 延时队列 ,消费者从其中获取消息进行消费  
        private DelayQueue<Message> queue;  
      
        public Consumer(DelayQueue<Message> queue) {  
            this.queue = queue;  
        }  
      
        @Override  
        public void run() {  
            while (true) {  
                try {  
                    Message take = queue.take();  
                    System.out.println("消费消息id:" + take.getId() + " 消息体:" + take.getBody());  
                } catch (InterruptedException e) {  
                    e.printStackTrace();  
                }  
            }  
        }  
    }
    

    测试类

    package com.hsshy.beam.queue.delay;
      
    import java.util.concurrent.DelayQueue;  
    import java.util.concurrent.ExecutorService;  
    import java.util.concurrent.Executors;  
      
    public class DelayQueueTest {  
         public static void main(String[] args) {    
                // 创建延时队列    
                DelayQueue<Message> queue = new DelayQueue<Message>();    
                // 添加延时消息,m1 延时3s    
                Message m1 = new Message(1, "world", 3000);    
                // 添加延时消息,m2 延时10s    
                Message m2 = new Message(2, "hello", 10000);    
                //将延时消息放到延时队列中  
                queue.offer(m2);    
                queue.offer(m1);    
                // 启动消费线程 消费添加到延时队列中的消息,前提是任务到了延期时间   
                ExecutorService exec = Executors.newFixedThreadPool(1);  
                exec.execute(new Consumer(queue));  
                exec.shutdown();  
            }    
    }
    

    参考链接:

    https://www.cnblogs.com/KingIceMou/p/8075343.html
    http://www.cnblogs.com/WangHaiMing/p/8798709.html
    https://gitee.com/52itstyle/spring-boot-seckill

    加入Java互助群

    image.png

    相关文章

      网友评论

          本文标题:Java之BlockingQueue

          本文链接:https://www.haomeiwen.com/subject/akufqqtx.html