美文网首页
2020-02-01-Java线程池

2020-02-01-Java线程池

作者: 耿望 | 来源:发表于2020-02-01 17:22 被阅读0次

Java线程池基本用法

Java提供了一些通用接口来创建线程池:

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }
public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }

但是通常不推荐使用这些简易接口,因为这些接口可能会使用无界的任务队列,理论上可以无限添加任务到线程池,并且对核心线程数和最大线程数的设置也可能不合理,导致对系统资源的消耗很大。
通常建议自己创建一个线程池对象,以ThreadPoolExecutor为例,看看如何构造一个线程池

线程池的创建

构造函数有几个参数比较重要:

  1. corePoolSize 核心线程数,线程池一般情况下不会回收的线程数量;
  2. maximumPoolSize 最大线程数,超过这个数量后不允许再创建线程;
  3. keepAliveTime 非核心线程处于空闲状态的最大时间,超过这个时间就会被回收;
  4. unit 时间单位,跟keepAliveTime结合使用;
  5. workQueue 任务队列,通常有SynchronousQueue,LinkedBlockingQueue几种类型;
  6. threadFactory 指定创建线程的工厂方法;
  7. handler 拒绝策略,当任务队列已满或线程数达到最大时执行。
/**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code threadFactory} or {@code handler} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

线程池的生命周期

以ThreadPoolExecutor为例,简单看下代码结构和生命周期


Java线程池 (1).png

线程池有5个状态:

  1. RUNNING : 接受新的任务并处理队列中的任务;
  2. SHUTDOWN : 不再接受新任务,但会继续处理队列中未完成任务;
  3. STOP : 不再接受新任务,也不再处理队列中的任务,正在执行的任务被中断;
  4. TIDYING : 所有的任务处理完成,有效的线程数是0,下一步会执行terminated()进入TERMINATED;
  5. TERMINATED : terminated()方法执行完毕,线程池结束。

新任务的执行 execute()

一个新任务的执行分4步:

  1. 如果当前线程数小于核心线程数corePoolSize,就创建新的线程;
  2. 当前线程数大于核心线程数,且任务队列未满,将任务放入任务队列;
  3. 当前任务队列已满,且线程数小于最大线程数,就创建线程;
  4. 当前线程数大于最大线程数maximumPoolSize,拒绝执行任务。
/**
     * Executes the given task sometime in the future.  The task
     * may execute in a new thread or in an existing pooled thread.
     *
     * If the task cannot be submitted for execution, either because this
     * executor has been shutdown or because its capacity has been reached,
     * the task is handled by the current {@code RejectedExecutionHandler}.
     *
     * @param command the task to execute
     * @throws RejectedExecutionException at discretion of
     *         {@code RejectedExecutionHandler}, if the task
     *         cannot be accepted for execution
     * @throws NullPointerException if {@code command} is null
     */
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }
private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

拒绝策略

Java提供了4种拒绝任务的策略,都是RejectedExecutionHandler的实现。

  1. AbortPolicy:抛出RejectedExecutionException;
  2. DiscardPolicy:什么也不做,直接忽略;
  3. DiscardOldestPolicy:丢弃执行队列中最老的任务,尝试为当前提交的任务腾出位置;
  4. CallerRunsPolicy:直接由提交任务者执行这个任务

任务队列

比较常见的队列类型有SynchronousQueue和LinkedBlockingQueue。

  1. SynchronousQueue
    这种队列没有缓存,生产者和消费者两个线程必须交叉运行。队列中已经有一个任务时,生产者线程阻塞,直到消费者取出线程,才能继续运行。反过来,队列为空时,消费者线程阻塞。
    比如下面两个线程会形成死锁,都进入WAITING状态。
private Thread putThread = new Thread(new Runnable() {
        
        @Override
        public void run() {
            try {
                queue.put("arg0");
                System.out.println("put arg0 to queue");
                takThread.join();
                queue.put("arg1");
                System.out.println("put arg1 to queue");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }, "putThread");
    
    private Thread takThread = new Thread(new Runnable() {
        
        @Override
        public void run() {
            try {
                String arg0 = queue.take();
                System.out.println("take arg from queue:" + arg0);
                putThread.join();
                String arg1 = queue.take();
                System.out.println("take arg from queue:" + arg1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }, "takThread");

SynchronousQueue有两种公平和非公平模式,实际上是两种FIFO和LIFO数据结构。
公平模式使用队列,先入队的任务先执行;
非公平模式使用栈,后入队的任务先执行。
比如下面的代码,默认情况下是先执行putThread2,再执行putThread1。因为默认fair参数是false。

private Thread putThread1 = new Thread(new Runnable() {
        
        @Override
        public void run() {
            try {
                queue.put("putThread1");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }, "putThread1");
    
    private Thread putThread2 = new Thread(new Runnable() {
        
        @Override
        public void run() {
            try {
                Thread.sleep(500);
                queue.put("putThread2");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }, "putThread2");
    
    private Thread takThread = new Thread(new Runnable() {
        
        @Override
        public void run() {
            try {
                Thread.sleep(1000);
                System.out.println("take arg from queue:" + queue.take());
                System.out.println("take arg from queue:" + queue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }, "takThread");
  1. LinkedBlockingQueue
    LinkedBlockingQueue也是生产者-消费者模式,但是生产者线程和消费者线程使用不同的锁来进行同步,理论上生产者可以一直添加任务进队列。
    LinkedBlockingQueue是FIFO的队列,相当于公平模式,保证先入队的任务先执行。
    LinkedBlockingQueue可以指定容量,当队列已满时,再执行put()方法,生产者线程会阻塞,进入WAITING状态。或执行offer()方法会返回false。
    参考:

https://www.cnblogs.com/ants/p/11343657.html
https://blog.csdn.net/yanyan19880509/article/details/52562039

相关文章

  • 2020-02-01-Java线程池

    Java线程池基本用法 Java提供了一些通用接口来创建线程池: 但是通常不推荐使用这些简易接口,因为这些接口可能...

  • java线程池

    线程VS线程池 普通线程使用 创建线程池 执行任务 执行完毕,释放线程对象 线程池 创建线程池 拿线程池线程去执行...

  • java----线程池

    什么是线程池 为什么要使用线程池 线程池的处理逻辑 如何使用线程池 如何合理配置线程池的大小 结语 什么是线程池 ...

  • Java线程池的使用

    线程类型: 固定线程 cached线程 定时线程 固定线程池使用 cache线程池使用 定时调度线程池使用

  • Spring Boot之ThreadPoolTaskExecut

    初始化线程池 corePoolSize 线程池维护线程的最少数量keepAliveSeconds 线程池维护线程...

  • 线程池

    1.线程池简介 1.1 线程池的概念 线程池就是首先创建一些线程,它们的集合称为线程池。使用线程池可以很好地提高性...

  • 多线程juc线程池

    java_basic juc线程池 创建线程池 handler是线程池拒绝策略 排队策略 线程池状态 RUNNIN...

  • ThreadPoolExecutor线程池原理以及源码分析

    线程池流程: 线程池核心类:ThreadPoolExecutor:普通的线程池ScheduledThreadPoo...

  • 线程池

    线程池 [TOC] 线程池概述 什么是线程池 为什么使用线程池 线程池的优势第一:降低资源消耗。通过重复利用已创建...

  • java 线程池使用和详解

    线程池的使用 构造方法 corePoolSize:线程池维护线程的最少数量 maximumPoolSize:线程池...

网友评论

      本文标题:2020-02-01-Java线程池

      本文链接:https://www.haomeiwen.com/subject/ddqxxhtx.html