美文网首页
一个关于同步接口并发控制的简单demo

一个关于同步接口并发控制的简单demo

作者: 名字是乱打的 | 来源:发表于2022-10-14 14:22 被阅读0次

    背景:
    1.有个批量关单关单并且同步返回结果的需求

    1.分析

    需求

    • 1.批量关单,且需要同步返回结果
      由于调用我方接口是一次传入很多任务,假设是N,串行关单必然超时,因此这里需要并发执行
    • 2.由于公司不允许随便定义线程池(即使比较小的池)
      因此不能用线程池做并发控制,需要自定义线程控制
    • 3.由于下游rpc给到我们这边的流量上限是有限的,假设是X,因此我们需要并发,但是又需要控制同时并发的数量
    2.写代码

    方案:线程池(实现并发)+CountDownLatch(实现阻塞主线程,并发结束返回结果)+Semaphore (实现并发控制)

    需要注意的一点是
    等待队列容量>=2*Semaphore,不然会有线程因为拿不到线程池资源不处理直接失败的(原因参考线程池执行流程),当然我们可以将这部分压根没处理的也一并同步返回出去,让调用侧重试(我方不重试,防止超时问题)

    public class test {
    
        /**
         * 线程池
         * 等待队列容量>=2*Semaphore
         */
        static ExecutorService threadPool = new ThreadPoolExecutor(
                5,
                5,
                1,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(10),
                Executors.defaultThreadFactory()
                , new ThreadPoolExecutor.AbortPolicy());
    
    
        public static void main(String[] args) throws InterruptedException {
    
            List<Integer> successList=new CopyOnWriteArrayList<>();
            List<Integer> failList=new CopyOnWriteArrayList<>();
            List<Integer> errorList=new CopyOnWriteArrayList<>();
            
            CountDownLatch countDownLatch = new CountDownLatch(100);
            Semaphore semaphore=new Semaphore(10);
            for (int i = 0; i < 100; i++) {
    
                try {
                    AtomicInteger self = new AtomicInteger(i);
                    threadPool.execute(() -> {
                        try {
                            if ((self.get() & 1) == 1) {
                                int temp = (self.get()/0);
                            }
                            Thread.sleep(5000);
                            successList.add(self.get());
                            System.out.println(self.get() + "执行成功");
                        }catch (Exception e){
                            System.out.println(self.get() + "执行失败");
                            failList.add(self.get());
                        }finally {
                            semaphore.release();
                            countDownLatch.countDown();
                        }
                    });
                } catch (Exception e) {
                    errorList.add(i);
                    System.out.println("biz  Exception");
                    semaphore.release();
                    countDownLatch.countDown();
                }
            }
            countDownLatch.await();
            threadPool.shutdown();
            System.out.println("成功的:"+successList.size());
            successList.stream().sorted().forEach(item->{
                System.out.print(item+" ");
            });
    
            System.out.println();
            
            System.out.println("失败的:"+failList.size());
            failList.stream().sorted().forEach(item->{
                System.out.print(item+" ");
            });
            System.out.println();
    
            System.out.println("错误的:"+errorList.size());
            errorList.stream().sorted().forEach(item->{
                System.out.print(item+" ");
            });
            
            System.out.println("全部执行结束");
            threadPool.shutdown();
            
        }
    
    }
    
    

    相关文章

      网友评论

          本文标题:一个关于同步接口并发控制的简单demo

          本文链接:https://www.haomeiwen.com/subject/hzqczrtx.html