美文网首页
Java业务常见开发错误-线程安全工具

Java业务常见开发错误-线程安全工具

作者: DZQANN | 来源:发表于2022-03-21 20:00 被阅读0次

    ThreadLocal没有清除

    如果前面一个线程没有清除的话,后面一个线程会直接拿到上面一个线程的结果,污染当前线程。当然前提是使用了线程池。

    @RestController
    @RequestMapping("/concurrent/threadlocal")
    public class ThreadLocalConcurrentTestController {
        private static final ThreadLocal<Integer> currentUser = ThreadLocal.withInitial(() -> null);
    
        @GetMapping("wrong")
        public Map<String, String> wrong(@RequestParam("userId") Integer userId) {
            String before = Thread.currentThread().getName() + ":" + currentUser.get();
            currentUser.set(userId);
            String after = Thread.currentThread().getName() + ":" + currentUser.get();
            Map<String, String> result = new HashMap<>();
            result.put("before", before);
            result.put("after", after);
            return result;
        }
    
        @GetMapping("right")
        public Map<String, String> right(@RequestParam("userId") Integer userId) {
            String before = Thread.currentThread().getName() + ":" + currentUser.get();
            currentUser.set(userId);
            try {
                String after = Thread.currentThread().getName() + ":" + currentUser.get();
                Map<String, String> result = new HashMap<>();
                result.put("before", before);
                result.put("after", after);
                return result;
            } finally {
                currentUser.remove();
            }
        }
    }
    

    初始化tomcat的连接是10,配置了最大连接数是1好像没有用,所以请求了11次,第11次的结果结果:

    5.png

    这个问题其实是有两点保护措施的

    • 在一个线程任务执行结束后在finally中手动将ThreadLocal变量清除
    • 每一个线程任务开始前不相信当前的ThreadLocal变量是干净的,也删除一次
      我们的系统中封装了一些ThreadLocal的变量,而且统一在Transaction结束的时候清空,我们自己其实并不需要再手写ThreadLocal了。而且对于分库这种系统最重要的业务逻辑,也会在使用之前就先清空一次ThreadLocal变量

    ConcurrentHashMap

    ConcurrentHashMap只做到了对于Map的读写是线程安全的,对于很多聚合方法其实是线程不安全的。

    下面一段示例是,先向一个ConcurrentHashMap中插入900条数据,然后起10个线程,每一个线程执行的操作是获取当前状态下map的size和1000的差值,然后向map里添加对应数量的内容。

    public class ConcurrentHashMapTestController {
        private static int THREAD_COUNT = 10;
        private static int ITEM_COUNT = 1000;
    
        private ConcurrentHashMap<String, Long> getData(int count) {
            return LongStream.rangeClosed(1, count)
                    .boxed()
                    .collect(Collectors.toConcurrentMap(i -> UUID.randomUUID().toString(), Function.identity(),
                            (o1, o2) -> o1, ConcurrentHashMap::new));
        }
    
        @GetMapping("wrong")
        public String wrong() throws InterruptedException {
            ConcurrentHashMap<String, Long> concurrentHashMap = getData(ITEM_COUNT - 100);
            log.info("init size:{}", concurrentHashMap.size());
            ForkJoinPool forkJoinPool = new ForkJoinPool(THREAD_COUNT);
            forkJoinPool.execute(() -> IntStream.rangeClosed(1, 10).parallel().forEach(i -> {
                int gap = ITEM_COUNT - concurrentHashMap.size();
                log.info("gap size:{}", gap);
                concurrentHashMap.putAll(getData(gap));
            }));
            forkJoinPool.shutdown();
            forkJoinPool.awaitTermination(1, TimeUnit.HOURS);
            log.info("finish size:{}", concurrentHashMap.size());
            return "OK";
        }
    
        @GetMapping("right")
        public String right() throws InterruptedException {
            ConcurrentHashMap<String, Long> concurrentHashMap = getData(ITEM_COUNT - 100);
            log.info("init size:{}", concurrentHashMap.size());
    
            ForkJoinPool forkJoinPool = new ForkJoinPool(THREAD_COUNT);
            forkJoinPool.execute(() -> IntStream.rangeClosed(1, 10).parallel().forEach(i -> {
                synchronized (concurrentHashMap) {
                    int gap = ITEM_COUNT - concurrentHashMap.size();
                    log.info("gap size:{}", gap);
                    concurrentHashMap.putAll(getData(gap));
                }
            }));
            forkJoinPool.shutdown();
            forkJoinPool.awaitTermination(1, TimeUnit.HOURS);
            log.info("finish size:{}", concurrentHashMap.size());
            return "OK";
        }
    }
    

    如果就是简单的多线程跑,看到结果其实最后添加到了1100+,正常情况应该是只有一个worker看到还剩100个,其它worker都应该是0,这说明其实size并不是一个线程安全方法


    2.png

    在锁住了map之后结果是正常的:


    3.png

    充分发挥并发工具类的特性

    假设有这么一个场景,创建10个线程,每个线程循环1000万次,每次生成一个随机key。最后统计所有线程生成的随机key中,每一个key的出现次数。

    这个地方肯定不能这么写:

    public void count() {
        if (map.containsKey(key)) {
        map.put(key, map.get(key) + 1);
      } else {
        map.put(key, 1);
      }
    }
    

    就算用的是concurrentHashMap,也把应该的一个原子操作拆成了两步,这就和之前我们的redis锁失效一个道理。

    想让它不出错,就在这个逻辑的外面锁住map对象,但是锁住map对象会导致很严重的性能问题。

    这时候可以这么做:

    public void count() {
        map.computeIfAbsent(key, new LongAdder()).increment();
    }
    

    都是原子性的操作,充分应用了多线程的性能,又保证了安全

    认清并发工具的使用场

    这里举的例子是CopyOnWriteArrayList,这个东西是一个线程安全的ArrayList,但是每次做add操作都会复制当前的List。所以当写操作非常多的时候,使用这个List就会耗费很多的资源

    一共四个例子,前面两个例子对于我们的系统还是有一些参考的,后面就是简单看看就可以,暂时用不上。值得思考的是我们在使用一个新的库的时候,容易出现使用不当的问题。比如我要用ConrrentHashMap,我肯定不会看官方文档,并且认为里面的所有方法都是线程安全的,所以当我调用size的时候就有可能出问题。这种也不可能要求使用的时候都去看明白官方文档,只能说用的时候注意一下

    相关文章

      网友评论

          本文标题:Java业务常见开发错误-线程安全工具

          本文链接:https://www.haomeiwen.com/subject/lrfqjrtx.html