美文网首页
Java-常见的线程池

Java-常见的线程池

作者: zzq_nene | 来源:发表于2021-02-02 23:27 被阅读0次

    一、常见的线程池

    FixedThreadPool、CachedThreadPool、ScheduledThreadPool、SingleThreadExecutor
    这些常见的线程池,基本都是通过Executors中对应的new方法进行创建的。

    1.FixedThreadPool

    核心线程数固定,没有非核心线程,LinkedBlockingQueue 无界的Queue

        public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>(),
                                          threadFactory);
        }
    

    2.CachedThreadPool

    只有非核心线程,60s的空闲线程保活,SynchronousQueue,直接提交给线程运行。OkHttp中的线程池就是用的CachedThreadPool,核心线程数是0,最大线程数是Integer.MAX_VALUE,使用SynchronousQueue队列,SynchronousQueue队列是0容量的阻塞队列,那么就会直接交给线程执行

        public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>(),
                                          threadFactory);
        }
    

    3.ScheduledThreadPool

    有延迟执行和周期重复执行的线程池,new DelayedQueue ()
    DelayedQueue : 优先队列 PriorityQueue 存储元素

        public static ScheduledExecutorService newScheduledThreadPool(
                int corePoolSize, ThreadFactory threadFactory) {
            return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
        }
    
        // ScheduledThreadPoolExecutor.java
        public ScheduledThreadPoolExecutor(int corePoolSize,
                                           ThreadFactory threadFactory) {
            super(corePoolSize, Integer.MAX_VALUE,
                  DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
                  new DelayedWorkQueue(), threadFactory);
        }
    

    ScheduledThreadPoolExecutor继承自ThreadPoolExecutor

        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) {
            if (corePoolSize < 0 ||
                maximumPoolSize <= 0 ||
                maximumPoolSize < corePoolSize ||
                keepAliveTime < 0)
                throw new IllegalArgumentException();
            if (workQueue == null || threadFactory == null || handler == null)
                throw new NullPointerException();
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.workQueue = workQueue;
            this.keepAliveTime = unit.toNanos(keepAliveTime);
            this.threadFactory = threadFactory;
            this.handler = handler;
        }
    
    ScheduledThreadPool实现周期性原理
        public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                         long initialDelay,
                                                         long delay,
                                                         TimeUnit unit) {
            if (command == null || unit == null)
                throw new NullPointerException();
            if (delay <= 0L)
                throw new IllegalArgumentException();
            ScheduledFutureTask<Void> sft =
                new ScheduledFutureTask<Void>(command,
                                              null,
                                              triggerTime(initialDelay, unit),
                                              -unit.toNanos(delay),
                                              sequencer.getAndIncrement());
            RunnableScheduledFuture<Void> t = decorateTask(command, sft);
            sft.outerTask = t;
            // 将任务添加到DelayWorkQueue队列中
            delayedExecute(t);
            return t;
        }
    

    获取DelayWorkQueue队列中的任务

            public RunnableScheduledFuture<?> take() throws InterruptedException {
                final ReentrantLock lock = this.lock;
                lock.lockInterruptibly();
                try {
                    for (;;) {
                        RunnableScheduledFuture<?> first = queue[0];
                        if (first == null)
                            available.await();
                        else {
                            long delay = first.getDelay(NANOSECONDS);
                            if (delay <= 0L)
                                return finishPoll(first);
                            first = null; // don't retain ref while waiting
                            if (leader != null)
                                available.await();
                            else {
                                Thread thisThread = Thread.currentThread();
                                leader = thisThread;
                                try {
                                    available.awaitNanos(delay);
                                } finally {
                                    if (leader == thisThread)
                                        leader = null;
                                }
                            }
                        }
                    }
                } finally {
                    if (leader == null && queue[0] != null)
                        available.signal();
                    lock.unlock();
                }
            }
    

    执行任务

            public void run() {
                boolean periodic = isPeriodic();
                if (!canRunInCurrentRunState(periodic))
                    cancel(false);
                else if (!periodic)
                    super.run();
                else if (super.runAndReset()) {
                    // 判断是否需要周期性执行,如果是
                    // 则重置下一次要执行的时间
                    setNextRunTime();
                    reExecutePeriodic(outerTask);
                }
            }
    
            private void setNextRunTime() {
                long p = period;
                if (p > 0)
                    time += p;
                else
                    time = triggerTime(-p);
            }
    

    放回到队列中

            public boolean offer(Runnable x) {
                if (x == null)
                    throw new NullPointerException();
                RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
                final ReentrantLock lock = this.lock;
                lock.lock();
                try {
                    int i = size;
                    if (i >= queue.length)
                        grow();
                    size = i + 1;
                    if (i == 0) {
                        queue[0] = e;
                        setIndex(e, 0);
                    } else {
                        siftUp(i, e);
                    }
                    if (queue[0] == e) {
                        leader = null;
                        available.signal();
                    }
                } finally {
                    lock.unlock();
                }
                return true;
            }
    

    主要就是通过修改ScheduledFutureTask的time值,而time是一个volatile修饰,time的值的修改则是通过优先级period的值来判断对time加上还是减去period值

    4.SingleThreadExecutor

    LinkedBlockingQueue
    一个核心线程,不需要处理同步

        public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>(),
                                        threadFactory));
        }
    

    二、饱和策略

    RejectedExecutionHandler 饱和策略
    (1)DiscardOldestPolicy:直接丢弃最老的那个任务,执行当前任务。如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程) 意思就是在pool没有关闭的前提下首先丢掉缓存在队列中的最早的任务,然后重新尝试运行该任务。这个策略需要适当小心。
    (2)AbortPolicy:直接抛出异常,默认的使用,即拒绝执行任务RejectedExecutionException。
    (3)CallerRunsPolicy:让调用者线程去执行任务,即谁往线程池中提交任务,就由谁来执行这个任务。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度
    (4)DiscardPolicy:把最新提交的任务丢弃
    如果这这四种拒绝策略都不满足,则自己实现RejectedExecutionHandler接口,自己定义一个拒绝策略。

    相关文章

      网友评论

          本文标题:Java-常见的线程池

          本文链接:https://www.haomeiwen.com/subject/lbaytltx.html