美文网首页
Java中如何限制方法访问的并发数

Java中如何限制方法访问的并发数

作者: yizhenqiang | 来源:发表于2018-08-04 16:54 被阅读0次

    并发编程一直是Java基础中的高地,但很多只要有两三年Java基础的工程师,会在简历中很明确的标明“熟悉多线程编程、有高并发编程经验”,来凸显自己编程能力有多厉害,但实际上可能只是看了《Java Concurrency in Practice》的几个章节而已。其实对很多业务研发工程师来说,高并发编程经验并不是必备的核心竞争力之一,很多需要加锁或者统计的场景,大都可以交给外部系统如Redis来做,即多线程并发场景的转移。
    那么作为面试官,如何简单快速考察面试者的多线程编程能力?可能方法有很多,笔者喜欢用到的一个题目如下:“对于单个Java应用,我们如何限制其中某个方法methodA()被调用的并发数不能超过100,如果超过100,超出的请求就直接返回null或抛异常”。笔者会要求面试者在白板上写出相应代码片段(当然并不会要求面试者一定要完整写出用到的类名或方法名)。
    这个题看起来并不难,但仔细想想也不那么简单,大约30%的面试者会给出Semaphore的解决方案,例如:

        private static Semaphore semaphore = new Semaphore(100);
    
        public static Integer methodA() {
            if(!semaphore.tryAcquire()) {
                return null;
            }
    
            try {
            // TODO 方法中的业务逻辑
            } finally {
                semaphore.release();
            }
        }
    

    Semaphore信号量是一种比较完美的解决方案,代码简单而高效,一旦面试者给出这个方案,我们可以顺便考察下信号量相关的知识点。

    但还有没有其他的思路或解决方案呢?笔者接触到的面试者,还给出过线程池的方案,因为最高并发数是100,那么表明同时最多只能有100个线程访问该方法,面试者一般会这么写:

        private final static ExecutorService pool = new ThreadPoolExecutor(100, 100, 1, TimeUnit.MINUTES, new SynchronousQueue<>());
    
        public static Integer methodAWrapper() {
            try {
                Future<Integer> future = pool.submit(() -> methodA());
                return future.get();
            } catch (Exception e) {
              return null;
            }
        }
    
        public static Integer methodA() {
            // TODO 方法中的业务逻辑
        }
    

    其实严格意义来讲,线程池的这种方案无法完美做到“如果超过100,超出的请求就直接返回null或抛异常”,哪怕是使用SynchronousQueue队列。但没关系,最关键的考察点并不在超限如何返回。当面试者写出这种方案,也可以顺便考察下线程池相关的知识点。
    大多数面试者想到的是写个计数器,例如:

        private static AtomicInteger counter = new AtomicInteger(0);
    
        public static Integer methodA() {
    
            int value = counter.incrementAndGet();
            if(value > 100) {
                return null;
            }
    
            try {
                // TODO 方法中的业务逻辑
            } finally {
                counter.decrementAndGet();
            }
        }
    

    于是我一般会问,为啥是选择incrementAndGet方法,而不是选择getAndIncrement?仔细看看会不会有其他问题?大多数面试者经过提示都能发现这里get、incr和比较不是原子操作,会产生“竞态条件”(race condition)。我们先把这种计数器方案叫做方案A,像想到计数器方案的面试者,也有小部分会这样写:

        private static AtomicInteger counter = new AtomicInteger(0);
    
        public static Integer methodA() {
    
            int value = counter.get();
            if(value > 100) {
                return null;
            }
            
            counter.incrementAndGet();
    
            try {
                // TODO 方法中的业务逻辑
            } finally {
                counter.decrementAndGet();
            }
        }
    

    我们把这种计数器的实现方案叫做方案B,这两种方案都会有“竞态条件”的问题,但产生的现象不一样。

    对于方案A,在极端高并发的情况下,每个调用methodA的请求,都会对计数器进行+1,即使我们在finally对计数器进行了-1,也阻止value的值继续上涨,导致远大于100,得到的结果是所有请求没机会执行业务逻辑,即“饿死”现象。

    对于方案B,由于是活的执行业务逻辑的许可后再进行的+1操作,很显然在高并发情况下会导致执行业务逻辑的线程数超过100。

    很多Java老司机觉得自己不会犯这种错误,但实际上,最近阿里的开源项目Sentinel就有类似方案A的问题,Sentinel中使用责任链的模式,来对每笔调用进行统计、拦截,这里给出构建责任链DefaultSlotsChainBuilder类的代码片段:

    public class DefaultSlotsChainBuilder implements SlotsChainBuilder {
    
        @Override
        public ProcessorSlotChain build() {
            ProcessorSlotChain chain = new DefaultProcessorSlotChain();
            chain.addLast(new NodeSelectorSlot());
            chain.addLast(new ClusterBuilderSlot());
            chain.addLast(new LogSlot());
            chain.addLast(new StatisticSlot()); // 统计数据,类似于上文提到的incrementAndGet
            chain.addLast(new SystemSlot());    // 检查是否超过阈值,类似于上文提到的value > 100
            chain.addLast(new AuthoritySlot());
            chain.addLast(new FlowSlot());
            chain.addLast(new DegradeSlot());
    
            return chain;
        }
    }
    

    而在分布式服务框架Dubbo的早期版本(例如2.5.3),在对Provider提供线程限制保护的executes
    (例如:<dubbo:service interface="com.manzhizhen.dubbo.server.service.Dubbo2Service"
    ref="dubbo2Service" version="1.0.0" timeout="3000" executes="10" />)的实现方案,就踩了上述方案B的坑。

    回归正题,也有面试者给出了阻塞队列的方案,即:

        private static BlockingQueue<Integer> reqQueue = new ArrayBlockingQueue<>(100);
    
        public static Integer methodA() {
    
            if(!reqQueue.offer()) {
                return null;
            }
    
            try {
                // TODO 方法中的业务逻辑
            } finally {
                reqQueue.poll();
            }
        }
    

    阻塞队列的方案也是不错的,代码简单,效率也还行。

    当然,也有面试者说用Hystrix,嗯嗯,这也是可以的。

    做个总结吧,最优方案当然是使用Semaphore,但Semaphore无法统计实际高峰期时的并发量有多少(很多场景需要通过实际最高的并发值来优化我们系统),文章很短,希望对大家能有所帮助。

    相关文章

      网友评论

          本文标题:Java中如何限制方法访问的并发数

          本文链接:https://www.haomeiwen.com/subject/mpkmvftx.html