美文网首页
自定义线程池+多线程处理+CountDownLatch

自定义线程池+多线程处理+CountDownLatch

作者: Rain_z | 来源:发表于2020-09-30 01:58 被阅读0次

    前几天在写同步接口,因为数据量比较大,所以使用多线程,这里写了Demo记录下。

    自定义线程池

    1. Java中Executors已经提供了创建线程池的方式,但在阿里巴巴开发手册上是严禁使用的,建议使用自定义线程池,究其原因,是可能会产生一些问题。
    public static ExecutorService newFixedThreadPool(int nThreads) {
           return new ThreadPoolExecutor(nThreads, nThreads,
                                         0L, TimeUnit.MILLISECONDS,
                                         new LinkedBlockingQueue<Runnable>());
       }
    
    
    public static ExecutorService newSingleThreadExecutor() {
           return new FinalizableDelegatedExecutorService
               (new ThreadPoolExecutor(1, 1,
                                       0L, TimeUnit.MILLISECONDS,
                                       new LinkedBlockingQueue<Runnable>()));
       }
    
    
    public static ExecutorService newCachedThreadPool() {
           return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                         60L, TimeUnit.SECONDS,
                                         new SynchronousQueue<Runnable>());
       }
    
    1. 查看源码发现newFixedThreadPool和newSingleThreadExecutor方法他们都使用了LinkedBlockingQueue的任务队列,LikedBlockingQueue的默认大小为Integer.MAX_VALUE。newCachedThreadPool中定义的线程池大小为Integer.MAX_VALUE。

    2. 通过源码发现禁止使用Executors创建线程池的原因就是newFixedThreadPool和newSingleThreadExecutor的请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM。

    3. newCachedThreadPool允许的创建线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM。

    下面是写了一个Demo演示

    public class ThreadPool {
    
    
        /**
         * 自定义线程名称,方便的出错的时候溯源
         */
        private static final ThreadFactory NAME_THREAD_FACTORY = new ThreadFactoryBuilder().setNameFormat("order-pool-%d").build();
        //private static final ThreadFactory NAMED_THREAD_FACTORY = new ThreadFactoryBuilder().build();
    
        /**
         * corePoolSize    线程池核心池的大小
         * maximumPoolSize 线程池中允许的最大线程数量
         * keepAliveTime   当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间
         * unit            keepAliveTime 的时间单位
         * workQueue       用来储存等待执行任务的队列
         * threadFactory   创建线程的工厂类
         * handler         拒绝策略类,当线程池数量达到上线并且workQueue队列长度达到上限时就需要对到来的任务做拒绝处理
         */
        private static final ExecutorService service = new ThreadPoolExecutor(
                4,
                6,
                0L,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(1024),
                NAME_THREAD_FACTORY,
                new ThreadPoolExecutor.AbortPolicy()
        );
    
        /**
         * 获取线程池
         * @return 线程池
         */
        public static ExecutorService getEs() {
            return service;
        }
    
        /**
         * 使用线程池创建线程并异步执行任务
         * @param r 任务
         */
        public static void newTask(Runnable r) {
            service.execute(r);
        }
    
        public static void main(String[] args) throws InterruptedException {
    
            String[] arr = {"a","b","c","d","e","f","g","h","1","4","n"};
            CountDownLatch countDownLatch = new CountDownLatch(arr.length);
            List<String> list = Arrays.asList(arr);
            ExecutorService es = getEs();
    
            System.out.println("开始处理...");
            int size = 0;
            List<Future<Integer>> resultList = new ArrayList<Future<Integer>>();
            try {
    
                for (String m : list) {
                    SendEmail sendEmail = new SendEmail(m, countDownLatch);
                    SendEmailCallBack sendEmailCallBack = new SendEmailCallBack(m, countDownLatch);
                    Future<Integer> future = es.submit(sendEmailCallBack);
                    resultList.add(future);
                }
                System.out.println("主线程等待...");
                countDownLatch.await();
                for (Future<Integer> future : resultList) {
                    size += future.get();
                }
                System.out.println("处理数量: "+size);
            }catch (Exception e){
                e.printStackTrace();
            }finally {
                es.shutdown();
            }
    
            System.out.println("处理完成...");
        }
    
    }
    

    业务类

    主要实现自己的业务逻辑

    public class SendEmailCallBack implements Callable<Integer> {
        private final String mobile;
        private final CountDownLatch countDownLatch;
    
        public SendEmailCallBack(String mobile, CountDownLatch countDownLatch) {
            this.mobile = mobile;
            this.countDownLatch = countDownLatch;
        }
    
        private int sendEmail() {
            try {
                Thread.sleep(1000);
                System.out.println("线程:"+Thread.currentThread().getName()+ ", 邮件" + mobile +"发送成功");
            }catch (Exception e){
                e.printStackTrace();
            }
            return 1;
        }
    
        @Override
        public Integer call() throws Exception {
            int num = 0;
            try {
                synchronized (this){
                     num = this.sendEmail();
                }
            }catch (Exception e){
                e.printStackTrace();
            }finally {
                countDownLatch.countDown();
            }
            return num;
        }
    }
    

    CountDownLatch

    使用CountDownLatch是为了同步时,主线程等待所有线程执行执行完毕后,获取返回数据,汇总同步总数。

    控制台打印结果

    开始处理...
    主线程等待...
    线程:order-pool-0, 邮件a发送成功
    线程:order-pool-2, 邮件c发送成功
    线程:order-pool-3, 邮件d发送成功
    线程:order-pool-1, 邮件b发送成功
    线程:order-pool-3, 邮件g发送成功
    线程:order-pool-2, 邮件f发送成功
    线程:order-pool-0, 邮件e发送成功
    线程:order-pool-1, 邮件h发送成功
    线程:order-pool-3, 邮件1发送成功
    线程:order-pool-2, 邮件4发送成功
    线程:order-pool-0, 邮件n发送成功
    处理数量: 11
    处理完成...
    
    下面单独写了一个线程池创建类,修改下就可以使用,和上面Demo实现无关。
    public class ThreadPoolFactory {
        /**
        * 下面的属性大小可以改成获取服务器配置来动态调整
        */
        /**
         *核心线程数
         */
        private static final int corePoolSize = 4;
        /**
         * 最大核心线程数
         */
        private static final int maximumPoolSize = 6;
        /**
         * 工作队列
         */
        private static final int workQueue = 1024;
        /**
         *线程空闲时间
         */
        private static final int keepAliveTime = 30;
    
        /**
         * 自定义线程名称
         */
        private static final ThreadFactory NAMED_THREAD_FACTORY = new ThreadFactoryBuilder().setNameFormat("order-pool-%d").build();
    
        /**
         * corePoolSize    线程池核心池的大小
         * maximumPoolSize 线程池中允许的最大线程数量
         * keepAliveTime   当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间
         * unit            keepAliveTime 的时间单位
         * workQueue       用来储存等待执行任务的队列
         * threadFactory   创建线程的工厂类
         * handler         拒绝策略类,当线程池数量达到上线并且workQueue队列长度达到上限时就需要对到来的任务做拒绝处理
         */
        private static final ExecutorService service = new ThreadPoolExecutor(
                corePoolSize,
                maximumPoolSize,
                keepAliveTime,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(workQueue),
                NAMED_THREAD_FACTORY,
                new ThreadPoolExecutor.AbortPolicy()
        );
    
        /**
         * 获取线程池
         * @return 线程池
         */
        public static ExecutorService getEs() {
            return service;
        }
    
        /**
         * 使用线程池创建线程并异步执行任务
         * @param runnable 任务
         */
        public static void newTask(Runnable runnable) {
            service.execute(runnable);
        }
    
    }
    

    相关文章

      网友评论

          本文标题:自定义线程池+多线程处理+CountDownLatch

          本文链接:https://www.haomeiwen.com/subject/proyuktx.html