美文网首页
Scheduler/Worker 原理与实现

Scheduler/Worker 原理与实现

作者: 莫库施勒 | 来源:发表于2019-07-10 17:41 被阅读0次

    Scheduler/Worker 需要满足以下的要求:

    1. 所有的方法都需要是线程安全的;
    2. Worker 需要保证即时、串行提交的任务按照先进先出(FIFO)的顺序被执行;
    3. Worker 需要尽可能保证被取消订阅时要取消还未执行的任务;
    4. Worker 被取消订阅时不能影响同一个 Scheduler 的其他 Worker;
    public final class ContextAwareScheduler extends Scheduler {
        public static final ContextAwareScheduler INSTANCE = 
                new ContextAwareScheduler();                       // (1)
        
        final NewThreadWorker worker;
        
        private ContextAwareScheduler() {
            this.worker = new NewThreadWorker(
                    new RxThreadFactory("ContextAwareScheduler")); // (2)
        }
        @Override
        public Worker createWorker() {
            return new ContextAwareWorker(worker);                 // (3)
        }
        
        static final class ContextAwareWorker extends Worker {
            final CompositeDisposable tracking;                  // (4)
            final NewThreadWorker worker;
    
            public ContextAwareWorker(NewThreadWorker worker) {
                this.worker = worker;
                this.tracking = new CompositeSubscription();
            }
            @Override
            public Subscription schedule(Action0 action) {
                // implement
            }
    
            @Override
            public void dispose() {
                tracking.dispose();      // (5)
            }
    
            @Override
            public boolean isDisposed() {
                return tracking.isDisposed();
            }
        }
    }
    

    上面是我们定义的一个 Scheduler:

    1. 由于scheduler 不能存在多个实例,因此我们使用了一个静态的单例。
    2. scheduler 会把几乎所有的工作都转交给内部的一个 worker
    3. 我们不能对外只提供一个 worker 实例,否则一旦 worker 被取消订阅,所有人的 worker 都被取消订阅了。因此我们需要单独为每个 worker 实例记录提交过来的任务。以便我们可以检查是否已经取消了订阅,以及进行取消订阅操作

    接下来,我们就需要前面提到的线程局部上下文了:

    public final class ContextManager {
        static final ThreadLocal<Object> ctx = new ThreadLocal<>();
        
        private ContextManager() {
            throw new IllegalStateException();
        }
        
        public static Object get() {
            return ctx.get();
        }
        public static void set(Object context) {
            ctx.set(context);
        }
    }
    

    ContextManager 仅仅是包装了一个静态的 ThreadLocal 变量。在实际情况中,你可能要把 Object 替换成你需要的类型。

    现在让我们继续看 schedule() 的实现:

    // ...
            @Override
            public Disposable schedule(final Runnable run, long delay, TimeUnit unit) {
                if (isDisposed()) {                         // (2)
                    return EmptyDisposable.INSTANCE;
                }
    
                final Object context = ContextManager.get();          // (3)
                Runnable a = new Runnable() {
                    @Override
                    public void run() {
                        ContextManager.set(context);                // (4)
                        run.run();
                    }
                };
    
                return worker.scheduleActual(a, delay, unit, tracking);
            }
    // ...
    

    让我们尝试一下:

    Worker w = INSTANCE.createWorker();
    CountDownLatch cdl = new CountDownLatch(1);
    
    ContextManager.set(1);
    w.schedule(() -> {
        System.out.println(Thread.currentThread());
        System.out.println(ContextManager.get());
    });
    
    ContextManager.set(2);
    w.schedule(() -> {
        System.out.println(Thread.currentThread());
        System.out.println(ContextManager.get());
        cdl.countDown();
    });
    
    cdl.await();
    
    ContextManager.set(3);
    
    Observable.timer(500, TimeUnit.MILLISECONDS, INSTANCE)
    .doOnNext(v -> {
        System.out.println(Thread.currentThread());
        System.out.println(ContextManager.get());
    }).blockingFirst();
    
    w.dispose();
    

    运行输出如下:

    Thread[ContextAwareScheduler1,5,main]
    1
    Thread[ContextAwareScheduler1,5,main]
    2
    Thread[ContextAwareScheduler1,5,main]
    3
    

    在引入了 Scheduler/Worker 之后我们需要记录这些任务,并取消它们。一旦 Future 被记录了,那就需要在它们完成或者被取消时取消记录,否则就会发生内存泄漏。这就意味着我们不能直接把一个 Action0/Runnable 提交到 ExecutorService 上,我们需要包装一下它们,让它们可以在完成或者被取消时被取消记录。解决方案就是 ScheduledAction 类。

    public final class ScheduledAction implements Runnable, Subscription {
        final Action0 action;                       // (1)
        final SubscriptionList slist;               // (2)
     
        public ScheduledAction(Action0 action) {
            this.action = action;
            this.slist = new SubscriptionList();
        }
        @Override
        public void run() {
            try {
                action.call();                      // (3)
            } finally {
                unsubscribe();                      // (4)
            }
        }
        @Override
        public boolean isUnsubscribed() {
            return slist.isUnsubscribed();
        }
        @Override
        public void unsubscribe() {
            slist.unsubscribe();
        }
         
        public void add(Subscription s) {           // (5)
            slist.add(s);
        }
    }
    

    接下来就是要在任务被提交到 ExecutorService 之前把所有的记录以及清理任务都串起来。为了简单起见,我们假设我们的 ExecutorService 是个单线程的 service。我们将在后面处理多线程的情况。

    // ...
    @Override
    public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
        if (isUnsubscribed()) {                                // (1)
            return Subscriptions.unsubscribed();
        }
        ScheduledAction sa = new ScheduledAction(action);      // (2) 
         
        tracking.add(sa);                                      // (3)
        sa.add(Subscriptions.create(
            () -> tracking.remove(sa)));
         
        Future<?> f;
        if (delayTime <= 0) {                                  // (4)
            f = exec.submit(sa);
        } else if (exec instanceof ScheduledExecutorService) { // (5)
            f = ((ScheduledExecutorService)exec)
                 .schedule(sa, delayTime, unit);
        } else {
            f = genericScheduler.schedule(() -> {              // (6)
                Future<?> g = exec.submit(sa);
                sa.add(Subscriptions.create(                   // (7)
                    () -> g.cancel(false)));
            }, delayTime, unit);
        }
         
        sa.add(Subscriptions.create(                           // (8)
            () -> f.cancel(false)));
     
        return sa;                                             // (9)
    }
    // ...
    

    代码工作机制如下:

    1. 如果 worker 已经被取消订阅,我们就返回一个表示已经取消的常量 subscription。注意,如果有 schedule 调用(由于多线程竞争)通过了这个检查,它将会收到一个来自底层线程池的 RejectedExecutionException。你可以把函数中后面的代码都用一个try-catch 包裹起来,并在异常发生时返回同样的表示已经取消的常量 subscription。
    2. 我们把任务包装为 ScheduledAction
    3. 在实际调度这个任务之前,我们把它加入到tracking 中,并且增加一个取消订阅的回调,以便在它执行完毕或者被取消时可以将其从 tracking中移除。注意,由于幂等性,remove() 不会调用 ScheduledActionunsubscribe(),从而不会导致死循环。
    4. 如果调度是没有延迟的,我们就立即将其提交,并且保存返回的 Future
    5. 如果我们的 ExecutorService 是 ScheduledExecutorService,我们就可以直接调用它的 schedule()函数了。
    6. 否则我们就需要借助 ScheduledExecutorService 来实现延迟调度了,但我们不能直接把任务调度给它,因为这样它会在错误的线程中执行。我们需要创建一个中间任务,它将在延迟结束之后,向正确的线程池调度一个即时的任务。
    7. 我们需要保证提交后返回的 Future 能在 unsubscribe() 调用时被取消。这里我们把内部的 Future 加入到了 ScheduledAction 中。
    8. 无论是立即调度,还是延迟调度,我们都需要在取消订阅时取消这个调度,所以我们把返回的 Future 加入到 ScheduledAction 中(通过把 Future#cancel() 包装到一个 Subscription 中)。在这里,你就可以控制是否需要强行(中断)取消了。(RxJava 会根据取消订阅时所处的线程来决定:如果取消订阅就是在执行任务的线程中,那就没必要中断了)
    9. ScheduledAction 也是任务发起方用来取消订阅的凭证(token)。

    最后缺失的一点代码就是 genericScheduler 了。你可以为 worker 添加一个static final 成员,并像下面这样设置:

    static final ScheduledExecutorService genericScheduler;
    static {
        genericScheduler = Executors.newScheduledThreadPool(1, r -> {
            Thread t = new Thread(r, "GenericScheduler");
            t.setDaemon(true);
            return t;
        });
    }
    

    Worker最重要的一个规则就是有序提交的非延迟任务要按序执行,但是 Executor 的线程是随机取走任务,而且是并发乱序执行的。

    解决办法就是使用我们以前介绍过的“队列漏”,并且对调度的任务进行一个中继操作.

    首先是类结构

    public final class ExecutorScheduler extends Scheduler {
        final Executor exec;
        public ExecutorScheduler(Executor exec) {
            this.exec = exec;
        }
        @Override
        public Worker createWorker() {
            return new ExecutorWorker();
        }
         
        final class ExecutorWorker extends Worker implements Runnable {                             // (1)
            // data fields here
            @Override
            public Subscription schedule( Action0 action) {
                // implement
            }
            @Override
            public Subscription schedule( Action0 action, long delayTime, TimeUnit unit) {
                // implement
            }
            @Override
            public void run() {
                // implement
            }
            @Override
            public boolean isUnsubscribed() {
                // implement
            }
            @Override
            public void unsubscribe() {
                // implement
            }
        }
    }
    

    接着是一些变量

    // 漏逻辑需要
    // 一个正在执行的标记、
    final AtomicInteger wip = new AtomicInteger();
    // 一个队列、
    final Queue<ScheduledAction> queue = new ConcurrentLinkedQueue<>();
    //以及一个 subscription 容器类型,以便集中取消订阅:
    final CompositeSubscription tracking = new CompositeSubscription();
    

    首先让我们看看无延迟的 schedule()

    @Override 
    public Subscription schedule(Action0 action) {
        if (isUnsubscribed()) {
            return Subscriptions.unsubscribed();
        }
        ScheduledAction sa = 
                new ScheduledAction(action);
        tracking.add(sa);
        sa.add(Subscriptions.create(
                () -> tracking.remove(sa)));        // (1)
            
        queue.offer(sa);                            // (2)
             
        sa.add(Subscriptions.create(
                () -> queue.remove(sa)));           // (3)
             
        if (wip.getAndIncrement() == 0) {           // (4)
            exec.execute(this);                     // (5)
        }
             
        return sa;
    }
    

    主要逻辑是:

    1. 创建一个 ScheduledAction,并且在其被取消订阅时将自己从 tracking 中移除。
    2. 把任务加入到队列中,队列会保证任务按照提交顺序先进先出(FIFO)执行。
    3. 在任务被取消时,将任务从队列中移除。注意这里的移除操作复杂度为 O(n),n 表示队列中在该任务之前等待执行的任务数。
    4. 我们只允许一个漏线程,也就是把 wip 从 0 增加到 1 的线程。
    5. 如果当前线程赢得了漏权利,那我们就把 worker 自己提交到 Executor 上,并在 run() 函数中从队列中取出任务执行。注意,这里的this

    接下来是 ExecutorWorkerrun() 方法

    @Override 
    public void run() {
        do {
            if (isUnsubscribed()) {                   // (1)
                queue.clear();
                return;
            }
            ScheduledAction sa = queue.poll();        // (2)
            if (sa != null && !sa.isUnsubscribed()) {
                sa.run();                             // (3)
            }
        } while (wip.decrementAndGet() > 0);          // (4)
    }
    

    漏的逻辑比较直观:

    1. 我们先检查 worker 是否已经被取消请阅,如果已经取消,那我们就清空队列,并且返回。
    2. 我们从队列中取出一个任务。
    3. 由于在 run() 函数执行期间可能会删除任务,或者由于取消订阅而清空队列,所以我们需要检查是否为 null,以及是否已经被取消订阅。如果都没有,那我们就执行这个任务。
    4. 我们递减 wip,直到为 0 就退出循环,此时我们就可以安全地重新调度这个 worker,并在 Executor 上执行漏任务了。

    最后,最复杂的就是延迟调度的实现了 schedule()

    @Override
    public Subscription schedule(
            Action0 action, 
            long delayTime,
            TimeUnit unit) {
     
        if (delayTime <= 0) {
            return schedule(action);                      // (1)
        }
        if (isUnsubscribed()) {
            return Subscriptions.unsubscribed();          // (2)
        }
         
        ScheduledAction sa = 
                new ScheduledAction(action);
        tracking.add(sa);
        sa.add(Subscriptions.create(
                () -> tracking.remove(sa)));              // (3)
         
        ScheduledExecutorService schedex;
        if (exec instanceof ScheduledExecutorService) {
            schedex = (ScheduledExecutorService) exec;    // (4)
        } else {
            schedex = CustomWorker.genericScheduler;      // (5)
        }
         
        Future<?> f = schedex.schedule(() -> {            // (6)
             
            queue.offer(sa);                              // (7)
             
            sa.add(Subscriptions.create(
                    () -> queue.remove(sa)));
             
            if (wip.getAndIncrement() == 0) {
                exec.execute(this);
            }
             
        }, delayTime, unit);
         
        sa.add(Subscriptions.create(
                () -> f.cancel(false)));                  // (8)
         
        return sa;
    }
    

    我们可以看到这里取的顺序只与插入的顺序相关,而与延迟的时间无关

    相关文章

      网友评论

          本文标题:Scheduler/Worker 原理与实现

          本文链接:https://www.haomeiwen.com/subject/jlckkctx.html