美文网首页
08.并发编程之Queue介绍

08.并发编程之Queue介绍

作者: commence | 来源:发表于2017-05-12 00:54 被阅读102次

    一、技术选型

    MQ适用于消息堆积,消费端处理不过来;两点之间运行生命周期不同情况;
    JMS支持5种消息类型,为什么不能用队列来替代呢?
    原因:解耦、针对消息本身做队列操作、消息持久化
    PS:如果消费很快,前端积压严重的话,不建议使用mq,可采用直连的TCP通信,如mina、netty,节省中间mq转发。

    二、常用语法

    add 增加一个元索,如果队列已满,则抛出一IIIegaISlabEepeplian异常
    remove 移除并返回队列头部的元素,如果队列为空,则抛出一个NoSuchElementException异常
    element 返回队列头部的元素,如果队列为空,则抛出一个NoSuchElementException异常
    offer 添加一个元素并返回true,如果队列已满,则返回false
    poll 移除并返问队列头部的元素,如果队列为空,则返回null
    peek 返回队列头部的元素,如果队列为空,则返回null
    put 添加一个元素,如果队列满,则阻塞
    take 移除并返回队列头部的元素,如果队列为空,则阻塞
    removeelementofferpollpeek 其实是属于Queue接口。
    阻塞队列的操作可以根据它们的响应方式分为以下三类:
    (1) addremoveelement操作在你试图为一个已满的队列增加元素或从空队列取得元素时 抛出异常。
    (2) 在多线程程序中,队列在任何时间都可能变成满的或空的,所以你可能想使用offerpollpeek方法。这些方法在无法完成任务时只是给出一个出错示而不会抛出异常。
    注意:pollpeek方法出错进返回null。因此,向队列中插入null值是不合法的。
    (3) put:把Object加到BlockingQueue里,如果BlockingQueue没有空间,则调用此方法的线程被阻断,直接有空间再继续。take:取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的数据被加入。

    三、并发Queue

    在并发队列上JDK提供了两套实现,一个是以ConcurrentLinkedQueue为代表的高性能队列,一个是以BlockingQueue接口为代表的阻塞队列,都继承自Queue。

    1、ConcurrentLinkedQueue

    是一个适用高并发场景下的队列,通过无锁的方式,实现了高并发状态下的高性能,性能好于BlockingQueue,是一个基于链接节点的无界线程安全队列。
    该队列的元素先进先出,头是先进的,尾是最近加入的,队列不允许为Null。
    重要方法:
    add()offer()都是加入元素的方法,无任何区别。
    poll()peek()都是取头元素节点,前者会删除元素,后者不会。

    2、ArrayBlockingQueue

    基于数组的阻塞队列实现,在ArrayBlockingQueue内部,维护一个定长数组,以便缓存队列中的数据对象,内部没有实现读写分离,意味着生产者和消费者不能完全并行,长度是需要定义的,可以指定先进先出或者先进后出,也叫有界队列。

    /**有界队列**/
    package demo3;
    
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.TimeUnit;
    
    public class UserQueue {
        public static void main(String[] args) throws InterruptedException {
    
            ArrayBlockingQueue<String> array = new ArrayBlockingQueue<String>(5);  // 必须传一个长度
    
            array.put("a");
            array.put("b");
            array.add("c");
            array.add("d");
            array.add("e");
            array.add("f");
            //System.out.println(array.offer("a", 3, TimeUnit.SECONDS));// 阻塞式
        }
    }
    

    3、LinkedBlockingQueue

    基于链表的阻塞队列,同ArrayBlockingQueue类似,其内部也维持着一个数据缓冲队列(该队列是一个链表构成),LinkedBlockingQueue之所以能够高效地处理并发数据,是因为内部实现采用了读写分离锁,从而实现生产者和消费者操作的完全并行,是一个无界队列。

    /**无界队列**/
            LinkedBlockingQueue<String> link = new LinkedBlockingQueue<String>();   // 可以不传,也可以传,传则定死;不传则无界
    
            link.put("a");
            link.put("b");
            link.add("c");
            link.add("d");
            link.add("e");
            link.add("f");
            for (Iterator iterator = link.iterator(); iterator.hasNext();) {
                String string = (String) iterator.next();
                System.out.println(string);
            }
            //但如果初始化长度的话,也会有容量限制
    

    4、SynchronousQueue

    一种没有缓冲的队列,生产者生产的数据直接会被消费者获取并消费。

    /**无缓存队列**/
            SynchronousQueue<String> queue = new SynchronousQueue<String>();
            queue.add("a");
    
    /**由于没有任何容量,直接抛异常**/
            Exception in thread "main" java.lang.IllegalStateException: Queue full
            at java.util.AbstractQueue.add(AbstractQueue.java:98)
            at demo3.UserQueue.main(UserQueue.java:31)
    
    /**可以调用add方法,但并不代表队列中加元素了,先take再add,直接扔给前面阻塞在take的线程**/
    final SynchronousQueue<String> q = new SynchronousQueue<String>();
            Thread t1 = new Thread(new Runnable() {
    
                @Override
                public void run() {
                    try {
                        System.out.println(q.take());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
    
                }
    
            });
            t1.start();
            Thread t2 = new Thread(new Runnable() {
    
                @Override
                public void run() {
                    q.add("adsfa");
    
                }
    
            });
            t2.start();
    

    5、PriorityBlockingQueue

    基于优先级的阻塞队列(优先级的判断通过构造函数传入的Comparator对象来决定,也就是说传入队列的对象必须实现Comparable接口),不遵循先进先出,在实现PriorityBlockingQueue时,内部控制线程同步的锁采用的是公平锁,它是一个无界队列。
    不是加一个元素时就进行排序,而是调用take/poll方法时才进行将优先级最高的拿出来。

    package demo3;
    
    public class Task implements Comparable<Task> {
        private int     id;
        /**
         * @return the id
         */
        public int getId() {
            return id;
        }
    
        /**
         * @param id the id to set
         */
        public void setId(int id) {
            this.id = id;
        }
    
        /**
         * @return the name
         */
        public String getName() {
            return name;
        }
    
        /**
         * @param name the name to set
         */
        public void setName(String name) {
            this.name = name;
        }
    
        private String  name;
    
        @Override
        public int compareTo(Task task) {
            return this.id > task.id ? 1 : (this.id < task.id ? -1 : 0);
        }
    
        /* (non-Javadoc)
         * @see java.lang.Object#toString()
         */
        @Override
        public String toString() {
            return "Task [id=" + id + ", name=" + name + "]";
        }
        
    }
    
    package demo3;
    
    import java.util.Iterator;
    import java.util.concurrent.PriorityBlockingQueue;
    
    public class UsePriorityBlockingQueue {
        public static void main(String[] args) throws InterruptedException {
            PriorityBlockingQueue<Task> q = new PriorityBlockingQueue<Task>();
            
            Task t1 = new Task();
            t1.setId(3);
            t1.setName("任务1");
            Task t2 = new Task();
            t2.setId(6);
            t2.setName("任务2");
            Task t3 = new Task();
            t3.setId(1);
            t3.setName("任务3");
            
            q.add(t1);
            q.add(t2);
            q.add(t3);
            System.out.println(q);
            for (Iterator iterator = q.iterator();iterator.hasNext();){
                Task task = (Task)iterator.next();
                System.out.println(task.getName());
            }
            
            System.out.println(q.poll());
            System.out.println(q.poll());
            System.out.println(q.poll());
        }
    }
    

    6、 DelayQueue

    带有延迟时间的Queue,其中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue中的元素必须实现Delay接口,DelayQueue是一个没有大小限制的队列,应用场景很多,比如对缓存超时的数据进行移除、任务超时处理、空闲连接的关闭等。
    未超时肯定会阻塞。

    四、场景

    应用非常繁忙,并发量非常大,应用承载量1000个任务,单核,一个线程,类似于马路上的早晚高峰,采用ArrayBlockingQueue有界队列,做容量内存限制,超过的话,给予相应的拒绝措施。
    平稳期,采用无界队列,车流量不是很大,可以放心使用LinkedBlockingQueue,能保证数据的实时性。
    夜半时分,不需要存储到队列中了(浪费空间),数据量非常少,采用虚拟的队列(无容量)SynchronousQueue队列,直接提交给线程,效率更高。

    相关文章

      网友评论

          本文标题:08.并发编程之Queue介绍

          本文链接:https://www.haomeiwen.com/subject/exbatxtx.html