美文网首页
8 线程安全策略

8 线程安全策略

作者: 十丈_红尘 | 来源:发表于2018-09-28 21:08 被阅读0次
    1️⃣不可变对象
    1. 不可变对象需要满足的条件:

    ① 对象创建后其状态就不能修改;
    ② 对象所有域都是final类型的;
    ③ 对象是正确创建的(在对象创建期间,this引用没有逸出);


    2. final关键字

    ① final可以用来修改类 方法 变量;
    ② final修饰的类不能被继承(我们平时使用的基础数据类型的类都是final的);
    ③ final修饰方法不能被继承类修改,并且在一定程度上会提升效率;
    ④ final修饰变量如果是基本数据类型则初始化后就不能修改变量了,如果是引用数据类型则在初始化后就不能更改其引用;


    3.代码演示
    @Slf4j
    @NotThreadSafe
    public class ImmutableExample1 {
    
        private final static Integer a = 1;
        private final static String b = "2";
        private final static Map<Integer, Integer> map = Maps.newHashMap();
    
        static {
            map.put(1, 2);
            map.put(3, 4);
            map.put(5, 6);
        }
    
        public static void main(String[] args) {
    //        a = 2;
    //        b = "3";
    //        map = Maps.newHashMap();
            map.put(1, 3);
            log.info("{}", map.get(1));
        }
    
        private void test(final int a) {
    //        a = 1;
        }
    }
    

    Java中出了final可以用来修饰不可变对象以外还有其他的方法也可以来修饰不可变对象比如:
    ①以Collections.unmodifiableXXX为前缀的方法,包括Collection List Set Map等...
    ②Guava : ImmutableXXX : Collection List Set Map等...


    4. Collections.unmodifiableXXX修饰不可变对象
    @Slf4j
    @ThreadSafe
    public class ImmutableExample2 {
    
        private static Map<Integer, Integer> map = Maps.newHashMap();
    
        static {
            map.put(1, 2);
            map.put(3, 4);
            map.put(5, 6);
            map = Collections.unmodifiableMap(map);
        }
    
        public static void main(String[] args) {
            map.put(1, 3);
            log.info("{}", map.get(1));
        }
    
    }
    

    从上图中可以看出经过Collections.unmodifiableMap修饰以后这个map就不可变了,如果强制put会抛出异常;接下来我们来看一下源码

    public static <K,V> Map<K,V> unmodifiableMap(Map<? extends K, ? extends V> m) {
            return new UnmodifiableMap<>(m);
        }
    

    在底层创建了一个UnmodifiableMap然后将原来map中的数据复制到这个新的map中以后,将map中的方法修改,修改成抛出异常如下所示

    @Override
    public void replaceAll(BiFunction<? super K, ? super V, ? extends V> function) {
                throw new UnsupportedOperationException();
    }
    

    5. Guava : ImmutableXXX修饰不可变对象
    @ThreadSafe
    public class ImmutableExample3 {
    
        private final static ImmutableList<Integer> list = ImmutableList.of(1, 2, 3);
    
        private final static ImmutableSet set = ImmutableSet.copyOf(list);
    
        private final static ImmutableMap<Integer, Integer> map = ImmutableMap.of(1, 2, 3, 4);
    
        private final static ImmutableMap<Integer, Integer> map2 = ImmutableMap.<Integer, Integer>builder()
                .put(1, 2).put(3, 4).put(5, 6).build();
    
    
        public static void main(String[] args) {
            System.out.println(map2.get(3));
        }
    }
    

    2️⃣线程封闭
    1 概念

    当访问共享的可变数据时,通常需要使用同步。一种避免使用同步的方式就是不共享数据。如果仅在单线程内访问数据,就不需要同步。这种技术被称为线程封闭。

    2 实现线程封闭的几种方法

    ① Ad-hoc线程封闭:程序控制实现,效果最差可以直接忽略;
    ②堆栈封闭:局部变量,无并发问题;
    ③ThreadLocal线程封闭:特别好的封闭方法;

    3 ThreadLocal线程封闭代码实现
    // 声明RequestHolder类
    public class RequestHolder {
    
        private final static ThreadLocal<Long> requestHolder = new ThreadLocal<>();
    
        public static void add(Long id) {
            requestHolder.set(id);
        }
    
        public static Long getId() {
            return requestHolder.get();
        }
    
        public static void remove() {
            requestHolder.remove();
        }
    }
    
    // 定义HttpFilter
    @Slf4j
    public class HttpFilter implements Filter {
    
        @Override
        public void init(FilterConfig filterConfig) throws ServletException {
    
        }
    
        @Override
        public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {
            HttpServletRequest request = (HttpServletRequest) servletRequest;
            log.info("do filter, {}, {}", Thread.currentThread().getId(), request.getServletPath());
            RequestHolder.add(Thread.currentThread().getId());
            filterChain.doFilter(servletRequest, servletResponse);
        }
    
        @Override
        public void destroy() {
    
        }
    }
    
    // 声明HttpInterceptor
    @Slf4j
    public class HttpInterceptor extends HandlerInterceptorAdapter {
    
        @Override
        public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
            log.info("preHandle");
            return true;
        }
    
        @Override
        public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
            RequestHolder.remove();
            log.info("afterCompletion");
            return;
        }
    }
    
    // 配置启动类ConcurrencyApplication
    @SpringBootApplication
    @EnableHystrixDashboard
    @EnableCircuitBreaker
    public class ConcurrencyApplication extends WebMvcConfigurerAdapter{
    
        public static void main(String[] args) {
            SpringApplication.run(ConcurrencyApplication.class, args);
        }
    
        @Bean
        public FilterRegistrationBean httpFilter() {
            FilterRegistrationBean registrationBean = new FilterRegistrationBean();
            registrationBean.setFilter(new HttpFilter());
            registrationBean.addUrlPatterns("/threadLocal/*");
            return registrationBean;
        }
    
        @Override
        public void addInterceptors(InterceptorRegistry registry) {
            registry.addInterceptor(new HttpInterceptor()).addPathPatterns("/**");
        }
    }
    
    // 声明ThreadLocalController类
    @Controller
    @RequestMapping("/threadLocal")
    public class ThreadLocalController {
    
        @RequestMapping("/test")
        @ResponseBody
        public Long test() {
            return RequestHolder.getId();
        }
    }
    

    3️⃣线程不安全类与写法
    1 StringBuilder与StringBuffer
    // StringBuilder
    @Slf4j
    @NotThreadSafe
    public class StringExample1 {
    
        // 请求总数
        public static int clientTotal = 5000;
    
        // 同时并发执行的线程数
        public static int threadTotal = 200;
    
        public static StringBuilder stringBuilder = new StringBuilder();
    
        public static void main(String[] args) throws Exception {
            ExecutorService executorService = Executors.newCachedThreadPool();
            final Semaphore semaphore = new Semaphore(threadTotal);
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
            for (int i = 0; i < clientTotal ; i++) {
                executorService.execute(() -> {
                    try {
                        semaphore.acquire();
                        update();
                        semaphore.release();
                    } catch (Exception e) {
                        log.error("exception", e);
                    }
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            executorService.shutdown();
            log.info("size:{}", stringBuilder.length());
        }
    
        private static void update() {
            stringBuilder.append("1");
        }
    }
    
    // StringBuffer
    @Slf4j
    @ThreadSafe
    public class StringExample2 {
    
        // 请求总数
        public static int clientTotal = 5000;
    
        // 同时并发执行的线程数
        public static int threadTotal = 200;
    
        public static StringBuffer stringBuffer = new StringBuffer();
    
        public static void main(String[] args) throws Exception {
            ExecutorService executorService = Executors.newCachedThreadPool();
            final Semaphore semaphore = new Semaphore(threadTotal);
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
            for (int i = 0; i < clientTotal ; i++) {
                executorService.execute(() -> {
                    try {
                        semaphore.acquire();
                        update();
                        semaphore.release();
                    } catch (Exception e) {
                        log.error("exception", e);
                    }
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            executorService.shutdown();
            log.info("size:{}", stringBuffer.length());
        }
    
        private static void update() {
            stringBuffer.append("1");
        }
    }
    

    通过这两个例子我们可以看出在多线程情况下StringBuffer是线程安全的,而StringBuilder是线程不安全的,其实StringBuffer底层实现的时候是加了synchronized关键字的而StringBuilder则没有加这个关键字;可以要提供两个功能基本相同的类呢?因为StringBuffer虽然是线程安全的,但是效率较低而StringBuilder的效率是相较于StringBuffer略高的;

    2 SimpleDateFormat与JodaTime
    // SimpleDateFormat
    @Slf4j
    @NotThreadSafe
    public class DateFormatExample1 {
    
        private static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMdd");
    
        // 请求总数
        public static int clientTotal = 5000;
    
        // 同时并发执行的线程数
        public static int threadTotal = 200;
    
        public static void main(String[] args) throws Exception {
            ExecutorService executorService = Executors.newCachedThreadPool();
            final Semaphore semaphore = new Semaphore(threadTotal);
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
            for (int i = 0; i < clientTotal ; i++) {
                executorService.execute(() -> {
                    try {
                        semaphore.acquire();
                        update();
                        semaphore.release();
                    } catch (Exception e) {
                        log.error("exception", e);
                    }
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            executorService.shutdown();
        }
    
        private static void update() {
            try {
                simpleDateFormat.parse("20180208");
            } catch (Exception e) {
                log.error("parse exception", e);
            }
        }
    }
    

    可以看到在并发情况下,这个测试代码直接就报错了,原因是由于这个类不是线程安全的,在多线程环境执行的时候会直接抛出异常,如果大家一定要使用的话,可以参考下边的实现方式来实现;

    // SimpleDateFormat线程安全版本
    @Slf4j
    @ThreadSafe
    public class DateFormatExample2 {
    
        // 请求总数
        public static int clientTotal = 5000;
    
        // 同时并发执行的线程数
        public static int threadTotal = 200;
    
        public static void main(String[] args) throws Exception {
            ExecutorService executorService = Executors.newCachedThreadPool();
            final Semaphore semaphore = new Semaphore(threadTotal);
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
            for (int i = 0; i < clientTotal ; i++) {
                executorService.execute(() -> {
                    try {
                        semaphore.acquire();
                        update();
                        semaphore.release();
                    } catch (Exception e) {
                        log.error("exception", e);
                    }
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            executorService.shutdown();
        }
    
        private static void update() {
            try {
                SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMdd");
                simpleDateFormat.parse("20180208");
            } catch (Exception e) {
                log.error("parse exception", e);
            }
        }
    }
    

    相信看到这里大家就已经明白了,在使用这个类的时候每次我们都生成一个新的变量来使用就可以解决线程不安全的问题;

    // JodaTime
    @Slf4j
    @ThreadSafe
    public class DateFormatExample3 {
    
        // 请求总数
        public static int clientTotal = 5000;
    
        // 同时并发执行的线程数
        public static int threadTotal = 200;
    
        private static DateTimeFormatter dateTimeFormatter = DateTimeFormat.forPattern("yyyyMMdd");
    
        public static void main(String[] args) throws Exception {
            ExecutorService executorService = Executors.newCachedThreadPool();
            final Semaphore semaphore = new Semaphore(threadTotal);
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
            for (int i = 0; i < clientTotal; i++) {
                final int count = i;
                executorService.execute(() -> {
                    try {
                        semaphore.acquire();
                        update(count);
                        semaphore.release();
                    } catch (Exception e) {
                        log.error("exception", e);
                    }
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            executorService.shutdown();
        }
    
        private static void update(int i) {
            log.info("{}, {}", i, DateTime.parse("20180208", dateTimeFormatter).toDate());
        }
    }
    

    可以看到JodaTime在并发环境下即使不做任何的特殊处理也是没有问题的,所以这里建议大家在平时工作中使用JodaTime来处理时间相关的问题.

    3 ArrayList HashSet HashMap等
    // ArrayList
    @Slf4j
    @NotThreadSafe
    public class ArrayListExample {
    
        // 请求总数
        public static int clientTotal = 5000;
    
        // 同时并发执行的线程数
        public static int threadTotal = 200;
    
        private static List<Integer> list = new ArrayList<>();
    
        public static void main(String[] args) throws Exception {
            ExecutorService executorService = Executors.newCachedThreadPool();
            final Semaphore semaphore = new Semaphore(threadTotal);
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
            for (int i = 0; i < clientTotal; i++) {
                final int count = i;
                executorService.execute(() -> {
                    try {
                        semaphore.acquire();
                        update(count);
                        semaphore.release();
                    } catch (Exception e) {
                        log.error("exception", e);
                    }
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            executorService.shutdown();
            log.info("size:{}", list.size());
        }
    
        private static void update(int i) {
            list.add(i);
        }
    }
    
    // HashSet
    @Slf4j
    @NotThreadSafe
    public class HashSetExample {
    
        // 请求总数
        public static int clientTotal = 5000;
    
        // 同时并发执行的线程数
        public static int threadTotal = 200;
    
        private static Set<Integer> set = new HashSet<>();
    
        public static void main(String[] args) throws Exception {
            ExecutorService executorService = Executors.newCachedThreadPool();
            final Semaphore semaphore = new Semaphore(threadTotal);
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
            for (int i = 0; i < clientTotal; i++) {
                final int count = i;
                executorService.execute(() -> {
                    try {
                        semaphore.acquire();
                        update(count);
                        semaphore.release();
                    } catch (Exception e) {
                        log.error("exception", e);
                    }
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            executorService.shutdown();
            log.info("size:{}", set.size());
        }
    
        private static void update(int i) {
            set.add(i);
        }
    }
    
    // HashMap
    @Slf4j
    @NotThreadSafe
    public class HashMapExample {
    
        // 请求总数
        public static int clientTotal = 5000;
    
        // 同时并发执行的线程数
        public static int threadTotal = 200;
    
        private static Map<Integer, Integer> map = new HashMap<>();
    
        public static void main(String[] args) throws Exception {
            ExecutorService executorService = Executors.newCachedThreadPool();
            final Semaphore semaphore = new Semaphore(threadTotal);
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
            for (int i = 0; i < clientTotal; i++) {
                final int count = i;
                executorService.execute(() -> {
                    try {
                        semaphore.acquire();
                        update(count);
                        semaphore.release();
                    } catch (Exception e) {
                        log.error("exception", e);
                    }
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            executorService.shutdown();
            log.info("size:{}", map.size());
        }
    
        private static void update(int i) {
            map.put(i, i);
        }
    }
    

    通过我们的演示,我们已经看到ArrayList HashSet HashMap不是线程安全的,但是我们平时使用的时候为什么没有察觉到呢?那是因为我们在使用的时候一般都是作为局部变量来使用的所以很少会出现问题;关于ArrayList HashSet HashMap这些类有没有线程安全的处理类呢,答案是肯定的,而且不止一种,后边我们会详细介绍;关于线程不安全的类我们就先介绍到这里;


    4️⃣同步容器

    在java中同步容器有两种,一种是像Vector Stack HashTable这样的提供好的类,另外一种是Collections.synchronizedXXX(List Set Map)提供的一些静态工厂方法创建的类,这些方法都是以synchronized修饰的方法;

    1 Vector : Vector实现了List接口,Vector实际上即使一个数组,Vector中的方法都是使用synchronized修饰的所以是线程安全的同步容器;
    // Vector1
    @Slf4j
    @ThreadSafe
    public class VectorExample1 {
    
        // 请求总数
        public static int clientTotal = 5000;
    
        // 同时并发执行的线程数
        public static int threadTotal = 200;
    
        private static List<Integer> list = new Vector<>();
    
        public static void main(String[] args) throws Exception {
            ExecutorService executorService = Executors.newCachedThreadPool();
            final Semaphore semaphore = new Semaphore(threadTotal);
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
            for (int i = 0; i < clientTotal; i++) {
                final int count = i;
                executorService.execute(() -> {
                    try {
                        semaphore.acquire();
                        update(count);
                        semaphore.release();
                    } catch (Exception e) {
                        log.error("exception", e);
                    }
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            executorService.shutdown();
            log.info("size:{}", list.size());
        }
    
        private static void update(int i) {
            list.add(i);
        }
    }
    

    可以看到我们在使用Vector以后我们结果已经和预期是一致的了,但是Vector并不是线程安全的

    // Vector2
    @NotThreadSafe
    public class VectorExample2 {
    
        private static Vector<Integer> vector = new Vector<>();
    
        public static void main(String[] args) {
    
            while (true) {
    
                for (int i = 0; i < 10; i++) {
                    vector.add(i);
                }
    
                Thread thread1 = new Thread() {
                    public void run() {
                        for (int i = 0; i < vector.size(); i++) {
                            vector.remove(i);
                        }
                    }
                };
    
                Thread thread2 = new Thread() {
                    public void run() {
                        for (int i = 0; i < vector.size(); i++) {
                            vector.get(i);
                        }
                    }
                };
                thread1.start();
                thread2.start();
            }
        }
    }
    

    大家可以看到这里我们的同步容器抛出了异常,可是这是为什么呢?Vector里边的方法虽然是被synchronized修饰的同步方法,但是由于操作顺序的不同在并发的环境还是会出现问题的,就如以上代码所示那样;

    2 Stack : Stack是一个同步容器,它的方法也是使用synchronized来进行修饰的,而且Stack是继承与Vector的;关于Stack这里就不在举例说明了,因为它是继承自Vector的
    3 HashTable : HashTable实现了Map接口和HashMap非常相似, HashTable进行了同步处理相关的方法也是使用synchronized来进行修饰的;
    // HashTable
    @Slf4j
    @ThreadSafe
    public class HashTableExample {
    
        // 请求总数
        public static int clientTotal = 5000;
    
        // 同时并发执行的线程数
        public static int threadTotal = 200;
    
        private static Map<Integer, Integer> map = new Hashtable<>();
    
        public static void main(String[] args) throws Exception {
            ExecutorService executorService = Executors.newCachedThreadPool();
            final Semaphore semaphore = new Semaphore(threadTotal);
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
            for (int i = 0; i < clientTotal; i++) {
                final int count = i;
                executorService.execute(() -> {
                    try {
                        semaphore.acquire();
                        update(count);
                        semaphore.release();
                    } catch (Exception e) {
                        log.error("exception", e);
                    }
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            executorService.shutdown();
            log.info("size:{}", map.size());
        }
    
        private static void update(int i) {
            map.put(i, i);
        }
    }
    

    从这里我们可以看到我们使用HashTable的话这个实例就编程了线程安全的;

    4 Collections : Collections它是一个工具提供的类,与Collection是不一样的, Collections这个类中提供了大量的方法;
    // Collections.synchronizedList
    @Slf4j
    @ThreadSafe
    public class CollectionsExample1 {
    
        // 请求总数
        public static int clientTotal = 5000;
    
        // 同时并发执行的线程数
        public static int threadTotal = 200;
    
        private static List<Integer> list = Collections.synchronizedList(Lists.newArrayList());
    
        public static void main(String[] args) throws Exception {
            ExecutorService executorService = Executors.newCachedThreadPool();
            final Semaphore semaphore = new Semaphore(threadTotal);
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
            for (int i = 0; i < clientTotal; i++) {
                final int count = i;
                executorService.execute(() -> {
                    try {
                        semaphore.acquire();
                        update(count);
                        semaphore.release();
                    } catch (Exception e) {
                        log.error("exception", e);
                    }
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            executorService.shutdown();
            log.info("size:{}", list.size());
        }
    
        private static void update(int i) {
            list.add(i);
        }
    }
    
    // Collections.synchronizedSet
    @Slf4j
    @ThreadSafe
    public class CollectionsExample2 {
    
        // 请求总数
        public static int clientTotal = 5000;
    
        // 同时并发执行的线程数
        public static int threadTotal = 200;
    
        private static Set<Integer> set = Collections.synchronizedSet(Sets.newHashSet());
    
        public static void main(String[] args) throws Exception {
            ExecutorService executorService = Executors.newCachedThreadPool();
            final Semaphore semaphore = new Semaphore(threadTotal);
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
            for (int i = 0; i < clientTotal; i++) {
                final int count = i;
                executorService.execute(() -> {
                    try {
                        semaphore.acquire();
                        update(count);
                        semaphore.release();
                    } catch (Exception e) {
                        log.error("exception", e);
                    }
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            executorService.shutdown();
            log.info("size:{}", set.size());
        }
    
        private static void update(int i) {
            set.add(i);
        }
    }
    
    // Collections.synchronizedMap
    @Slf4j
    @ThreadSafe
    public class CollectionsExample3 {
    
        // 请求总数
        public static int clientTotal = 5000;
    
        // 同时并发执行的线程数
        public static int threadTotal = 200;
    
        private static Map<Integer, Integer> map = Collections.synchronizedMap(new HashMap<>());
    
        public static void main(String[] args) throws Exception {
            ExecutorService executorService = Executors.newCachedThreadPool();
            final Semaphore semaphore = new Semaphore(threadTotal);
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
            for (int i = 0; i < clientTotal; i++) {
                final int count = i;
                executorService.execute(() -> {
                    try {
                        semaphore.acquire();
                        update(count);
                        semaphore.release();
                    } catch (Exception e) {
                        log.error("exception", e);
                    }
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            executorService.shutdown();
            log.info("size:{}", map.size());
        }
    
        private static void update(int i) {
            map.put(i, i);
        }
    }
    

    可以看到我们在使用Collections的同步容器以后,这些实例都编程了线程安全的;


    5️⃣并发容器与安全共享策略总结
    1 并发容器
    ArrayList --> CopyOnWriteArrayList

    概念 : 简单的讲就是写操作时赋值,当有新元素添加到CopyOnWriteArrayList时,它先从原有的数组里边Copy一份出来然后在新的数组上做些操作,操作完成以后在将引用指向新的数组;CopyOnWriteArrayList所有的操作都是在锁的保护下进行的,这样做的目的主要是为了在多线程并发做add操作的时候复制出多个副本出来导致数据混乱;
    缺点 :
    ① 由于是copy的操作所以比较消耗内存,如果元素的内容较多的时候可能会触发GC,
    ② 不能用于实时读的场景,它比较适合读多写少的场景;
    思想 :
    ① 读写分离;
    ② 最终一致性;
    ③ 另外开辟空间解决并发冲突;

    // CopyOnWriteArrayList
    @Slf4j
    @ThreadSafe
    public class CopyOnWriteArrayListExample {
    
        // 请求总数
        public static int clientTotal = 5000;
    
        // 同时并发执行的线程数
        public static int threadTotal = 200;
    
        private static List<Integer> list = new CopyOnWriteArrayList<>();
    
        public static void main(String[] args) throws Exception {
            ExecutorService executorService = Executors.newCachedThreadPool();
            final Semaphore semaphore = new Semaphore(threadTotal);
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
            for (int i = 0; i < clientTotal; i++) {
                final int count = i;
                executorService.execute(() -> {
                    try {
                        semaphore.acquire();
                        update(count);
                        semaphore.release();
                    } catch (Exception e) {
                        log.error("exception", e);
                    }
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            executorService.shutdown();
            log.info("size:{}", list.size());
        }
    
        private static void update(int i) {
            list.add(i);
        }
    }
    
    HashSet --> CopyOnWriteArraySet 与 TreeSet --> ConcurrentSkipListSet

    概念 :
    CopyOnWriteArraySet它是线程安全,底层实现是使用CopyOnWriteArrayList,它的很多特性都与CopyOnWriteArrayList相似包括适用场景;
    ConcurrentSkipListSet是jdk6新增的类,支持自然排序,可以在构造的时候自己定义比较器,它是基于Map集合的,在多线程环境下ConcurrentSkipListSet它里边的remote add 等方法都是线程安全的,但是对于批量操作并不能保证以原子方式进行操作,在批量操作的时候只能保证每一次的操作是原子性的;ConcurrentSkipListSet在使用批量操作的时候可能需要手动处理一下;

    // CopyOnWriteArraySet
    @Slf4j
    @ThreadSafe
    public class CopyOnWriteArraySetExample {
    
        // 请求总数
        public static int clientTotal = 5000;
    
        // 同时并发执行的线程数
        public static int threadTotal = 200;
    
        private static Set<Integer> set = new CopyOnWriteArraySet<>();
    
        public static void main(String[] args) throws Exception {
            ExecutorService executorService = Executors.newCachedThreadPool();
            final Semaphore semaphore = new Semaphore(threadTotal);
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
            for (int i = 0; i < clientTotal; i++) {
                final int count = i;
                executorService.execute(() -> {
                    try {
                        semaphore.acquire();
                        update(count);
                        semaphore.release();
                    } catch (Exception e) {
                        log.error("exception", e);
                    }
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            executorService.shutdown();
            log.info("size:{}", set.size());
        }
    
        private static void update(int i) {
            set.add(i);
        }
    }
    
    // ConcurrentSkipListSet
    @Slf4j
    @ThreadSafe
    public class ConcurrentSkipListSetExample {
    
        // 请求总数
        public static int clientTotal = 5000;
    
        // 同时并发执行的线程数
        public static int threadTotal = 200;
    
        private static Set<Integer> set = new ConcurrentSkipListSet<>();
    
        public static void main(String[] args) throws Exception {
            ExecutorService executorService = Executors.newCachedThreadPool();
            final Semaphore semaphore = new Semaphore(threadTotal);
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
            for (int i = 0; i < clientTotal; i++) {
                final int count = i;
                executorService.execute(() -> {
                    try {
                        semaphore.acquire();
                        update(count);
                        semaphore.release();
                    } catch (Exception e) {
                        log.error("exception", e);
                    }
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            executorService.shutdown();
            log.info("size:{}", set.size());
        }
    
        private static void update(int i) {
            set.add(i);
        }
    }
    
    HashMap --> ConcurrentHashMap 与 TreeMap --> ConcurrentSkipListMap

    概念 :
    ConcurrentHashMap是HashMap线程安全的版本,ConcurrentHashMap不允许空值,在实际的应用中除了少数的插入操作和删除操作外,绝大多数操作都是读取操作,而且读操作大多数都是成功的,基于这个前提ConcurrentHashMap针对读操作多了特别多的优化,具有特别高的并发性;
    ConcurrentSkipListMap是TreeMap线程安全的版本,ConcurrentSkipListMap底层是使用SkipList这种跳表的结构实现的;

    // ConcurrentHashMap
    @Slf4j
    @ThreadSafe
    public class ConcurrentHashMapExample {
    
        // 请求总数
        public static int clientTotal = 5000;
    
        // 同时并发执行的线程数
        public static int threadTotal = 200;
    
        private static Map<Integer, Integer> map = new ConcurrentHashMap<>();
    
        public static void main(String[] args) throws Exception {
            ExecutorService executorService = Executors.newCachedThreadPool();
            final Semaphore semaphore = new Semaphore(threadTotal);
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
            for (int i = 0; i < clientTotal; i++) {
                final int count = i;
                executorService.execute(() -> {
                    try {
                        semaphore.acquire();
                        update(count);
                        semaphore.release();
                    } catch (Exception e) {
                        log.error("exception", e);
                    }
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            executorService.shutdown();
            log.info("size:{}", map.size());
        }
    
        private static void update(int i) {
            map.put(i, i);
        }
    }
    
    // ConcurrentSkipListMap
    @Slf4j
    @ThreadSafe
    public class ConcurrentSkipListMapExample {
    
        // 请求总数
        public static int clientTotal = 5000;
    
        // 同时并发执行的线程数
        public static int threadTotal = 200;
    
        private static Map<Integer, Integer> map = new ConcurrentSkipListMap<>();
    
        public static void main(String[] args) throws Exception {
            ExecutorService executorService = Executors.newCachedThreadPool();
            final Semaphore semaphore = new Semaphore(threadTotal);
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
            for (int i = 0; i < clientTotal; i++) {
                final int count = i;
                executorService.execute(() -> {
                    try {
                        semaphore.acquire();
                        update(count);
                        semaphore.release();
                    } catch (Exception e) {
                        log.error("exception", e);
                    }
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            executorService.shutdown();
            log.info("size:{}", map.size());
        }
    
        private static void update(int i) {
            map.put(i, i);
        }
    }
    
    2 J.U.C的实际构成
    image.png
    3 安全共享对象策略总结

    1 线程限制 : 一个被线程限制的对象,由线程独占,并且只能被占有它的线程修改;
    2 共享只读 : 一个共享只读的对象,在没有额外同步的情况下,可以被多个线程并发访问,但是任何线程都不能修改它;
    3 线程安全对象 : 一个线程安全的对象或者容器,在内部通过同步机制来保证线程安全,所以其他线程无需额外的同步就可以通过公共接口随意访问它;
    4 被守护对象 : 被守护对象只能通过获取特定的锁来访问;

    相关文章

      网友评论

          本文标题:8 线程安全策略

          本文链接:https://www.haomeiwen.com/subject/kqhmoftx.html