Rxjava入门之Scheduler & Worker

作者: lluo2010 | 来源:发表于2016-10-06 18:56 被阅读0次

自己感觉这块自己整理的不好,主要是没花太多时间,不敢兴趣的可以略过第四部分的"各种Scheduler及对应的Worker".

在Rxjava中,为了使用异步,我们经常看到如下的代码:

    xxx.subscribeOn(Schedulers.io()) .observeOn(Schedulers.computation())

Schedulers.io()和Schedulers.computation()返回的都是Scheduler. Scheduler是负责执行任务的单元,而Schedulers是创建各种Scheduler的工厂.

Scheduler种类

RxJava中有下面几种Scheduler:

调度器类型 说明
Schedulers.computation() 用于计算任务,如事件循环或和回调处理,不要用于IO操作(IO操作请使用Schedulers.io());默认线程数等于处理器的数量
Schedulers.io() 用于IO密集型任务,如异步阻塞IO操作,这个调度器的线程池会根据需要增长;对于普通的计算任务,请使用Schedulers.computation();Schedulers.io( )默认是一个CachedThreadScheduler,很像一个有线程缓存的新线程调度器
Schedulers.from(executor) 使用指定的Executor作为调度器
Schedulers.newThread() 为每个任务创建一个新线程
Schedulers.immediate() 在当前线程立即开始执行任务
Schedulers.trampoline() 当其它排队的任务完成后,在当前线程排队开始执行

*** Schedulers.immediate和Schedulers.trampoline都在当前线程执行,区别是前者马上执行,后者是等队列中的其他任务完成后才执行. ***

最明显的例子是,当你要执行一个任务,而任务中有要执行另一个任务时, 使用这两种方式就能明显看出区别.

下面的例子比较容易看出区别.

看下面代码任务outerAction中又启一个innerAction任务:

    static void testScheduler(Scheduler scheduler){
        Worker worker = scheduler.createWorker();
        Action0 leafAction = () -> System.out.println("----leafAction.");
        Action0 innerAction = () ->
        {
            System.out.println("--innerAction start.");
            worker.schedule(leafAction);
            System.out.println("--innerAction end.");
        };
        
        
        Action0 outerAction = () ->
        {
            System.out.println("outer start.");
            worker.schedule(innerAction);
            System.out.println("outer end.");
        };
        
        worker.schedule(outerAction);
    }

使用Schedulers.immediate():

testScheduler(Schedulers.immediate());

输出:

outer start.
--innerAction start.
----leafAction.
--innerAction end.
outer end.

使用Schedulers.trampoline():

testScheduler(Schedulers.trampoline());

输出:

outer start.
outer end.
--innerAction start.
--innerAction end.
----leafAction.

Scheduler

A Scheduler is an object that schedules units of work. You can find common implementations of this class in Schedulers.

Scheduler是负责执行任务的工作单元.

它包含一个接口createWorker,负责创建Workder,以及一个包含一个静态的抽象类Worker.

代码如下:

public abstract class Scheduler {

    /**
     * Retrieves or creates a new  Scheduler.Worker} that represents serial execution of actions.
     * When work is completed it should be unsubscribed using {@link Scheduler.Worker#unsubscribe()}.
     * Work on a {@link Scheduler.Worker} is guaranteed to be sequential.
     * 
     * @return a Worker representing a serial queue of actions to be executed
     */
    public abstract Worker createWorker();
    public abstract static class Worker implements Subscription {
        ...
    }
}

Scheduler.Worker

Sequential Scheduler for executing actions on a single thread or event loop. Unsubscribing the {@link Worker} unschedules all outstanding work and allows resources cleanup.

任务真正的执行是在Workder中进行的, 它包含了如下三个方法,用来执行任务:

public abstract static class Worker implements Subscription {
    Schedules an Action for execution.
    public abstract Subscription schedule(Action0 action);
    public abstract Subscription schedule(final Action0 action, final long delayTime, final TimeUnit unit);
    public Subscription schedulePeriodically(final Action0 action, long initialDelay, long period, TimeUnit unit) {
}
  1. 第一个方法安排执行任务.
  2. 第二个方法一段延迟后执行任务.
  3. 第三个方法是周期性执行任务.

至于Worker是怎么执行任务的,线程/线程池模型是什么样的,由各个不同的Worker决定.

Schedulers

提供几个静态方法用来创建Scheduler.
对应方法如下:

方法 对应Scheduler
Schedulers.computation EventLoopsScheduler
Schedulers.io CachedThreadScheduler
Schedulers.from ExecutorScheduler
Schedulers.newThread NewThreadScheduler
Schedulers.immediate ImmediateScheduler
Schedulers.trampoline TrampolineScheduler

各种Scheduler及对应的Worker

大致包含下面几种Scheduler:

NewThreadScheduler

在一个新的线程中处理任务.对应Worker为NewThreadWorker.

NewThreadWorker使用了一个线程数目为1的ScheduledExecutorService:

ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, threadFactory);

NewThreadWorker除了有一个线程数为1的线程池处理加进来的任务外,它还有静态的方法负责启一个全局的线程去做线程池的清除(ScheduledThreadPoolExecutor remove & purge)工作.这是因为有些任务被取消了,但是还在工作队列里,所以需要定期清除.

所以为了记住申请了多少个NewThreadWorkder,它自己包含了一个静态的ConcurrentHashMap,定义如下:

private static final ConcurrentHashMap<ScheduledThreadPoolExecutor, ScheduledThreadPoolExecutor> EXECUTORS;

ImmediateScheduler

在当前线程中处理任务.对应worker为InnerImmediateScheduler.

InnerImmediateScheduler:action.call直接在当前线程中进行.

TrampolineScheduler

Schedules work on the current thread but does not execute immediately. Work is put in a queue and executed after the current unit of work is completed.

任务在当前线程中执行,当并不是马上执行的,任务被放在队列中, 当其它排队的任务完成后,在当前线程排队开始执行.

对应的worker为InnerCurrentThreadScheduler,它里面有一个PriorityBlockingQueue,任务被加到里面,然后按时间优先级进行.

ExecutorScheduler

在指定的executor中处理任务.对应的Worker为ExecutorSchedulerWorker.

ExecutorSchedulerWorker:本身就是一个runnable, 添加进来的action放入ConcurrentLinkedQueue, 在runnable中会取出来一个个执行:

final ConcurrentLinkedQueue<ScheduledAction> queue; 

我的理解是ExecutorSchedulerWorker将自己弄成runnable,然后在指定的executor中执行,而那些任务是加到queue里的,整个ExecutorSchedulerWorker作为executor的一个runnable.这个runnable在queue里的任务执行完了或者订阅取消后执行完成.

ExecutorSchedulerWorker每次被Schedule时(就是任务加进来时),直接加到任务队列里.

EventLoopsScheduler

class EventLoopsScheduler extends Scheduler implements SchedulerLifecycle

里面有一个静态的FixedSchedulerPool,FixedSchedulerPool可以简单的理解为有固定数目的线程的线程池,数目与CPU数目相同.

EventLoopsScheduler对应的worker为EventLoopWorker.EventLoopWorker其实是取FixedSchedulerPool里面的某一个线程.

简单的说, EventLoopsScheduler对应的worker为EventLoopWorker,使用了FixedSchedulerPool中的一个PoolWorker(其实就是数目为1的ScheduledExecutorService).

CachedThreadScheduler

CachedThreadScheduler extends Scheduler implements SchedulerLifecycle

待补充....
...

Scheduler.Worker

我们也可以直接使用Scheduler.Worker进行异步操作.

一般步骤是:

  1. 调用相应Scheduler的createWorker生成Worker.
  2. 调用Worker.schedule执行任务
  3. 调用Worker.unsubscribe.

例子如下:

Worker worker = Schedulers.newThread().createWorker();
worker.schedule(new Action0() {

    @Override
    public void call() {
        yourWork();
        // recurse until unsubscribed (schedule will do nothing if unsubscribed)
        worker.schedule(this);
    }

});
// some time later...
worker.unsubscribe();

总结

  1. Schedulers是创建Scheduler的工厂, 提供了几种静态方法用来创建各种Scheduler.
  2. Scheduler提供创建Workder的接口.
  3. Worker提供了几种执行任务的接口,用来执行任务, 它底下利用各种类型的线程或者线程池完成任务的执行,它是真正执行任务的地方,
  4. 从代码可以看到, 整个应用程序同一类型的Scheduler只有一个,但是对应的worker切是不同的.比如Schedulers.computation, 对应的Scheduler只有一个,但是每次调用createWorker,获得的worker是scheduler里面worker数组中的一个(数组数目和处理器的数目相同).

第四部分"各种Scheduler及对应的Worker"总结的不好, 主要是没花时间好好总结,以后有时间再补吧.

相关文章

网友评论

    本文标题:Rxjava入门之Scheduler & Worker

    本文链接:https://www.haomeiwen.com/subject/bpneyttx.html