美文网首页
ScheduledThreadPoolExecutor遇到的坑

ScheduledThreadPoolExecutor遇到的坑

作者: 麻吕 | 来源:发表于2018-07-18 16:35 被阅读0次

    线上表象

    在项目中,有一块代码实现了这样的需求:使用ScheduledThreadPoolExecutor定时去拉取新增或修改的配置信息,然后加入到本地HashMap,并且往记录表中增加一条记录。伪代码如下:

    private static Map<String, String> cache = new HashMap<>();
        public static void main(String[] args) {
            ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
            scheduledExecutorService.scheduleAtFixedRate(() -> loadConfig(), 10, 10, TimeUnit.SECONDS);
        }
        private static void loadConfig() {
            //1.查询增量数据
            //2.更新map
            //3.往DB插入更新成功记录
        }
    

    但是有一条数据不在内存中(通过程序访问),而且定时任务也不再执行。

    解决思路

    第一反应就是查看线程状态:

     "pool-3-thread-1" #109 prio=5 os_prio=0 tid=0x00007f28bc024800 nid=0x4655 waiting on condition [0x00007f28d84f7000]
       java.lang.Thread.State: WAITING (parking)
            at sun.misc.Unsafe.park(Native Method)
            - parking to wait for  <0x00000006c6750eb0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
            at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
            at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
            at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1081)
            at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
            at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
            at java.lang.Thread.run(Thread.java:745)
    

    纳尼,线程处于等待唤醒状态,相应的源代码如下:



    为什么会在这里等待呢?这明显是线程池队列中没有任务了嘛,具体原因这得从ScheduledThreadPoolExecutor源码说起

    ScheduledThreadPoolExecutor

    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay, long period,TimeUnit unit) {
            //数据校验
            if (command == null || unit == null)
                throw new NullPointerException();
            if (period <= 0)
                throw new IllegalArgumentException();
            //将Runnable包装成ScheduledFutureTask
            ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null,triggerTime(initialDelay, unit),unit.toNanos(period));
            RunnableScheduledFuture<Void> t = decorateTask(command, sft);
            sft.outerTask = t;
            //延迟执行
            delayedExecute(t);
            return t;
        }
    

    在上述代码中,将上游传过来的Runnable封装成ScheduledFutureTask,ScheduledFutureTask继承自FutureTask,线程池调度执行时,执行的即是ScheduledFutureTask中的run方法。下面来看delayedExecute方法:

    private void delayedExecute(RunnableScheduledFuture<?> task) {
            if (isShutdown())
                reject(task);
            else {
                //往队列中添加任务
                super.getQueue().add(task);
                if (isShutdown() &&
                    !canRunInCurrentRunState(task.isPeriodic()) &&
                    remove(task))
                    task.cancel(false);
                else
                    //预启动线程池中的一个Worker
                    ensurePrestart();
            }
        }
    

    线程启动后,由ScheduledThreadPoolExecutor的父类ThreadPoolExecutor接管。

    ThreadPoolExecutor

    在上一步的ensurePrestart启动线程池中的Worker后,即通过Worker的run方法执行runWorker:

    final void runWorker(Worker w) {
            //...
            try {
                while (task != null || (task = getTask()) != null) {
                    //...
                    if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&\!wt.isInterrupted())wt.interrupt();
                    try {
                        beforeExecute(wt, task);
                        Throwable thrown = null;
                        try {
                            task.run();
                        //...
                    } finally {
                       //...
                    }
                }
                completedAbruptly = false;
            } finally {
                processWorkerExit(w, completedAbruptly);
            }
        }
    

    根据之前导出的线程状态,就是从此处的getTask()开始。在消费任务的时候等待,那么肯定是生产不足导致,下面我们继续寻找生产任务的地方。
    当有任务时,会执行任务的run方法,在这里,即是ScheduledFutureTask的run方法:

    public void run() {
                //是否周期调度
                boolean periodic = isPeriodic();
                //是否可运行
                if (!canRunInCurrentRunState(periodic))
                    cancel(false);
                else if (!periodic)
                    ScheduledFutureTask.super.run();
                //运行,并设置下次任务
                else if (ScheduledFutureTask.super.runAndReset()) {
                    setNextRunTime();
                    reExecutePeriodic(outerTask);
                }
            }
    

    runAndReset代码如下:

    protected boolean runAndReset() {
            if (state != NEW || !UNSAFE.compareAndSwapObject(this,runnerOffset,null, Thread.currentThread()))
                return false;
            boolean ran = false;
            int s = state;
            try {
                Callable<V> c = callable;
                if (c != null && s == NEW) {
                    try {
                        c.call(); // don't set result
                        ran = true;
                    } catch (Throwable ex) {
                        setException(ex);
                    }
                }
            } finally {
                runner = null;
                s = state;
                if (s >= INTERRUPTING)
                    handlePossibleCancellationInterrupt(s);
            }
            return ran && s == NEW;
        }
    

    reExecutePeriodic代码如下:

    void reExecutePeriodic(RunnableScheduledFuture<?> task) {
            if (canRunInCurrentRunState(true)) {
                super.getQueue().add(task);
                if (!canRunInCurrentRunState(true) && remove(task))
                    task.cancel(false);
                else
                    ensurePrestart();
            }
        }
    

    在周期调度时,首先先执行runAndReset执行最初设置的Runnable代码,若代码成功执行,则返回true,否则返回false(runnable中的代码拋出异常)。而只有当返回true时,执行reExecutePeriodic代码,在reExecutePeriodic代码中,super.getQueue().add(task)即是任务的生产。而当runAndReset返回false时,则不再往Queue中添加task,此时再执行getTask()中的take()时,由于队列为空,所以线程变为等待唤醒,但已没有任何地方再添加task,所以不再定时调度。

    结论

    通过查看scheduleAtFixedRate的jdk文档,有一句如下:

    If any execution of the task encounters an exception, subsequent executions are suppressed.
    如果在任务的执行中遇到异常,后续执行被取消。

    不禁有个疑问,为什么要这么设计呢?这次任务失败并不一定意味着下一次会失败啊。
    个人是这样的看法:java不想去猜你的异常会怎样,要求你必须把自己的异常处理好,这样让自己更专注的做调度,而无须关心调度的逻辑。

    ScheduledThreadPoolExecutor的最优实践:
    将所有执行代码用try-cache包裹,如下:

    private static Map<String, String> cache = new HashMap<>();
        public static void main(String[] args) {
            ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
            scheduledExecutorService.scheduleAtFixedRate(() -> loadConfig(), 10, 10, TimeUnit.SECONDS);
        }
        private static void loadConfig() {
            try {
                //1.查询增量数据
                //2.更新map
                //3.往DB插入更新成功记录
            } catch (Exception e) {
                //异常处理
            }
        }
    

    相关文章

      网友评论

          本文标题:ScheduledThreadPoolExecutor遇到的坑

          本文链接:https://www.haomeiwen.com/subject/nypopftx.html