美文网首页Java 杂谈多线程Java
线程池正确使用姿势

线程池正确使用姿势

作者: LandHu | 来源:发表于2019-05-11 13:04 被阅读1次

    为什么不推荐使用Executors

    底层确实是通过LinkedBlockingQueue实现的,默认不设置队列大小的话,将是一个无边界的阻塞队列,最大长度为Integer.MAX_VALUE,而创建这么多线程,必然就有可能导致OOM,报错如下:

    Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded
       at java.util.concurrent.LinkedBlockingQueue.offer(LinkedBlockingQueue.java:416)
       at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1371)
       at com.hollis.ExecutorsDemo.main(ExecutorsDemo.java:16)
    

    创建线程池的正确姿势

    public class ExecutorsDemo {
       private static ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
           .setNameFormat("demo-pool-%d").build();
    
       private static ExecutorService pool = new ThreadPoolExecutor(5, 200,
           0L, TimeUnit.MILLISECONDS,
           new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
    
       public static void main(String[] args) {
           for (int i = 0; i < Integer.MAX_VALUE; i++) {
               pool.execute(new SubThread());
           }
       }
    }
    

    关闭线程池

    long start = System.currentTimeMillis();
    for (int i = 0; i <= 5; i++) {
        pool.execute(new Job());
    }
        pool.shutdown();
        while (!pool.awaitTermination(1, TimeUnit.SECONDS)) {
               LOGGER.info("线程还在执行。。。");
    }
        long end = System.currentTimeMillis();
        LOGGER.info("一共处理了【{}】", (end - start));
    

    SpringBoot 使用线程池

    //线程池配置
    @Configuration
    public class TreadPoolConfig {
        /**
         * 消费队列线程
         * @return
         */
        @Bean(value = "consumerQueueThreadPool")
        public ExecutorService buildConsumerQueueThreadPool(){
            ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
                    .setNameFormat("consumer-queue-thread-%d").build();
    
            ExecutorService pool = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
                    new ArrayBlockingQueue<Runnable>(5),namedThreadFactory,new ThreadPoolExecutor.AbortPolicy());
    
            return pool ;
        }
    }
    
    //线程池使用
    @Resource(name = "consumerQueueThreadPool")
        private ExecutorService consumerQueueThreadPool;
        @Override
        public void execute() {
            //消费队列
            for (int i = 0; i < 5; i++) {
                consumerQueueThreadPool.execute(new ConsumerQueueThread());
            }
        }
    
    线程池隔离

    hystrix 隔离
    首先需要定义两个线程池,分别用于执行订单、处理用户。

    /**
     * Function:订单服务
     *
     * @author crossoverJie
     *         Date: 2018/7/28 16:43
     * @since JDK 1.8
     */
    public class CommandOrder extends HystrixCommand<String> {
    
        private final static Logger LOGGER = LoggerFactory.getLogger(CommandOrder.class);
    
        private String orderName;
    
        public CommandOrder(String orderName) {
    
    
            super(Setter.withGroupKey(
                    //服务分组
                    HystrixCommandGroupKey.Factory.asKey("OrderGroup"))
                    //线程分组
                    .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("OrderPool"))
    
                    //线程池配置
                    .andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter()
                            .withCoreSize(10)
                            .withKeepAliveTimeMinutes(5)
                            .withMaxQueueSize(10)
                            .withQueueSizeRejectionThreshold(10000))
    
                    .andCommandPropertiesDefaults(
                            HystrixCommandProperties.Setter()
                                    .withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.THREAD))
            )
            ;
            this.orderName = orderName;
        }
    
    
        @Override
        public String run() throws Exception {
    
            LOGGER.info("orderName=[{}]", orderName);
    
            TimeUnit.MILLISECONDS.sleep(100);
            return "OrderName=" + orderName;
        }
    
    
    }
    
    
    /**
     * Function:用户服务
     *
     * @author crossoverJie
     *         Date: 2018/7/28 16:43
     * @since JDK 1.8
     */
    public class CommandUser extends HystrixCommand<String> {
    
        private final static Logger LOGGER = LoggerFactory.getLogger(CommandUser.class);
    
        private String userName;
    
        public CommandUser(String userName) {
    
    
            super(Setter.withGroupKey(
                    //服务分组
                    HystrixCommandGroupKey.Factory.asKey("UserGroup"))
                    //线程分组
                    .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("UserPool"))
    
                    //线程池配置
                    .andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter()
                            .withCoreSize(10)
                            .withKeepAliveTimeMinutes(5)
                            .withMaxQueueSize(10)
                            .withQueueSizeRejectionThreshold(10000))
    
                    //线程池隔离
                    .andCommandPropertiesDefaults(
                            HystrixCommandProperties.Setter()
                                    .withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.THREAD))
            )
            ;
            this.userName = userName;
        }
    
    
        @Override
        public String run() throws Exception {
    
            LOGGER.info("userName=[{}]", userName);
    
            TimeUnit.MILLISECONDS.sleep(100);
            return "userName=" + userName;
        }
    }
    

    模拟运行:

    public static void main(String[] args) throws Exception {
            CommandOrder commandPhone = new CommandOrder("手机");
            CommandOrder command = new CommandOrder("电视");
    
            //阻塞方式执行
            String execute = commandPhone.execute();
            LOGGER.info("execute=[{}]", execute);
    
            //异步非阻塞方式
            Future<String> queue = command.queue();
            String value = queue.get(200, TimeUnit.MILLISECONDS);
            LOGGER.info("value=[{}]", value);
    
            CommandUser commandUser = new CommandUser("张三");
            String name = commandUser.execute();
            LOGGER.info("name=[{}]", name);
        }
    

    原理:利用一个 Map 来存放不同业务对应的线程池。
    注意:自定义的 Command 并不是一个单例,每次执行需要 new 一个实例,不然会报 This instance can only be executed once. Please instantiate a new instance.

    参考:如何优雅的使用和理解线程池


    技术讨论 & 疑问建议 & 个人博客

    版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 3.0 许可协议,转载请注明出处!

    相关文章

      网友评论

        本文标题:线程池正确使用姿势

        本文链接:https://www.haomeiwen.com/subject/yyraoqtx.html