美文网首页
并发容器 J.U.C

并发容器 J.U.C

作者: 磊_5d71 | 来源:发表于2018-11-03 15:15 被阅读0次
    图片.png

    CopyOnWriteArrayList(对应ArrayList)

    • CopyOnWriteArrayList使用了一种叫写时复制的方法,当有新元素添加到CopyOnWriteArrayList时,先从原有的数组中拷贝一份出来,然后在新的数组做写操作,写完之后,再将原来的数组引用指向到新数组。
    • CopyOnWriteArrayList的整个add操作都是在锁的保护下进行的。
      这样做是为了避免在多线程并发add的时候,复制出多个副本出来,把数据搞乱了,导致最终的数组数据不是我们期望的。
    • 通过上面的分析,CopyOnWriteArrayList 有几个缺点:
      1、由于写操作的时候,需要拷贝数组,会消耗内存,如果原数组的内容比较多的情况下,可能导致young gc或者full gc
      2、不能用于实时读的场景,像拷贝数组、新增元素都需要时间,所以调用一个set操作后,读取到数据可能还是旧的,虽然CopyOnWriteArrayList 能做到最终一致性,但是还是没法满足实时性要求.
    package com.alan.concurrency.example.concurrent;
    
    import com.alan.concurrency.annoations.NotThreadSafe;
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.*;
    
    
    @Slf4j
    public class CopyOnWriteArrayListExample {
    
    
    
        //请求数1000
        public static int clientTotal = 5000;
        //同时并发执行的线程数
        public static int threadTotal = 200;
    
        public static List<Integer> list = new CopyOnWriteArrayList<>();
    
    
        private static void update(){
            list.add(1);
        }
    
        public static void main(String[] args) throws InterruptedException {
    
            //定义线程池ExecutorService接口
            ExecutorService executorService = Executors.newCachedThreadPool();
            //定义信号量,传入并发线程数 final修饰不允许重新赋值
            final Semaphore semaphore = new Semaphore(threadTotal);
            //定义计数器闭锁。传入请求总数
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
    
            for (int i = 0; i < clientTotal; i++) {
                //通过匿名内部类方式
                executorService.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            //semaphore控制并发数量
                            semaphore.acquire();
                            update();
                            semaphore.release();
                        } catch (InterruptedException e) {
                            log.error("exception",e);
                        }
                        //每次执行计数器减掉一个
                        countDownLatch.countDown();
                    }
    
                });
    
            }
            countDownLatch.await();
            executorService.shutdown();
            log.info("size:{}",list.size());
        }
    }
    

    CopyOnWriteArraySet(对应HashSet) ConcurrentSkipListSet(对应TreeSet)

    package com.alan.concurrency.example.concurrent;
    
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.Set;
    import java.util.concurrent.*;
    
    
    @Slf4j
    public class CopyOnWriteHashSetExample {
    
    
    
        //请求数1000
        public static int clientTotal = 5000;
        //同时并发执行的线程数
        public static int threadTotal = 200;
    
        public static Set<Integer> set = new CopyOnWriteArraySet<>();
    
    
        private static void update(int i){
            set.add(i);
        }
    
        public static void main(String[] args) throws InterruptedException {
    
            //定义线程池ExecutorService接口
            ExecutorService executorService = Executors.newCachedThreadPool();
            //定义信号量,传入并发线程数 final修饰不允许重新赋值
            final Semaphore semaphore = new Semaphore(threadTotal);
            //定义计数器闭锁。传入请求总数
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
    
            for (int i = 0; i < clientTotal; i++) {
                //通过匿名内部类方式
                  int count = i;
                executorService.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            //semaphore控制并发数量
                            semaphore.acquire();
                            update(count);
                            semaphore.release();
                        } catch (InterruptedException e) {
                            log.error("exception",e);
                        }
                        //每次执行计数器减掉一个
                        countDownLatch.countDown();
                    }
    
                });
    
            }
            countDownLatch.await();
            executorService.shutdown();
            log.info("size:{}", set.size());
        }
    }
    

    ConcurrentHashMap(对应HashMap) ConcurrentSkipListMap(对应TreeMap)

    • ConcurrentHashMap
      1、Java5中新增加的一个线程安全的Map集合,可以用来替代HashTable。对于ConcurrentHashMap是如何提高其效率的,
      2、它使用了多个锁代替HashTable中的单个锁,也就是锁分离技术(Lock Stripping)
    package com.alan.concurrency.example.concurrent;
    
    import com.alan.concurrency.annoations.NotThreadSafe;
    import com.alan.concurrency.annoations.ThreadSafe;
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.*;
    
    @Slf4j
    @ThreadSafe
    public class ConcurrentHashMapExample {
    
        // 请求总数
        public static int clientTotal = 5000;
    
        // 同时并发执行的线程数
        public static int threadTotal = 200;
    
        private static Map<Integer, Integer> map = new ConcurrentHashMap<>();
    
        public static void main(String[] args) throws Exception {
            ExecutorService executorService = Executors.newCachedThreadPool();
            final Semaphore semaphore = new Semaphore(threadTotal);
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
            for (int i = 0; i < clientTotal; i++) {
                final int count = i;
                executorService.execute(() -> {
                    try {
                        semaphore.acquire();
                        update(count);
                        semaphore.release();
                    } catch (Exception e) {
                        log.error("exception", e);
                    }
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            executorService.shutdown();
            log.info("size:{}", map.size());
        }
    
        private static void update(int i) {
            map.put(i, i);
        }
    }
    
    • ConcurrentSkipListMap
      1、TreeMap使用红黑树按照key的顺序(自然顺序、自定义顺序)来使得键值对有序存储,但是只能在单线程下安全使用;多线程下想要使键值对按照key的顺序来存储,则需要使用ConcurrentSkipListMap。
      2、ConcurrentSkipListMap的底层是通过跳表来实现的。跳表是一个链表,但是通过使用“跳跃式”查找的方式使得插入、读取数据时复杂度变成了O(logn)
    package com.alan.concurrency.example.concurrent;
    
    import com.alan.concurrency.annoations.ThreadSafe;
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.Map;
    import java.util.concurrent.*;
    
    @Slf4j
    @ThreadSafe
    public class ConcurrentSkipListMapExample {
    
        // 请求总数
        public static int clientTotal = 5000;
    
        // 同时并发执行的线程数
        public static int threadTotal = 200;
    
        private static Map<Integer, Integer> map = new ConcurrentSkipListMap<>();
    
        public static void main(String[] args) throws Exception {
            ExecutorService executorService = Executors.newCachedThreadPool();
            final Semaphore semaphore = new Semaphore(threadTotal);
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
            for (int i = 0; i < clientTotal; i++) {
                final int count = i;
                executorService.execute(() -> {
                    try {
                        semaphore.acquire();
                        update(count);
                        semaphore.release();
                    } catch (Exception e) {
                        log.error("exception", e);
                    }
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            executorService.shutdown();
            log.info("size:{}", map.size());
        }
    
        private static void update(int i) {
            map.put(i, i);
        }
    }
    
    图片.png 图片.png 图片.png

    相关文章

      网友评论

          本文标题:并发容器 J.U.C

          本文链接:https://www.haomeiwen.com/subject/wbjkxqtx.html