美文网首页
同步容器

同步容器

作者: 永远的太阳0123 | 来源:发表于2018-09-19 09:50 被阅读0次

    同步容器用于解决并发情况下的容器线程安全问题,可以给多线程环境准备一个线程安全的容器对象。
    线程安全的容器对象: Vector, Hashtable。它们是使用synchronized方法实现的。
    concurrent包中的同步容器,大多数是使用系统底层技术实现的线程安全。Java8中使用的是CAS。

    1.1 ConcurrentHashMap/ConcurrentHashSet
    底层哈希实现的同步Map(Set)。效率高,线程安全。使用系统底层技术实现线程安全。
    量级较synchronized低。key和value不能为null。

    public class Test1_ConcurrentMap {
    
        public static void main(String[] args) {
            final Map<String, String> map = new Hashtable<>();// 效率低
            // final Map<String, String> map = new ConcurrentHashMap<>();效率高于Hashtable
            // final Map<String, String> map = new ConcurrentSkipListMap<>();元素有序,效率最低
            final Random r = new Random();
            Thread[] array = new Thread[100];
            final CountDownLatch latch = new CountDownLatch(array.length);// 门闩
    
            long begin = System.currentTimeMillis();
            
            for (int i = 0; i < array.length; i++) {
                array[i] = new Thread(new Runnable() {
                    public void run() {
                        for (int j = 0; j < 10000; j++) {
                            map.put("key" + r.nextInt(100000), "value" + r.nextInt(100000));
                        }
                        latch.countDown();
                    }
                });
            }
            for (Thread t : array) {
                t.start();
            }
            try {
                latch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            long end = System.currentTimeMillis();
            System.out.println("执行时间为 : " + (end - begin) + "毫秒!");
        }
    
    }
    

    1.2 ConcurrentSkipListMap/ConcurrentSkipListSet
    底层跳表(SkipList)实现的同步Map(Set)。有序,效率比ConcurrentHashMap低。


    10、15、18、19、20表示key,(1)、(2)、(3)、(4)、(5)表示插入顺序

    2 CopyOnWriteArrayList
    写入数据时会复制集合。
    写入效率低,读取效率高。
    每次写入数据,都会创建一个新的底层数组,很浪费空间,用于写入少读取多的情况。

    public class Test2_CopyOnWriteList {
    
        public static void main(String[] args) {
            // final List<String> list = new ArrayList<>();线程不安全
            // final List<String> list = new Vector<>();
            final List<String> list = new CopyOnWriteArrayList<>();
            final Random r = new Random();
            Thread[] array = new Thread[100];
            final CountDownLatch latch = new CountDownLatch(array.length);
    
            long begin = System.currentTimeMillis();
            
            for (int i = 0; i < array.length; i++) {
                array[i] = new Thread(new Runnable() {
                    @Override
                    public void run() {
                        for (int j = 0; j < 1000; j++) {
                            list.add("value" + r.nextInt(100000));
                        }
                        latch.countDown();
                    }
                });
            }
            for (Thread t : array) {
                t.start();
            }
            try {
                latch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            long end = System.currentTimeMillis();
            System.out.println("执行时间为 : " + (end - begin) + "毫秒!");
            System.out.println("List.size() : " + list.size());
        }
    
    }
    

    3.1 ConcurrentLinkedQueue
    链表实现的同步队列。

    public class Test3_ConcurrentLinkedQueue {
    
       public static void main(String[] args) {
           Queue<String> queue = new ConcurrentLinkedQueue<>();
           for (int i = 0; i < 10; i++) {
               queue.offer("value" + i);// 添加数据
           }
    
           System.out.println(queue);
           System.out.println(queue.size());
    
           // peek() -> 查看queue中的首数据
           System.out.println(queue.peek());
           System.out.println(queue.size());
    
           // poll() -> 删除和获取queue中的首数据
           System.out.println(queue.poll());
           System.out.println(queue.size());
       }
    
    }
    

    3.2 LinkedBlockingQueue
    阻塞队列
    执行put方法时, 队列容量不足,自动阻塞。
    执行take方法, 队列容量为0,自动阻塞。

    示例:

    public class Test4_LinkedBlockingQueue {
    
        final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        final Random r = new Random();
    
        public static void main(String[] args) {
            final Test4_LinkedBlockingQueue t = new Test4_LinkedBlockingQueue();
            
            // 一个生产者
            new Thread(new Runnable() {
                public void run() {
                    while (true) {
                        try {
                            t.queue.put("value" + t.r.nextInt(1000));
                            TimeUnit.SECONDS.sleep(1);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }, "producer").start();
            
            // 三个消费者
            for (int i = 0; i < 3; i++) {
                new Thread(new Runnable() {
                    public void run() {
                        while (true) {
                            try {
                                System.out.println(Thread.currentThread().getName() + " - " + t.queue.take());
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                }, "consumer" + i).start();
            }
        }
    
    }
    

    3.3 ArrayBlockingQueue
    数组实现的有界队列。
    执行add方法时,数组容量不足,抛出异常。
    执行put方法时,数组容量不足,阻塞等待。
    执行offer方法时,
    (1)单参数offer方法,不阻塞。数组容量不足时,返回false,放弃要新增的数据。
    (2)三参数offer方法(offer(value,times,timeunit)),数组容量不足时,阻塞times时长(单位为timeunit)。如果在阻塞时长内,数组容量出现空闲,新增数据返回true。如果阻塞时长范围内,无容量空闲,放弃要新增的数据,返回false。
    它的其它方法和LinkedBlockingQueue类似。

    示例:

    public class Test5_ArrayBlockingQueue {
    
        final BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
    
        public static void main(String[] args) {
            final Test5_ArrayBlockingQueue t = new Test5_ArrayBlockingQueue();
    
            for (int i = 0; i < 5; i++) {
                // System.out.println("add method : " + t.queue.add("value" + i));执行add方法时,数组容量不足,抛出异常
    
                /*try {
                    t.queue.put("put" + i);执行put方法时,数组容量不足,阻塞等待。
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("put method : " + i);*/
                
                //System.out.println("offer method : " + t.queue.offer("value"+i));单参数offer方法,不阻塞。数组容量不足时,返回false,放弃要新增的数据。
                
                try {
                    // 三参数offer方法,数组容量不足时,阻塞times时长(单位为timeunit)。
                    // 如果在阻塞时长内,数组容量出现空闲,新增数据返回true。如果阻塞时长范围内,无容量空闲,放弃要新增的数据,返回false。
                    System.out.println("offer method : " + t.queue.offer("value" + i, 1, TimeUnit.SECONDS));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
    
            System.out.println(t.queue);
        }
    
    }
    

    3.4 DelayQueue
    延时队列,无界。
    根据比较机制,实现自定义处理顺序的队列。常用于定时任务,例如:定时关机。
    队列中存放的元素必须实现Delay接口。

    示例:

    public class Test6_DelayQueue {
    
        static BlockingQueue<MyTask_06> queue = new DelayQueue<>();
    
        public static void main(String[] args) throws InterruptedException {
            long value = System.currentTimeMillis();
    
            MyTask_06 task1 = new MyTask_06(value + 2000);
            MyTask_06 task2 = new MyTask_06(value + 1000);
            MyTask_06 task3 = new MyTask_06(value + 3000);
            MyTask_06 task4 = new MyTask_06(value + 2500);
            MyTask_06 task5 = new MyTask_06(value + 1500);
            queue.put(task1);
            queue.put(task2);
            queue.put(task3);
            queue.put(task4);
            queue.put(task5);
    
            System.out.println(queue);
            System.out.println("value" + value);
            for (int i = 0; i < 5; i++) {
                System.out.println(queue.take());
            }
        }
    
    }
    
    class MyTask_06 implements Delayed {
    
        private long compareValue;
    
        public MyTask_06(long compareValue) {
            this.compareValue = compareValue;
        }
    
            // 比较大小,实现自动升序,建议和getDelay方法配合使用
            // 如果在DelayQueue中需要按时间完成计划任务,必须配合getDelay方法完成。
        public int compareTo(Delayed o) {
            return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
        }
    
         // 获取计划时长的方法。 根据参数TimeUnit来决定,如何返回结果值。
        public long getDelay(TimeUnit unit) {
            return unit.convert(compareValue - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }
    
        public String toString() {
            return "Task compare value is : " + this.compareValue;
        }
    
    }
    

    3.5 LinkedTransferQueue
    转移队列
    执行add方法时,队列保存数据,不会阻塞等待。
    执行transfer方法(TransferQueue接口的特有方法)时,没有消费者阻塞(take方法的调用者),发生阻塞;有消费者阻塞,数据直接给到消费者。
    一般用于处理即时消息。

    示例:

    public class Test7_LinkedTransferQueue {
        
        TransferQueue<String> queue = new LinkedTransferQueue<>();
        
        public static void main(String[] args) {
            final Test7_LinkedTransferQueue t = new Test7_LinkedTransferQueue();
            
            // 消费者阻塞,直接拿走transfer方法的数据
            /*new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println(Thread.currentThread().getName() + " thread begin " );
                        System.out.println(Thread.currentThread().getName() + " - " + t.queue.take());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, "output thread").start();
            
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            
            try {
                t.queue.transfer("test string");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }*/
            
            
            // transfer方法阻塞,消费者消费时直接拿走数据
            new Thread(new Runnable() {
                public void run() {
                    try {
                        t.queue.transfer("test string");// 阻塞,直至消费者来拿数据
                        // t.queue.add("test string");// 不阻塞,向TransferQueue中添加数据
                        System.out.println("ok");
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }).start();
            
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            
            new Thread(new Runnable() {
                public void run() {
                    try {
                        System.out.println(Thread.currentThread().getName() + " thread begin " );
                        System.out.println(Thread.currentThread().getName() + " - " + t.queue.take());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, "output thread").start();
            
        }
    
    }
    

    3.6 SynchronusQueue
    同步队列,是一个容量为0的队列,是一个特殊的TransferQueue。
    必须先有消费线程阻塞,才能使用的队列。
    add方法,无阻塞能力。若没有消费线程阻塞等待数据,则抛出异常。
    put方法,有阻塞能力。若没有消费线程阻塞等待数据,则阻塞。

    示例:

    public class Test8_SynchronusQueue {
    
        BlockingQueue<String> queue = new SynchronousQueue<>();
    
        public static void main(String[] args) {
            final Test8_SynchronusQueue t = new Test8_SynchronusQueue();
    
            new Thread(new Runnable() {
                public void run() {
                    try {
                        System.out.println(Thread.currentThread().getName() + " thread begin ");
                        try {
                            TimeUnit.SECONDS.sleep(2);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        System.out.println(Thread.currentThread().getName() + " - " + t.queue.take());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, "output thread").start();
    
            // try {
            // TimeUnit.SECONDS.sleep(3);
            // } catch (InterruptedException e) {
            // e.printStackTrace();
            // }
            // t.queue.add("test add");
    
            try {
                t.queue.put("test put");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            
            System.out.println(Thread.currentThread().getName() + " queue size : " + t.queue.size());// 输出main queue size : 0
        }
    
    }
    

    相关文章

      网友评论

          本文标题:同步容器

          本文链接:https://www.haomeiwen.com/subject/sjwmnftx.html