美文网首页
Java线程池的创建方式及调优|笔记

Java线程池的创建方式及调优|笔记

作者: Mr培 | 来源:发表于2022-08-24 13:09 被阅读0次

    创建线程池的方式

    • ThreadPoolExecutor
    • ScheduledThreadPoolExecutor
    • ForkJoinPool

    ThreadPoolExecutor

    • 案例 多线程处理大量数据

    CountDownLatch 来使主线程等待线程池中的线程执行完毕。

    package com.example.demo.threadpool;
    
    import cn.hutool.core.io.file.FileReader;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.*;
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * 多线程处理大量数据
     * @author rp
     */
    @RestController
    @RequestMapping("/vast")
    public class VastDataController {
    
        private static List<String> mysqlData;
    
        private static CountDownLatch threadsSignal;
    
        /**
         * 每个线程处理的数据量
         * */
        private static final int count = 10000;
    
        static class ThreadNameFactory implements ThreadFactory {
            private final AtomicInteger threadNumber = new AtomicInteger(1);
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, threadNumber.getAndIncrement()+"线程");
            }
        }
    
        /**
         * 定义线程池数量为8,每个线程处理1000条数据
         * */
        private static ThreadPoolExecutor execPool = new ThreadPoolExecutor(8, 8,
                5, TimeUnit.SECONDS,new LinkedBlockingQueue<>(),new ThreadNameFactory());
    
    
        /**
         * 多线程模拟处理大量数据,模拟请求
         * */
        @RequestMapping("/dataHandle")
        public static String dataHandle(){
            mysqlData = new ArrayList<>();
            long start = System.currentTimeMillis();
            batchAddData();
            long end = System.currentTimeMillis();
            System.out.println("==========");
            String s = end - start + "ms";
            System.out.println(s);
            mysqlData = null;
            return s;
        }
    
    
    
        /**
         * 多线程批量执行插入,百万数据需要大约不到20秒   64位4核处理
         */
        public static void batchAddData() {
            //需要插入数据库的数据
            List<String> limodel = readFile();
            try {
                if(limodel.size() <= count) {
                    threadsSignal = new CountDownLatch(1);
                    execPool.submit(new InsertDate(limodel));
                }else {
                    List<List<String>> li = createList(limodel, count);
                    threadsSignal = new CountDownLatch(li.size());
                    for(List<String> liop : li) {
                        execPool.submit(new InsertDate(liop));
                    }
                }
                threadsSignal.await();
            } catch (Exception e) {
                e.printStackTrace();
            }
            limodel = null;
        }
    
        /**
         * 数据拆分
         * @param targe
         * @param size
         * @return
         */
        private static List<List<String>>  createList(List<String> targe, int size) {
            List<List<String>> listArr = new ArrayList<>();
            //获取被拆分的数组个数
            int arrSize = targe.size() % size == 0 ? targe.size() / size : targe.size() / size + 1;
            for(int i = 0 ; i < arrSize ; i++) {
                List<String> sub = new ArrayList<String>();
                //把指定索引数据放入到list中
                for(int j = i*size ; j <= size*(i+1)-1 ; j++) {
                    if(j <= targe.size()-1) {
                        sub.add(targe.get(j));
                    }
                }
                listArr.add(sub);
            }
            return listArr;
        }
    
        /**
         * 内部类,开启线程批量保存数据
         * @author rp
         *
         */
        static class  InsertDate  extends Thread{
            List<String> lienties = new ArrayList<>();
            InsertDate(List<String> listModel){
                //可对数据进行定制化处理/计算等
                lienties.addAll(listModel);
            }
            @Override
            public void run() {
                //模拟存入数据库
                mysqlData.addAll(lienties);
                threadsSignal.countDown();
            }
        }
    
        public static List<String> readFile(){
            //默认UTF-8编码,可以在构造中传入第二个参数做为编码,vast.txt 测试数据文件
            FileReader fileReader = new FileReader("/Users/****/develop/***/interview/src/main/resources/vast.txt");
            String[] split = fileReader.readString().split(",");
            return Arrays.asList(split);
        }
    
    }
    
    

    ScheduledThreadPoolExecutor

    package com.example.demo.threadpool;
    
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.*;
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * 线程池ScheduledThreadPoolExecutor详解
     * @author rp
     */
    @Slf4j
    public class ScheduledThreadPoolExecutorTest {
    
        static class ThreadNameFactory implements ThreadFactory {
            private final AtomicInteger threadNumber = new AtomicInteger(5);
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, threadNumber.getAndIncrement()+"线程");
            }
        }
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(
                    5,
                    new ThreadNameFactory(),
                    new ThreadPoolExecutor.AbortPolicy()
            );
            /**
             * 延迟3秒执行任务
             * */
            executor.schedule(
                new Runnable() {
                    @Override
                    public void run() {
                        log.info("延迟3秒后执行~~~");
                    }
                },
                    3,
                    TimeUnit.SECONDS
            );
            /**
             * 延迟5秒后执行
             * */
            ScheduledFuture<String> schedule = executor.schedule(
                    new Callable<String>() {
                         @Override
                         public String call() throws Exception {
                             return "获得执行结果";
                         }
                     },
                    5,
                    TimeUnit.SECONDS
            );
            log.info(schedule.get());
            /**
             * scheduleAtFixedRate
             * */
            executor.scheduleAtFixedRate(
                    new Runnable() {
                        @Override
                        public void run() {
                            log.info("执行任务");
                        }
                    },
    //                第一次执行任务延迟多久
                    0,
    //                每隔多久执行一次任务
                    3,
                    TimeUnit.SECONDS
            );
            /**
             * scheduleWithFixedDelay
             * */
            executor.scheduleWithFixedDelay(
                    new Runnable() {
                        @Override
                        public void run() {
                            log.info("执行任务");
                        }
                    },
    //                第一次执行任务延迟多久
                    0,
    //               每次执行完任务之后,延迟多久再次执行这个任务
                    3,
                    TimeUnit.SECONDS
            );
        }
    
    
    }
    
    

    ForkJoinPool

    package com.example.demo.threadpool;
    
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.ForkJoinTask;
    import java.util.concurrent.RecursiveTask;
    
    /**
     * ForkJoinPool 实现1-100的求和
     * ForkJoinPool 实际中不怎么使用
     * @author rp
     */
    @Slf4j
    public class ForkJoinPoolTest {
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            ForkJoinPool forkJoinPool = new ForkJoinPool();
            ForkJoinTask<Integer> task = forkJoinPool.submit(new Task(1, 1000));
            Integer integer = task.get();
            log.info("求和结果:{}",integer);
        }
    
        static class Task extends RecursiveTask<Integer> {
            /**
             * 当前任务计算的起始
             */
            private Integer start;
            /**
             * 当前任务计算的结束
             */
            private Integer end;
            /**
             * 阈值,end-start在阈值以内,那么就不用再去细分任务
             */
            private static final int threshold = 2;
    
            public Integer getStart() {
                return start;
            }
    
            public void setStart(Integer start) {
                this.start = start;
            }
    
            public Integer getEnd() {
                return end;
            }
    
            public void setEnd(Integer end) {
                this.end = end;
            }
    
            public Task(Integer start, Integer end) {
                this.start = start;
                this.end = end;
            }
    
            @Override
            protected Integer compute() {
                int sum = 0;
                boolean needFork = (end - start) > threshold;
                if (needFork){
                    int middle = (end + start)/2;
                    Task leftTask = new Task(start, middle);
                    Task rightTask = new Task(middle+1, end);
                    //执行子任务
                    leftTask.fork();
                    rightTask.fork();
                    //子任务执行完成之后的结果
                    Integer leftJoin = leftTask.join();
                    Integer rightJoin = rightTask.join();
                    sum = leftJoin+rightJoin;
                }else {
                    for (int i = start;i <= end;i++){
                        sum += i;
                    }
                }
                return sum;
            }
        }
    
    }
    

    线程池调优

    调优工具类

    package com.example.demo.threadpool.tuning;
    
    import java.math.BigDecimal;
    import java.math.RoundingMode;
    import java.util.concurrent.*;
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * A class that calculates the optimal thread pool boundaries. It takes the desired target utilization and the desired
     * work queue memory consumption as input and retuns thread count and work queue capacity.
     *
     * @author Niklas Schlimm
     */
    public abstract class PoolSizeCalculator {
    
        /**
         * The sample queue size to calculate the size of a single {@link Runnable} element.
         */
        private final int SAMPLE_QUEUE_SIZE = 1000;
    
        /**
         * Accuracy of test run. It must finish within 20ms of the testTime otherwise we retry the test. This could be
         * configurable.
         */
        private final int EPSYLON = 20;
    
        /**
         * Control variable for the CPU time investigation.
         */
        private volatile boolean expired;
    
        /**
         * Time (millis) of the test run in the CPU time calculation.
         */
        private final long testtime = 3000;
    
        /**
         * Calculates the boundaries of a thread pool for a given {@link Runnable}.
         *
         * @param targetUtilization    the desired utilization of the CPUs (0 <= targetUtilization <= 1)
         * @param targetQueueSizeBytes the desired maximum work queue size of the thread pool (bytes)
         */
        protected void calculateBoundaries(BigDecimal targetUtilization, BigDecimal targetQueueSizeBytes) {
            calculateOptimalCapacity(targetQueueSizeBytes);
            Runnable task = creatTask();
            start(task);
            start(task); // warm up phase
            long cputime = getCurrentThreadCPUTime();
            start(task); // test intervall
            cputime = getCurrentThreadCPUTime() - cputime;
            long waittime = (testtime * 1000000) - cputime;
            calculateOptimalThreadCount(cputime, waittime, targetUtilization);
        }
    
        private void calculateOptimalCapacity(BigDecimal targetQueueSizeBytes) {
            long mem = calculateMemoryUsage();
            BigDecimal queueCapacity = targetQueueSizeBytes.divide(new BigDecimal(mem), RoundingMode.HALF_UP);
            System.out.println("Target queue memory usage (bytes): " + targetQueueSizeBytes);
            System.out.println("createTask() produced " + creatTask().getClass().getName() + " which took " + mem
                    + " bytes in a queue");
            System.out.println("Formula: " + targetQueueSizeBytes + " / " + mem);
            System.out.println("* Recommended queue capacity (bytes): " + queueCapacity);
        }
    
        /**
         * Brian Goetz' optimal thread count formula, see 'Java Concurrency in Practice' (chapter 8.2)
         *
         * @param cpu               cpu time consumed by considered task
         * @param wait              wait time of considered task
         * @param targetUtilization target utilization of the system
         */
        private void calculateOptimalThreadCount(long cpu, long wait, BigDecimal targetUtilization) {
            BigDecimal waitTime = new BigDecimal(wait);
            BigDecimal computeTime = new BigDecimal(cpu);
            BigDecimal numberOfCPU = new BigDecimal(Runtime.getRuntime().availableProcessors());
            BigDecimal optimalthreadcount = numberOfCPU.multiply(targetUtilization).multiply(
                    new BigDecimal(1).add(waitTime.divide(computeTime, RoundingMode.HALF_UP)));
            System.out.println("Number of CPU: " + numberOfCPU);
            System.out.println("Target utilization: " + targetUtilization);
            System.out.println("Elapsed time (nanos): " + (testtime * 1000000));
            System.out.println("Compute time (nanos): " + cpu);
            System.out.println("Wait time (nanos): " + wait);
            System.out.println("Formula: " + numberOfCPU + " * " + targetUtilization + " * (1 + " + waitTime + " / "
                    + computeTime + ")");
            System.out.println("* Optimal thread count: " + optimalthreadcount);
        }
    
        static class ThreadNameFactory implements ThreadFactory {
            private final AtomicInteger threadNumber = new AtomicInteger(1);
    
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, threadNumber.getAndIncrement() + "线程");
            }
        }
    
        /**
         * Runs the {@link Runnable} over a period defined in {@link #testtime}. Based on Heinz Kabbutz' ideas
         * (http://www.javaspecialists.eu/archive/Issue124.html).
         *
         * @param task the runnable under investigation
         */
        public void start(Runnable task) {
            long start = 0;
            int runs = 0;
            do {
                if (++runs > 5) {
                    throw new IllegalStateException("Test not accurate");
                }
                expired = false;
                start = System.currentTimeMillis();
    
    //   Timer timer = new Timer();
    //   timer.schedule(new TimerTask() {
    //    @Override
    //    public void run() {
    //     expired = true;
    //    }
    //   }, testtime);
                ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(
                        1,
                        new ThreadNameFactory(),
                        new ThreadPoolExecutor.AbortPolicy()
                );
                executor.schedule(
                        new Runnable() {
                            @Override
                            public void run() {
                                expired = true;
                            }
                        },
                        testtime,
                        TimeUnit.MILLISECONDS
                );
    
    
                while (!expired) {
                    task.run();
                }
                start = System.currentTimeMillis() - start;
                executor.shutdown();
    //   timer.cancel();
            } while (Math.abs(start - testtime) > EPSYLON);
            collectGarbage(3);
        }
    
        private void collectGarbage(int times) {
            for (int i = 0; i < times; i++) {
                System.gc();
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
    
        /**
         * Calculates the memory usage of a single element in a work queue. Based on Heinz Kabbutz' ideas
         * (http://www.javaspecialists.eu/archive/Issue029.html).
         *
         * @return memory usage of a single {@link Runnable} element in the thread pools work queue
         */
        public long calculateMemoryUsage() {
            BlockingQueue<Runnable> queue = createWorkQueue();
            for (int i = 0; i < SAMPLE_QUEUE_SIZE; i++) {
                queue.add(creatTask());
            }
            long mem0 = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
            long mem1 = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
            queue = null;
            collectGarbage(15);
            mem0 = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
            queue = createWorkQueue();
            for (int i = 0; i < SAMPLE_QUEUE_SIZE; i++) {
                queue.add(creatTask());
            }
            collectGarbage(15);
            mem1 = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
            return (mem1 - mem0) / SAMPLE_QUEUE_SIZE;
        }
    
        /**
         * Create your runnable task here.
         *
         * @return an instance of your runnable task under investigation
         */
        protected abstract Runnable creatTask();
    
        /**
         * Return an instance of the queue used in the thread pool.
         *
         * @return queue instance
         */
        protected abstract BlockingQueue<Runnable> createWorkQueue();
    
        /**
         * Calculate current cpu time. Various frameworks may be used here, depending on the operating system in use. (e.g.
         * http://www.hyperic.com/products/sigar). The more accurate the CPU time measurement, the more accurate the results
         * for thread count boundaries.
         *
         * @return current cpu time of current thread
         */
        protected abstract long getCurrentThreadCPUTime();
    
    }
    

    调优案例

    package com.example.demo.threadpool.tuning;
    
    import lombok.extern.slf4j.Slf4j;
    
    import java.lang.management.ManagementFactory;
    import java.math.BigDecimal;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    
    /**
    * 线程池调优示例
    * 线程数调优
    * 1. CPU密集型任务 N+1
    * 2. IO密集型任务  2N
    * 3. 混合型任务
    * N * ∪ * (1+WT/ST)
    * ●N: CPU核心数
    * ●U: 目标CPU利用率
    * ●WT: 线程等待时间
    * ●ST: 线程运行时间
    * @author rp
    */
    @Slf4j
    public class MyPoolSizeCalculator extends PoolSizeCalculator {
       /**
        * 运行结果将打印出应设置相关调优参数数值
        * */
       public static void main(String[] args) {
           MyPoolSizeCalculator calculator = new MyPoolSizeCalculator();
           calculator.calculateBoundaries(
                   //CPU目标利用率
                   new BigDecimal(1.0),
                   // BlockingQueue 占用的内存大小,byte
                   new BigDecimal(100000));
       }
    
       @Override
       protected long getCurrentThreadCPUTime() {
           //当前线程占用的总时间
           return ManagementFactory.getThreadMXBean().getCurrentThreadCpuTime();
       }
    
       /**
        * 实际项目中需要运行的任务
        * */
       @Override
       protected Runnable creatTask() {
           return new Runnable() {
               @Override
               public void run() {
                   //log.info("实际项目中需要运行的任务");
               }
           };
       }
    
       /**
        * 计算BlockingQueue大小
        * */
       @Override
       protected BlockingQueue createWorkQueue() {
           return new LinkedBlockingQueue<>();
       }
    
    }
    

    运行结果


    截屏2022-09-22 上午10.08.39.png
    • Recommended queue capacity (bytes): 2500

      对应队列数 new LinkedBlockingQueue<>(2500)

    • Formula: 12 * 1 * (1 + 9672000 / 2990328000)

      对应公式 N * ∪ * (1+WT/ST)

    • Optimal thread count: 12

      对应线程数 corePoolSize=12,maximumPoolSize=12

    相关文章

      网友评论

          本文标题:Java线程池的创建方式及调优|笔记

          本文链接:https://www.haomeiwen.com/subject/kqxogrtx.html