美文网首页线程池
ScheduledThreadPoolExecutor源码分析

ScheduledThreadPoolExecutor源码分析

作者: 念䋛 | 来源:发表于2020-12-29 09:35 被阅读0次

    ScheduledThreadPoolExecutor定时线程池
    ScheduledThreadPoolExecutor底层也是线程池,只是队列使用的是DelayedWorkQueue队列
    并且最大线程数为整型最大数

    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }
    

    先分析一下DelayedWorkQueue队列

    DelayedWorkQueue队列底层使用的是堆数据结构,保证堆顶是最小的数,堆顶就是最先要执行任务,这里的任务为RunnableScheduledFuture,获取任务的时候,直接获取下角标为0的任务即可.

    l 堆都是满二叉树.因为满二叉树会充分利用数组的内存空间;

    l 最小堆是指父节点比左节点和右节点都小的结构,所以整个最小堆中,根节点是最小的节点;

    l 最大堆是指父节点比左节点和右节点都大的结构,所以整个最大堆中,根节点是最大的节点;

    l 最大堆和最小堆的左节点和右节点没有关系,只能判断父节点和左右两节点的大小关系;

    基于堆的这些属性,堆适用于找到集合中的最大或者最小值;另外,堆结构记录任务及其索引的关系,便于插入数据或者删除数据后重新排序,所以堆适用于优先队列。可以了解一下这个网站

    https://www.cs.usfca.edu/~galles/visualization/Heap.html

    image.png
    上面一行是数组,数据结构下标为:堆顶为0 ,左子节点为1,右节点为2,一次往下排
    那每一个节点的父节点和左右子节点的怎么算呢
    004举例,004的索引为1
    父节点002的索引为 (i -1)/2 =0
    左子节点008的索引为 2i +1 =3
    右子节点 007的索引为 2
    i +2 =4

    添加元素


    image.png

    添加003到堆的末尾,先左后右,进行比较,判断003和008大小,如果小则交换位置


    image.png
    再判断与004大小,小于交换位置
    image.png

    在判断与002大小,大于则位置不动,插入结束
    删除元素


    image.png
    002从数组中移除,
    image.png
    堆尾008到堆顶,与子节点比较大小
    image.png
    先比较左节点,008大于003 交换,008再比较与004大小
    image.png
    删除元素结束

    DelayedWorkQueue队列的源码
    成员属性

    //数组的初始大小16
    private static final int INITIAL_CAPACITY = 16;
    //锁
    private final ReentrantLock lock = new ReentrantLock();
    //数组
    private RunnableScheduledFuture<?>[] queue =  new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
    //队列大小
    private int size = 0;
    //执行线程,线程池中的线程
    private Thread leader = null;
    //条件等待队列,在阻塞队列中分析过
    private final Condition available = lock.newCondition();
    

    Offer添加元素

    public boolean offer(Runnable x) {
        if (x == null)
            throw new NullPointerException();
    //将任务强转为RunnableScheduledFuture
        RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
        final ReentrantLock lock = this.lock;
    //添加的是使用锁,保证线程安全
        lock.lock();
        try {
            int i = size;
    //如果超过线程的最大数,则扩容,没有扩容因子,扩容1.5倍
            if (i >= queue.length)
                grow();
    //数组加一
            size = i + 1;
    //如果数组为空,将任务直接放入队头
            if (i == 0) {
                queue[0] = e;
    //设置索引,赋值heapIndex属性的值
                setIndex(e, 0);
            } else {
    //如果数组不为空,根据上面分析的堆特性,将任务放入到队列
                siftUp(i, e);
            }
    //数组为空的情况下,添加任务到队头,将leader赋值为null,并唤醒等待队列firstWaiter
    线程
            if (queue[0] == e) {
                leader = null;
    //唤醒等待队列的firstWaiter线程
                available.signal();
            }
        } finally {
            lock.unlock();
        }
        return true;
    }
    

    扩容

    private void grow() {
        int oldCapacity = queue.length;
        int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50% 1.5倍扩容
        if (newCapacity < 0) // overflow
            newCapacity = Integer.MAX_VALUE;
        queue = Arrays.copyOf(queue, newCapacity);
    }
    
    

    siftUp 插入元素,首先插入队尾,与父节点比较,小于则交换,直到索引值k<=0 跳出,因为索引为0则值第一个元素,负数则没有意义

    private void siftUp(int k, RunnableScheduledFuture<?> key) {
        while (k > 0) {
    //获取父节点索引 >>>1 除2
            int parent = (k - 1) >>> 1;
    //获取父节点
            RunnableScheduledFuture<?> e = queue[parent];
    //比较大小,大于则退出,小于则交换
            if (key.compareTo(e) >= 0)
                break;
    //如果小于交换
            queue[k] = e;
            setIndex(e, k);
            k = parent;
        }
        queue[k] = key;
        setIndex(key, k);
    }
    
    

    SiftDown方法,从上而下将元素插入

    
    private void siftDown(int k, RunnableScheduledFuture<?> key) {
    //一半是因为父节点和叶子节点比较的时候,是先获取叶子节点小的,在和父节点比较
        int half = size >>> 1;
        while (k < half) {
    //获得左侧叶子几点
            int child = (k << 1) + 1;
            RunnableScheduledFuture<?> c = queue[child];
    //获取右侧叶子节点
            int right = child + 1;
    //左右叶子节点比较大小,并将小的赋值给c
            if (right < size && c.compareTo(queue[right]) > 0)
                c = queue[child = right];
    //key小于c的话,退出循环,因为是小堆,父节点要小于子节点
            if (key.compareTo(c) <= 0)
                break;
    //否则交换
            queue[k] = c;
            setIndex(c, k);
            k = child;
        }
        queue[k] = key;
        setIndex(key, k);
    }
    

    下面看元素的移除,涉及到线程的等待和唤醒
    Poll方法

    public RunnableScheduledFuture<?> poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
    //获取堆顶,就是最小的元素,因为线程要执行的就是延时时间最小的节点,
            RunnableScheduledFuture<?> first = queue[0];
    //获取队列为空,或者时间还没到,返回null
            if (first == null || first.getDelay(NANOSECONDS) > 0)
                return null;
            else
    //first不为空,并且延时时间已经到了,则获取队列的第一个任务
                return finishPoll(first);
        } finally {
            lock.unlock();
        }
    }
    

    上面分析过如何取元素,将队头元素取出之后,将队尾的元素放到队头,自上而下的排序

    private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
    //队尾的元素索引,并将队列的长度减一
        int s = --size;
    //获取队尾任务
        RunnableScheduledFuture<?> x = queue[s];
        queue[s] = null;
    //放到对头 自上而下排序
        if (s != 0)
            siftDown(0, x);
        setIndex(f, -1);
       
    //返回对头任务
     return f;
    }
    

    take方法

    public RunnableScheduledFuture<?> take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
    //获取队头
                RunnableScheduledFuture<?> first = queue[0];
                if (first == null)
    //如果null则将当先线程添加入等待队列
                    available.await();
                else {
    //如果不为null,则获取任务距离现在的时间差,单位为纳秒
                    long delay = first.getDelay(NANOSECONDS);
    //如果<0则返回队头的任务
                    if (delay <= 0)
                        return finishPoll(first);
                    first = null; // don't retain ref while waiting
    //leader是执行任务的线程,如果不为空,则leader执行任务,并将当前线程存放到等待队列
                    if (leader != null)
                        available.await();
                    else {
    //如果为空,将当前线程赋值给leader
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
    //让leader等待delay时间,等待任务的执行,当时间到了之后,继续执行for循环
                            available.awaitNanos(delay);
                        } finally {
    //如果leader为当前线程,当前线程执行完任务后,将leader赋值为null
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
    // 这段代码一定会执行,如果经过上面的判断,如果leader如果还为null 则唤醒条件等待队列的线程
            if (leader == null && queue[0] != null)
                available.signal();
            lock.unlock();
        }
    }
    
    image.png

    第二次循环 delay<0 比较理想的情况,如果1093行代码, available.awaitNanos(delay);在等待的时候,可能被唤醒,或者中断,那第二次for循环delay可能还大于0
    poll(long timeout, TimeUnit unit)方法就不分析了,和take方法基本一致,就是在线程的过期时间和任务距离现在的等待时间的一些判断
    DelayedWorkQueue队列就介绍到这

    ScheduledThreadPoolExecutor有4个主要的方法,
     public ScheduledFuture<?> schedule(Runnable command,long delay,TimeUnit unit)
     public <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay,TimeUnit unit)
     public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay, long period,TimeUnit unit)
     public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)

    DelayedWorkQueue队列中只存放ScheduledFutureTask

    ScheduledFutureTask(Runnable r, V result, long ns, long period) {
        super(r, result);
    //等待时间
        this.time = ns;
    //周期时间
        this.period = period;
    //在堆数据结构在排序的时候,也就是执行compareTo 比较等待时间大小的时候,如果时间等,
    //就按照sequenceNumber的大小排序
        this.sequenceNumber = sequencer.getAndIncrement();
    }
    

    Schedule方法的两个形参一个是Runnable 一个是Callable都是一样的,只是Callable有返回参数,
    schedule方法

    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay,
                                           TimeUnit unit) {
        if (callable == null || unit == null)
            throw new NullPointerException();
    //只返回了new ScheduledFutureTask<V>(callable,triggerTime(delay, unit))
        RunnableScheduledFuture<V> t = decorateTask(callable,
            new ScheduledFutureTask<V>(callable,
                                       triggerTime(delay, unit)));
    //执行任务
        delayedExecute(t);
        return t;
    }
    

    delayedExecute方法

    private void delayedExecute(RunnableScheduledFuture<?> task) {
        if (isShutdown())
            reject(task);
        else {
    //添加任务到队列
            super.getQueue().add(task);
            if (isShutdown() &&
                !canRunInCurrentRunState(task.isPeriodic()) &&
                remove(task))
                task.cancel(false);
            else
    //执行任务
                ensurePrestart();
        }
    }
    
    

    ensurePrestart主要就是调用了addWorker方法

    void ensurePrestart() {
        int wc = workerCountOf(ctl.get());
        if (wc < corePoolSize)
            addWorker(null, true);
        else if (wc == 0)
            addWorker(null, false);
    }
    

    在线程池章节中分析过主要就是ThreadPoolExecutor的getTask()方法的,从队列中获取任务就是上面分析过的take方法和poll(long timeout, TimeUnit unit)获取任务,返回null则线程执行结束,返回任务则执行任务.

    scheduleAtFixedRate和scheduleWithFixedDelay的两个方法的区别
    scheduleAtFixedRate如果周期是2秒,但是执行任务是3秒,那两个任务的执行时间为3秒,
    scheduleWithFixedDelay如果周期是2秒,但是执行任务是3秒,那两个任务的执行时间为5秒
    在调用上面两个方法的时候,都new ScheduledFutureTask,区别在最后一个参数
    scheduleAtFixedRate为unit.toNanos(period)
    scheduleWithFixedDelay为unit.toNanos(-delay)

    当执行到ScheduledFutureTask重写的run方法时候,调用了下面的方法

    private void setNextRunTime() {
        long p = period;
        if (p > 0)
            time += p;
        else
            time = triggerTime(-p);
    }
    

    time为下一次任务执行的时间, 在take的first.getDelay(NANOSECONDS);中time-当前时间

    可以看到如果p>0,time+=p, first.getDelay(NANOSECONDS);得到的时间不一定为周期时间
    举一个例子 这里就不用时间戳了正常time为时间戳
    00:00:00秒开始执行 延时0秒(立即执行) 任务执行需要3秒 周期2秒
    time 00:00:00
    p为2
    第一次执行getDelayde 因为延时0秒,返回0,执行任务,但是需要3秒, 00:00:03秒执行结束,但是周期为两秒,下一次执行时间为 00:00:02,此时的时间已经00:00:03,所以getDelayde为 -1 小于0 立即执行,00:00:06执行结束,周期两秒,因为累加所以下一次执行时间为00:00:04
    getDelayde就为-2了,如果任务执行的时间一直为3秒,那time和当前时间差的就越来越大

    如果p<0的话,用now+p,保证first.getDelay(NANOSECONDS);的结果为周期时间
    通过上面的例子,可以推出, p<0的时候,不管任务执行多久,两次的任务间隔都为2秒

    ScheduledFutureTask重写的run方法

    public void run() {
            boolean periodic = isPeriodic();
            if (!canRunInCurrentRunState(periodic))
                cancel(false);
    //如果不是周期任务
            else if (!periodic)
                ScheduledFutureTask.super.run();
    //如果是周期任务
            else if (ScheduledFutureTask.super.runAndReset()) {
    //下一个任务执行的时间,重新计算
                setNextRunTime();
    //将任务重新执行, outerTask为当前的任务, 调用scheduleAtFixedRate和scheduleWithFixedDelay方法的时候, 调用的这段代码sft.outerTask = t;保证了任务的周期进行
                reExecutePeriodic(outerTask);
            }
        }
    }
    

    定时任务线程池,分析结束.多线程没有办法分析出所有的情况,我也只是分析出大体的流程,做个笔记,方便记忆,也希望能给大家一些帮助.

    相关文章

      网友评论

        本文标题:ScheduledThreadPoolExecutor源码分析

        本文链接:https://www.haomeiwen.com/subject/sjcxoktx.html