美文网首页
线程池原理

线程池原理

作者: 孙大硕 | 来源:发表于2019-10-25 14:36 被阅读0次

    首先在使用线程池的时候有今个问题:
    Q1:什么是多线程?
    Q2:为什么要用到线程池?
    Q3:线程池那么多参数都有什么用?
    Q4:当我们提交一个任务的时候到底发生了什么?
    Q5:线程池里的任务是怎么执行的?
    Q6:阻塞队列到底有什么用?

    什么是多线程,为什么要用到多线程

    当任务不是CPU密集型操作时,比如是网络、数据库等IO密集型操作,单个线程会阻塞很长时间导致CPU等待时间的浪费,在多线程模型下CPU可以在这段时间进行切换去做其他的任务,可以最大程度发挥CPU性能,在多核的设备下这种效果更明显。

    当然在Android中是不允许操作阻塞主线程的,这也就强制开发者要用多线程技术。

    为什么要用到线程池?

    线程的时间包括创建时间+运行时间+销毁时间
    而线程池有什么作用的,线程池顾名思义就是放线程的池子,里面放了一条条线程,当有任务来的时候就去池子里拿一条现成的线程直接执行任务,这就节省了创建和销毁时间。

    线程池的参数

    public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler)
    
    • corePoolSize 核心线程数,一旦开启就存活的线程,可以设置超时参数
    • maximumPoolSize 最大线程数,核心线程数+非核心线程数
    • keepAliveTime 线程空闲时间,超过这个时间线程被回收
    • unit 时间单位
    • workQueue 保存任务的队列
    • threadFactory 线程工厂
    • handler 线程被拒绝的处理者

    创建线程池,参考AsyncTask中的线程池:

    private static final int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4));
        private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
        private static final int KEEP_ALIVE_SECONDS = 30;
    
        private static final ThreadFactory sThreadFactory = new ThreadFactory() {
            private final AtomicInteger mCount = new AtomicInteger(1);
    
            public Thread newThread(Runnable r) {
                return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
            }
        };
    private static final BlockingQueue<Runnable> sPoolWorkQueue =
                new LinkedBlockingQueue<Runnable>(128);
     ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                    CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
                    sPoolWorkQueue, sThreadFactory);
            threadPoolExecutor.allowCoreThreadTimeOut(true);
    

    这里用到了默认的handler:

     public AbortPolicy() { }
    
            /**
             * Always throws RejectedExecutionException.
             *
             * @param r the runnable task requested to be executed
             * @param e the executor attempting to execute this task
             * @throws RejectedExecutionException always
             */
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                throw new RejectedExecutionException("Task " + r.toString() +
                                                     " rejected from " +
                                                     e.toString());
            }
    

    所以在拒绝任务的时候会抛异常,所以当线程池不够大时要注意这个参数。

    通常结合具体的使用场景corePoolSize,maximumPoolSize,workQueue会使用不同的参数,下面先介绍任务被加入的过程,才能更好理解这些参数。

    任务被加入的过程

    有几个重要的变量

    1. ctl 是一个32位的整数,高三位存储线程的状态,低29位表示工作线程数
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    private static final int COUNT_BITS = Integer.SIZE - 3;
    CAPACITY 高三位为0,后29位为1
    

    怎么计算线程池的状态:

     private static int runStateOf(int c)     { return c & ~CAPACITY; }
    返回的是c的高3位
    

    计算工作线程数量也一样

    private static int workerCountOf(int c)  { return c & CAPACITY; }
    返回c的低29位
    

    线程池的状态


    线程池的状态
    • Running :就是正在运行,线程池处在RUNNING状态时,能够接收新任务,以及对已添加的任务进行处理
    • SHUTDOWN:线程池处在SHUTDOWN状态时,不接收新任务,但能处理已添加的任务,调用线程池的shutdown()接口时,线程池由RUNNING -> SHUTDOWN
    • STOP:线程池处在STOP状态时,不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。调用线程池的shutdownNow()接口时,线程池由(RUNNING or SHUTDOWN ) -> STOP
    • TIDYING:当所有的任务已终止,ctl记录的"任务数量"为0,线程池会变为TIDYING状态
    • TERMINATED:线程池彻底终止,就变成TERMINATED状态

    任务加入的过程

    任务加入过程

    提交任务有两种方式,第一种方式是

     public <T> Future<T> submit(Callable<T> task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<T> ftask = newTaskFor(task);
            execute(ftask);
            return ftask;
        }
    

    当执行了execute(Runnable r)方法之后,先判断当前线程的数量是否小于核心线程数,如果是就直接新建一个线程去执行,如果核心线程已满或者上一步新建出错,就将任务加入任务队列,如果加入队列成功就再次检查当前的线程池状态,因为加入队列的时候线程池的状态可能改变了,如果不在运行就移除任务执行拒绝策略,如果没有工作线程就开启一个线程;如果加入队列不成功,说明队列已满,直接开启新线程去执行任务,如果再开启不了新线程,就说明线程池关闭了或者线程满了,执行拒绝策略。

    过程很复杂,总的来说就是有任务先判断核心线程是否已满,如果没有就放到核心线程,如果满了就添加进队列,如果队列满了就创建非核心线程,如果都满了就拒绝,当有线程空闲下来,就去队列里面取任务去执行。

    线程池为什么要设计一个阻塞队列,而且只有队列满了才开启非核心线程呢?
    主要是因为我们很多时候不希望一有任务来就开启非核心线程,而是先让任务排队,等对拍满了,才会迫不得已开启非核心线程,这样能最大程度节约资源。

    线程池中的线程是怎么运行的

    创建线程用到了

    private boolean addWorker(Runnable firstTask, boolean core)
    

    首先判断运行状态,然后更新工作线程数
    然后创建一个Worker,执行真正的任务

     Worker w = null;
            try {
                w = new Worker(firstTask);
                final Thread t = w.thread;
    

    那么加入任务队列的任务怎么执行的

    在Worker真正执行的时候会一直在这里循环

    while (task != null || (task = getTask()) != null) {
                    w.lock();
                    // If pool is stopping, ensure thread is interrupted;
                    // if not, ensure thread is not interrupted.  This
                    // requires a recheck in second case to deal with
                    // shutdownNow race while clearing interrupt
                    if ((runStateAtLeast(ctl.get(), STOP) ||
                         (Thread.interrupted() &&
                          runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                        wt.interrupt();
                    try {
                        beforeExecute(wt, task);
                        Throwable thrown = null;
                        try {
                            task.run();
                        } catch (RuntimeException x) {
                            thrown = x; throw x;
                        } catch (Error x) {
                            thrown = x; throw x;
                        } catch (Throwable x) {
                            thrown = x; throw new Error(x);
                        } finally {
                            afterExecute(task, thrown);
                        }
                    } finally {
                        task = null;
                        w.completedTasks++;
                        w.unlock();
                    }
    

    当线程执行完一个任务就将task==null
    在getTask中会去workQueue取任务,直到线程被中断抛出异常或者没有任务执行

    try {
    
                  //  如果可以超时回收的话就调用workQueue的阻塞方法,在超时之后会返回null,worker的循环结束,然后被回收。
                    Runnable r = timed ?
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        workQueue.take();
                    if (r != null)
                        return r;
                    timedOut = true;
                } catch (InterruptedException retry) {
                    timedOut = false;
                }
    

    在线程池中有个很重要的参数就是阻塞队列,以LinkedBlockingQueue为例

    提供了三个阻塞方法put,take,poll(long timeout, TimeUnit unit)

    其实阻塞队列就是为生产者消费者模型服务的,当队列不为满的时候唤醒生产者,当队列不为空的时候唤醒消费者,所以利用阻塞队列可以很容易的写出一个生产者消费者模型。

    在线程池主要是用到了后两个方法,为了超时机制,当我们有需要去写一个超时机制的时候可以借鉴一下阻塞队列。

    怎么用线程池?

    Java提供了几中常用的线程池
    FixedThreadPool : 固定个数的线程池,阻塞队列大小无限,所以用不着非核心线程
    CachedThreadPool:只有非核心的线程池,SynchronousQueue是一个空队列,只有当生产者和消费者同时放和取时才起作用,缺少任意一个就会阻塞,就是一旦有线程空下来去队列里取任务的时候,该队列会把到来的任务马上交给该线程,这样做的目的就是可以马上执行任务而且可以充分利用空闲线程
    ScheduledThreadPool:核心线程数调用者指定,执行定时任务的线程池,DelayedWorkQueue无限大,非核心线程无效
    SingleThreadPool:单个线程的线程池,保证任务顺序执行

    关于核心线程数最大线程数以及阻塞队列的大小该怎么设置,有名人说当是CPU密集型任务就是计算较多的情况下线程数应该尽量少一点,如果是IO密集型线程数应该大一点,对客户端来说大部分都是IO密集型,所以线程数大一点没关系,推荐使用上面的cachedThreasPool,OKHttp就是用的这个线程池。

    相关文章

      网友评论

          本文标题:线程池原理

          本文链接:https://www.haomeiwen.com/subject/vgrxvctx.html