美文网首页
java多线程与高并发(九)线程池与源码解读

java多线程与高并发(九)线程池与源码解读

作者: 小偷阿辉 | 来源:发表于2021-04-28 07:20 被阅读0次

    1.回顾

    之前说的Executor作用是把线程的定义和执行分开,主要是用来做线程的执行接口,在他下面还有一个控制着线程生命周期的ExecutorService,然后才是各种各样的ThreadPoolExecutor,把线程池作为一个执行的单元,给他单独出一个类,下面是他的七个参数
    corePoolSize 核心线程数
    maxmumPoolSize 最大线程数
    keepAliveTime 生存时间
    TimeUnit 生存时间的单位
    BlockingQueue 任务队列
    **ThreadFactory **线程工厂
    RejectStrategy 拒绝策略(Abort 抛异常 Discard扔掉 不抛异常 DiscardOldest 扔掉排队时间最久的,CallerRuns 调用处理者处理服务)

    2.jdk自带线程池

    今天我们来看看JDK给我们提供了一些默认线程池的实现,默认的常用的有哪些,然后来读读ThreadPoolExecutor源码
    所有的线程池都是继承ExecutorService的,所以Executors是对线程执行的工具类,可以看做是线程的工厂,产生各种各样的线程池

    2.1.SingleThreadPool

    先来看第一个SingleThreadPool ,看这个名字就觉得只有一个线程,这个一个线程的线程池可以保证扔进去的任务是顺序执行的

    package com.learn.thread.eight;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class TestSingleThreadPool {
        private static ExecutorService service = Executors.newSingleThreadExecutor();
    
    
        public static void main(String[] args) {
            for (int i = 0; i < 5; i++) {
                final int j = i;
                // 顺序执行的
                service.execute(() -> {
                    System.out.println(j + " " +Thread.currentThread().getName());
                });
                
            }
        }
    }
    
    
    SingleThreadPool.png

    2.2.CacheThreadPool

    我们来看第二种,看他源码实际上是跟之前的SingleThreadPool一样,底层是还是ThreadPoolExecutor


    CachePool.png

    没有核心线程数,最大线程可以很Intger的最大值个,如果60秒没人理他,自动被回收
    任务队列用的是SynchronousQueue 不是用来存数据的,用来传递消息的,如果任务没有被执行,就会被阻塞
    用的是默认线程工厂
    没有指定拒绝策略,用默认拒绝策略
    可以看出CachePool的特点,就是你来一个任务我启动一个线程。启动线程的逻辑如下
    如果线程没有被回收,就去看当前线程池的线程是不是有空闲的线程,如果有就执行让它去执行任务。如果没有,就自己new 一个线程去执行,原因是队列是SynchronousQueue ,它必须保证它的大小为0,所以你来一个任务必须有一个线程去执行,不然别的线程提交任务就统统阻塞了
    来看这个小程序,首先将线程池service打印出来,最后又打印一遍线程池services,然后任务是睡眠500毫秒

    package com.learn.thread.eight;
    
    import com.sun.tools.internal.ws.wsdl.document.soap.SOAPUse;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    /**
     * @author zglx
     */
    public class TestCachePool {
        private static ExecutorService service = Executors.newCachedThreadPool();
    
        public static void main(String[] args) throws InterruptedException {
            System.out.println(service);
            for (int i = 0; i < 2; i++) {
                // 顺序执行的
                Thread.sleep(100);
                service.execute(() -> {
                    try {
                        Thread.sleep(0);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName());
                });
            }
            System.out.println(service);
            Thread.sleep(800);
            System.out.println(service);
        }
    
    
    }
    

    注意线程存活的时间是60S,所以第一个线程被复用了

    2.3.FixedThreadPool

    fixed是固定的含义,就是固定一个线程数。


    FixedThreadPool.png

    fixedThreadPool指定一个参数,到底有多少线程,核心线程数和最大线程数是固定的,所以不存在回收之说,但是这里用的是LinkedBlockingQueue,这里一定要小心,因为是不建议使用的,因为是会造成内存泄漏

    但是用fixedThreadPool有一个好处,可以进行并行的计算
    并行与并发 并发是指任务的提交、并行是指任务执行,并行是并发的子集,并行是多个cpu可以同时进行处理,并发是多个任务同时过来。

    我们看下面一个程序,用多线程计算1-200000中的质数,可以对这个区间进行分组

    package com.learn.thread.eight;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.*;
    
    public class TestFixedThreadPool {
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            long start = System.currentTimeMillis();
            List<Integer> list = getPrime(1,200000);
            list.forEach(System.out::println);
            long end = System.currentTimeMillis();
            System.out.println("time " + (end - start));
    
            start = System.currentTimeMillis();
            int num = 4;
            ExecutorService service = Executors.newFixedThreadPool(num);
            Future<List<Integer>> future1 = service.submit(new Mytask(1, 80000));
            Future<List<Integer>> future2 = service.submit(new Mytask(80001, 160000));
            Future<List<Integer>> future3 = service.submit(new Mytask(160001, 200000));
            future2.get().addAll(future3.get());
            future1.get().addAll(future2.get());
            future1.get().forEach(System.out::println);
            end = System.currentTimeMillis();
            System.out.println("time " + (end - start));
        }
    
        public static List<Integer> getPrime(Integer start, Integer end) {
            List<Integer> list = new ArrayList<>(100);
            for (Integer i = start; i <= end; i++) {
                if (isPrime(i)) {
                    list.add(i);
                }
            }
    
            return list;
        }
    
        public static boolean isPrime(Integer num) {
            for (int i = 2; i <= num/2; i++) {
                if (num % i == 0) {
                    return false;
                }
            }
            return true;
        }
    
        static class Mytask implements Callable<List<Integer>> {
            int start;
            int end;
            public Mytask() {
            }
    
            public Mytask(int start, int end) {
                this.start = start;
                this.end = end;
            }
    
            @Override
            public List<Integer> call() throws Exception {
                List<Integer> result = getPrime(start, end);
                return result;
            }
        }
    }
    
    

    2.4.cacahe vs fixed

    什么时候用cache 什么时候用fixed ,这得看你的业务量,如果线程池的线程太多,他们就会竞争稀缺的处理器和内存资源,浪费大量的时候在上下文切换,反之如果线程太少,处理器就可能无法充分利用。
    建议:线程池大小与处理器的利用率之比可以使用公式来进行估算

    线程池 = 你有多少个CPU乘以cpu期望利用率 乘以 (1+W/C)W除C是等待时间与计算时间的比率
    表达式为
    Nthread = Ncpu * Ucpu * (1+W/C)

    如果你的任务不确定是否平稳,但是要保证任务来的时候有线程去执行,那我们就可以用cache,当然你要保证这个任务不会堆积。

    假如你大概估了线程的值,这个值完全可以处理任务,我可以直接New 一个线程来执行,那就用fixed,但是阿里不建议这么使用

    2.5.ScheduledThreadPool

    定时任务线程池,就是一个定时器任务,隔一段时间后执行,这个就是我们专门用来执行定时任务的一个线程池。


    ScheduledThreadPool.png

    这里super调用的是ThreadPoolExecutor 本质上还是ThreadPoolExecutor,它最大线程数也是Integer的最大值,用的队列是DelayedWorkQueue。
    它有一些好用的方法,比如scheduleAtFixedRate间隔多长时间在一个固定频率执行一次这个任务,可用通过这样灵活的时间配置。
    第一个参数是Delay,第一次执行任务在此之后多长时间
    第二个参数period间隔多长时间
    第三个参数是时间单位

    package com.learn.thread.eight;
    
    import com.learn.thread.five.ExecutorService;
    
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    
    public class TestScheduledPool {
        private static ScheduledExecutorService service = Executors.newScheduledThreadPool(10);
    
        public static void main(String[] args) {
            // 5秒打印一次线程名字
            service.scheduleAtFixedRate(() -> {
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName());
            }, 0, 5, TimeUnit.SECONDS);
        }
    }
    

    假如有一个闹钟服务,假如有十亿人订阅了这个闹钟,意味着,每天早上会有10亿的并发量,你怎么优化
    思想就是把这个定时的任务分发到很多个边缘的服务器上,假如说你有一台服务器,你也是要用到多线程去消费,总之就是一个分而治之的思想

    SingleThreadPool 只有一个线程的线程池
    FixedThreadPool 固定多少个线程
    CacheThreadPool 有弹性的线程池,有一个启动一个,只要没有空闲的就启动一个新的来执行
    ScheduleThreadPool 定时任务来执行线程池

    package com.learn.thread.eight;
    
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.*;
    
    /**
     * 自定义线程池的拒绝策略
     */
    public class TestRejectedHandler {
        private static ThreadFactory factory = new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                return new Thread("t1");
            }
        };
        private static ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),factory, new MyHandler());
    
        static class MyHandler implements RejectedExecutionHandler {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                System.out.println("asdasd");
    
            }
        }
    
        public static void main(String[] args) {
            for (int i = 0; i < 10; i++) {
                executorService.execute(() -> {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                });
            }
        }
    
    }
    
    

    3.ThreadPoolExecutor

    3.1.常用变量的解释

    // AtomicInteger是int类型,是32位。高3位代表线程状态,低29位代表目前线程池有多少个线程数量,这里把两个值合二为一就是算了执行效率更高一些,因为都需要线程同步,而同步一个值往往比同步一个值容易的多

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    
        // Integer.size为32,所以COUNT_BITS为29
        private static final int COUNT_BITS = Integer.SIZE - 3;
    
        // CAPACITY 线程允许的最大线程数,1左移29位,然后减1,就是2^29-1
        private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    
        // (线程有五种状态按大小排序为  RUNNING -> SHUTDOWN  -> STOP -> TIDYING   -> TERMINATED)
        // 正常运行的
        private static final int RUNNING    = -1 << COUNT_BITS;
        // 调用shutdown方法进去了shutdown状态
        private static final int SHUTDOWN   =  0 << COUNT_BITS;
        // 调用shutdown马上停止
        private static final int STOP       =  1 << COUNT_BITS;
        // 调用了shutdown然后这个线程也执行完了,现在正在处理的过程叫做TIDYING
        private static final int TIDYING    =  2 << COUNT_BITS;
        // 整个线程全部结束
        private static final int TERMINATED =  3 << COUNT_BITS;
    
        // 获取线程池的状态 通过按位与操作,低29位将全部变成0
        private static int runStateOf(int c)     { return c & ~CAPACITY; }
        // 获取线程的数量,通过按位与操作,高3位全部变成0
        private static int workerCountOf(int c)  { return c & CAPACITY; }
        // 根据线程池状态和线程池的线程数量生成ct1值
        private static int ctlOf(int rs, int wc) { return rs | wc; }
        // 线程池状态小于xx
    private static boolean runStateLessThan(int c, int s) {
            return c < s;
        }
        // 线程池状态大于等于xx
        private static boolean runStateAtLeast(int c, int s) {
            return c >= s;
        }
    

    3.2.构造方法

        // 构造方法
    public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) {
            // 基本类型参数校验
            if (corePoolSize < 0 ||
                maximumPoolSize <= 0 ||
                maximumPoolSize < corePoolSize ||
                keepAliveTime < 0)
                throw new IllegalArgumentException();
            // 空指针校验
            if (workQueue == null || threadFactory == null || handler == null)
                throw new NullPointerException();
            // 安全管理器
            this.acc = System.getSecurityManager() == null ?
                    null :
                    AccessController.getContext();
            // 核心线程数
            this.corePoolSize = corePoolSize;
            // 最大线程池数
            this.maximumPoolSize = maximumPoolSize;
            // 
            this.workQueue = workQueue;
            // 根据传入参数unit和keepAliveTime 将存活时间转换为纳秒存到变量keepAliveTime中
            this.keepAliveTime = unit.toNanos(keepAliveTime);
            // 线程工厂
            this.threadFactory = threadFactory;
            // 策略
            this.handler = handler;
        }
    

    3.3. 提交执行task的过程

     public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            int c = ctl.get();
            // 1.判断线程池活着的那些线程数是不是小于核心线程数,如果小于就addWorker添加一个线程。
            if (workerCountOf(c) < corePoolSize) {
                // addWorker创建线程,第二个参数表示是否创建核心线程
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            // 这里处理当前线程数超过核心线程数的逻辑
            // 2.先判断当前线程池状态,如果是在运行中,就把任务丢到队列里边去,否则拒绝任务
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                // 这里需要双重判断线程池的状态,因为这阻塞的过程中有可能线程池的状态被改变了,
                // 如果不是Running状态,说明线程池执行了shutdown操作,就删除此任务,并且拒绝
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                // 如果工作状态的线程为0,说明没有线程了或者核心线程数设置为0了,就添加非核心线程数
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
            // 3.如果线程不是运行状态,那就任务进入队列,这里有3点需要注意
            // 线程池不是运行状态,addWorker内部会判断线程池的状态
            // 第二个参数表示是否创建核心线程
            // addWorker返回false说明任务执行失败,需要拒绝任务
            else if (!addWorker(command, false))
                reject(command);
        }
    
    

    3.4. addWorker源码分析

    addWorker涉及到了很多细节,如果要读懂每一个细节完全不必要,但是思想理解就行了,addWorker的意思就是添加线程,线程要存到容器里,往里头添加线程任务的时候肯定是多个线程同时往里面扔的,所以一定要同步,但是追求效率,一般都是用自旋或者lock

        private boolean addWorker(Runnable firstTask, boolean core) {
            retry:
            // 两层死循环就为了做一个事情,添加一个woker的数量加1
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
    
                // 判断线程池状态满足以下条件,就返回
                // 1、线程池状态大于SHUTDOWN
                // 2、线程池状态等于SHUTDOWN并且firstTask执行任务不为null,直接返回false
                // 3、线程池状态等于SHUTDOWN,且队列为空,直接返回false
                if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                       firstTask == null &&
                       ! workQueue.isEmpty()))
                    return false;
                // 嵌套死循环
                for (;;) {
            
                    int wc = workerCountOf(c);
                    // 如果当前线程超过最大允许线程数,或者根据core状态,大于核心线程或者最大线程数,返回false
                    if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;
                    // 尝试把当前执行线程数+1,如果+1成功,打破循环
                    if (compareAndIncrementWorkerCount(c))
                        break retry;
                    // 重新获取当前线程池状态
                    c = ctl.get();  
                    // 如果当前执行线程数不等于之前读取的数量,说明有别的线程加!成功了
                    if (runStateOf(c) != rs)
                    // 重试
                        continue retry;
                    // else CAS failed due to workerCount change; retry inner loop
                }
            }
            // 下面的逻辑是创建一个线程去执行任务
            // 是否执行任务状态,true执行成功
            boolean workerStarted = false;
            // 判断是否假如addWorker状态 add加入成功
            boolean workerAdded = false;
            Worker w = null;
            try {
                // 创建一个Worker
                w = new Worker(firstTask);
                final Thread t = w.thread;
                if (t != null) {
                    final ReentrantLock mainLock = this.mainLock;
                    // worker的添加必须是串行的,因此必须加锁
                    mainLock.lock();
                    try {
                        // Recheck while holding lock.
                        // Back out on ThreadFactory failure or if
                        // shut down before lock acquired.
                        // 重新检查当前线程池状态,查询当前执行任务的数量
                        int rs = runStateOf(ctl.get());
                        // 如果rs小于SHUTDOWN,或者rs为SHUTDOWN并且firstTask为null
                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                            // workder已经调用过了start方法,则不再创建worker,抛出异常
                            if (t.isAlive()) // precheck that t is startable
                                throw new IllegalThreadStateException();
                            // worker创建并添加workers成功
                            workers.add(w);
                            // 更新largestPoolSize变量
                            int s = workers.size();
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                            workerAdded = true;
                        }
                    } finally {
                        // 释放锁
                        mainLock.unlock();
                    }
                    // 启动worker线程
                    if (workerAdded) {
                        // 这里会调用worker的run方法,
                        // 实际上是调用执行runWorker方法
                        t.start();
                        workerStarted = true;
                    }
                }
            } finally {
                // worker线程启动失败,说明线程池状态发生了变化(关闭操作被执行), 需要进行shutdown相关操作
                if (! workerStarted)
                    addWorkerFailed(w);
            }
            return workerStarted;
        }
    
    

    3.5.线程池worker任务单元

    来看看worker是个什么东西,看源码是一个静态内部类Worker

      private final class Worker
            extends AbstractQueuedSynchronizer
            implements Runnable
        {
            private static final long serialVersionUID = 6138294804551838833L;
    
            // 1. 线程
            final Thread thread;
            // 2. 任务
            Runnable firstTask;
            // 3. 执行次数
            volatile long completedTasks;
    
            Worker(Runnable firstTask) {
                setState(-1); // inhibit interrupts until runWorker
                this.firstTask = firstTask;
                this.thread = getThreadFactory().newThread(this);
            }
        
            // 线程池执行的核心方法
              public void run() {
                runWorker(this);
            }
            }
    

    3.6.核心线程执行逻辑runWorker

    这里有一个有意思的地方可以看到Worker 是实现了Runnable,它自己也有一个Runnbale和Thread,你就把Worker当成一个工人,工人有任务(Runnbale)和执行能力(Thread),但是你得保证每一个工人执行的任务是自己的,并且自己执行完了以后,completedTasks要加1
    所以当多线程添加任务的时候,把当前线程复制一份给Thread和任务下发给工人的Runnbale,然后让工人去lock,工人的lock 其实就是之前学过ReentrantLock的acquire方法,加入链表,等待执行,这样子就完成了整个串行。

    final void runWorker(Worker w) {
            // 1. 这里是用来自己实现方法的beforeExecute,自己实现内容
            Thread wt = Thread.currentThread();
            Runnable task = w.firstTask;
            w.firstTask = null;
            // 这里调用了unlock,其实也就调用了release(1),执行过程中允许线程被中断
            w.unlock(); // allow interrupts
            // 判断是否继续自旋
            boolean completedAbruptly = true;
            try {
                // 这里判断任务是否为存在或者队列中的任务不为空,注意如果从队列取就会造成阻塞
                while (task != null || (task = getTask()) != null) {
                // CAS加锁
                    w.lock();
                    // 判断当前线程池状态是否为停止,如果停止了,则中断
                    if ((runStateAtLeast(ctl.get(), STOP) ||
                         (Thread.interrupted() &&
                          runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                        wt.interrupt();
                    try {
                    // 这里用来扩展功能,但是没用到
                        beforeExecute(wt, task);
                        Throwable thrown = null;
                        try {
                            // 这个工人开始执行任务
                            task.run();
                        } catch (RuntimeException x) {
                            thrown = x; throw x;
                        } catch (Error x) {
                            thrown = x; throw x;
                        } catch (Throwable x) {
                            thrown = x; throw new Error(x);
                        } finally {
                            afterExecute(task, thrown);
                        }
                    } finally {
                        // gc回收
                        task = null;
                        // 任务加1
                        w.completedTasks++;
                        // 释放锁
                        w.unlock();
                    }
                }
                completedAbruptly = false;
            } finally {
                // 自旋操作退出,说明线程池结束
                processWorkerExit(w, completedAbruptly);
            }
        }
    

    4.woker类总结

    这个woker是一个Thread 也是一个Runnable 同样也是一个AQS,先来说说为什么要有Runnable和Thread

    4.1.为什么要有Runnable?

    这里其实是用来记录任务的,因为Woker 里边有很多状态需要根据当前任务去记录,并且这个东西必须要在Thread中执行

    4.2.为什么要有Thread?

    因为线程池有很多Woker去竞争,所以干脆就把Woker设计成AQS,一个线程处理一个当前任务,而不是说有其他worker执行了不是自己的任务

    4.3.submit和execute的区别是什么

    方法定义的位置不同,execute是在Executor执行器中,而submit是在ExecutorService执行服务中的
    参数接收不同,execute接收的是Runnbale ,submit接收的是Callable
    作用不用,execute只是单纯的执行任务,submit可以把任务的结果带出来。

    4.4.线程池大概执行流程

    核心线程数不够,启动核心线程
    核心线程满了,加入队列
    核心线程和队列都满了,addWorker 加入非核心线程

    4.5.addWorker 做的事情

    count 加1
    真正的加入任务并且start
    WorkStealingPool
    WorkStealingPool 是另外一种线程,核心非常简单,之前讲的ThreadPoolExecutor线程是去一个任务的队列里取任务。而WorkStealingPool是每一个线程都有自己的任务队列,当一个线程执行完以后,会在别的线程任务队列中偷任务

    跟原来只有一个队列的线程池相比,如果有某一个线程被占用了很长时间,然后任务队列又特别的重,那其他线程只能空着,没办法帮到任务重的线程

    源码

        public static ExecutorService newWorkStealingPool() {
            return new ForkJoinPool
                (Runtime.getRuntime().availableProcessors(),
                 ForkJoinPool.defaultForkJoinWorkerThreadFactory,
                 null, true);
        }
    

    WorkStealingPool本质上是一个ForkJoinPool

    package com.learn.thread.eight;
    
    import java.io.IOException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    
    public class TestWorkStealingPool {
        private static final ExecutorService service = Executors.newWorkStealingPool();
    
        public static void main(String[] args) throws IOException {
            System.out.println(Runtime.getRuntime().availableProcessors());
            service.execute(new R(2000));
            service.execute(new R(2000));
            service.execute(new R(2000));
            service.execute(new R(2000));
            service.execute(new R(2000));
            service.execute(new R(2000));
            service.execute(new R(2000));
            service.execute(new R(2000));
            service.execute(new R(2000));
            service.execute(new R(2000));
            service.execute(new R(2000));
            System.in.read();
        }
    
        static class R implements Runnable {
            int time;
    
            R(int time) {
                this.time = time;
            }
            @Override
            public void run() {
                try {
                    TimeUnit.MILLISECONDS.sleep(time);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(time + " " + Thread.currentThread().getName());
            }
        }
    }
    
    
    image.png

    5.ForkJoinPool

    ForkJoinPool 适合把大任务切分成一个个小任务去执行,小任务如果还是大,再切,不一定是两个,可以是多个,但是最终的结果就是多个小任务结果的汇总。这个过程就叫做join ,所以这种线程池叫做ForkJoinPool。
    既然是可以分割的任务,那怎么定义任务呢,之前线程池执行的任务就是Runnable。在这里,我们一般实现ForkJoinPool的时候需要定义成特定他的类型,这个类型又必须是可以分叉的任务,这个任务就叫做ForkJoinTask,但是实际上这个ForkJoinTask又比较原始,我们可以用RecursiveAction,这里边又有两种。
    第一个RecursiveAction递归,称为不带返回值的任务,因为我们可以把大任务分割成小任务,小任务又可以分成小任务,一直到我满意的条件为止,这其中就带有递归的过程。等会来看看一个例子,所以我要对一百万个数进行求和,单线程肯定很慢。
    第二个RecursiveTask,叫做带返回值的子任务

    package com.learn.thread.eight;
    
    import java.io.IOException;
    import java.util.Arrays;
    import java.util.Random;
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.RecursiveAction;
    import java.util.concurrent.RecursiveTask;
    
    public class TestForkJoinPool {
        private static int[] nums = new int[1000000];
        // 按5万个一组,进行分组
        private static int MAX_SIZE = 500000;
    
        static Random random = new Random();
    
        static {
            for (int i = 0; i < 1000000; i++) {
                nums[i] = random.nextInt(100);
            }
            System.out.println("-----" + Arrays.stream(nums).sum());
        }
    
        public static void main(String[] args) throws IOException, InterruptedException {
            ForkJoinPool forkJoinPool = new ForkJoinPool();
            // 无返回参数子任务
            AddTask addTask = new AddTask(0, nums.length);
    //      forkJoinPool.execute(addTask);
    
            AddTaskRet addTaskRet = new AddTaskRet(0, nums.length);
            forkJoinPool.invoke(addTaskRet);
            System.out.println(addTaskRet.join());
            System.in.read();
        }
    
        /**
         * 不带返回值的ForkJoinPool
         */
        static class AddTask extends RecursiveAction {
            private int start;
    
            private int end;
    
            public AddTask(int start, int end) {
                this.start = start;
                this.end = end;
            }
    
            @Override
            protected void compute() {
                // 如果是在分组内,开始计算,否则再分组
                if (end - start <= MAX_SIZE) {
                    long sum = 0L;
                    for (int i = start; i < end; i++) {
                        sum += nums[i];
                    }
                    System.out.println("from start" + start + "to end " + end + "sum =" + sum);
                } else {
                    int middle = start + (end - start) / 2;
                    AddTask addTask = new AddTask(start, middle);
                    AddTask addTask1 = new AddTask(middle, end);
                    addTask.fork();
                    addTask1.fork();
                }
            }
        }
    
        /**
         * 不带返回值的ForkJoinPool
         */
        static class AddTaskRet extends RecursiveTask<Long> {
            private int start;
    
            private int end;
    
            public AddTaskRet(int start, int end) {
                this.start = start;
                this.end = end;
            }
    
            @Override
            protected Long compute() {
                // 如果是在分组内,开始计算,否则再分组
                if (end - start <= MAX_SIZE) {
                    long sum = 0L;
                    for (int i = start; i < end; i++) {
                        sum += nums[i];
                    }
                    return sum;
                }
                int middle = start + (end - start) / 2;
                AddTaskRet addTask = new AddTaskRet(start, middle);
                AddTaskRet addTask1 = new AddTaskRet(middle, end);
                addTask.fork();
                addTask1.fork();
                return addTask.join() + addTask1.join();
            }
        }
    }
    
    

    6.parallelStream

    java8 有一个并行流,底层就是ForkJoinPool算法来实现的。
    你可以把集合里面的内容想像成一个个河流往外流,在流经过某个地方的时候处理一下。
    举例:我们new 了一个1000000 数据的集合,然后判断这些数是不是质数,foreach是lamdba表达式的一个流式处理,还是相当于一个遍历循环。
    但是parallelStream并行流是并行的来处理这个任务切分成一个个子任务,所以跟foreach会有一个时间上的差距。所以在互相之间线程不需要同步的时候,你可以用这种并行流来处理效率会高一些

    package com.learn.thread.eight;
    
    import com.google.common.collect.RangeMap;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Random;
    
    public class TestParallelStream {
    
        public static boolean isPrime(int num) {
            for (int i = 2; i < num/2; i++) {
                if (num % i ==0) {
                    return false;
                }
            }
            return true;
        }
        public static void main(String[] args) {
            List<Integer> list = new ArrayList<>();
            Random random = new Random();
            for (int i = 0; i < 1000000; i++) {
                list.add(random.nextInt(1000000));
            }
    
            long start = System.currentTimeMillis();
            list.forEach(TestParallelStream::isPrime);
            long end = System.currentTimeMillis();
            System.out.println("ends" + (end - start));
    
            start = System.currentTimeMillis();
            list.parallelStream().forEach(TestParallelStream::isPrime);
            end = System.currentTimeMillis();
            System.out.println("ends" + (end - start));
        }
    }
    
    

    总结
    线程池有两种ThreadPoolService\ForkJoinPool
    区别在于ThreadPoolService多个线程共享一个任务队列,下面各个每个线程都有自己的任务队列

    相关文章

      网友评论

          本文标题:java多线程与高并发(九)线程池与源码解读

          本文链接:https://www.haomeiwen.com/subject/qtdlrltx.html