美文网首页
图解RxJava(二)

图解RxJava(二)

作者: cowboy3000 | 来源:发表于2018-02-12 09:02 被阅读0次

    概述

    在 RxJava 中可以通过 subscribeOn/observeOn 很方便地完成上下游指定线程的切换,日常开发除了一些常用的Rx 操作符外,这两个方法也是打交道最多的。最初学习 RxJava 的时候总是死记硬背:subscribeOn 用于指定上游线程,observeOn 用于指定下游线程,多次用 subscribeOn 指定上游线程只有第一次有效,多次用 observeOn 指定下次线程,每次都有效…很久不用之后,总是把这两个方法搞混,那么这两个方法内部是怎么实现的呢?本篇先分析subscribeOn 方法。

    例子

    先回顾上篇文章的流程,饭店(Observable)开张前提要有厨师(ObservableOnSubscribe),接着改名叫沙县小吃(ObservableCreate),饭店接客(Observable.subscribe(observer)),创建服务员(CreateEmitter)把顾客和厨师关联起来,之后厨师每做一道菜都通过服务员端给顾客,整个流程如下:

    ](https://img.haomeiwen.com/i7125608/e0656a14ab601797.gif?imageMogr2/auto-orient/strip)](http://7xjvg5.com1.z0.glb.clouddn.com/rxjava2_01.gif)

    我们都知道 Andriod 有主线程,在未指定线程切换操作的情况下,上图的流程是跑在主线程中,另外主线程中往往还存在其他任务需要执行,所以结合线程来看应该是这样的

    image

    上图给人一种感觉,好像厨师的菜是「秒做」出来的,然而我们都知道现实生活中厨师做菜是需要时间的,在安卓中,主线程执行耗时操作会阻塞后续的任务,还有可能引起 ANR,所以厨师做菜的操作不能放在主线程中 。下面让上游睡5秒模拟耗时操作

    上游:

    <figure class="highlight plain" style="display: block; margin: 20px 0px; overflow: auto; padding: 0px; font-size: 13px; color: rgb(77, 77, 76); background: rgb(247, 247, 247); line-height: 1.6; border-radius: 1px; font-family: Lato, "PingFang SC", "Microsoft YaHei", sans-serif; font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: justify; text-indent: 0px; text-transform: none; white-space: normal; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">

    |

    <pre style="overflow: auto; font-family: consolas, Menlo, "PingFang SC", "Microsoft YaHei", monospace; font-size: 13px; margin: 0px; padding: 10px; color: rgb(134, 145, 148); background: rgb(239, 242, 243); line-height: 1.6; border: none; text-align: right;">

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    </pre>

    |

    <pre style="overflow: auto; font-family: consolas, Menlo, "PingFang SC", "Microsoft YaHei", monospace; font-size: 13px; margin: 0px; padding: 10px; color: rgb(77, 77, 76); background: rgb(247, 247, 247); line-height: 1.6; border: none; width: 744px;">

    final Observable<String> source = Observable.create(new ObservableOnSubscribe<String>() {

            @Override
    
            public void subscribe(ObservableEmitter<String> e) throws Exception {
    
                Thread.sleep(5000);
    
                Log.e(TAG, "服务员从厨师那取得 扁食" + " 线程名: "+Thread.currentThread().getName());
    
                e.onNext("扁食");
    
                Log.e(TAG, "服务员从厨师那取得 拌面" + " 线程名: " + Thread.currentThread().getName());
    
                e.onNext("拌面");
    
                Log.e(TAG, "服务员从厨师那取得 蒸饺" + " 线程名: " + Thread.currentThread().getName());
    
                e.onNext("蒸饺");
    
                Log.e(TAG, "厨师告知服务员菜上好了" + " 线程名: " + Thread.currentThread().getName());
    
                e.onComplete();
    
            }
    
        });
    

    </pre>

    |

    </figure>

    下游:

    <figure class="highlight plain" style="display: block; margin: 20px 0px; overflow: auto; padding: 0px; font-size: 13px; color: rgb(77, 77, 76); background: rgb(247, 247, 247); line-height: 1.6; border-radius: 1px; font-family: Lato, "PingFang SC", "Microsoft YaHei", sans-serif; font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: justify; text-indent: 0px; text-transform: none; white-space: normal; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">

    |

    <pre style="overflow: auto; font-family: consolas, Menlo, "PingFang SC", "Microsoft YaHei", monospace; font-size: 13px; margin: 0px; padding: 10px; color: rgb(134, 145, 148); background: rgb(239, 242, 243); line-height: 1.6; border: none; text-align: right;">

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    </pre>

    |

    <pre style="overflow: auto; font-family: consolas, Menlo, "PingFang SC", "Microsoft YaHei", monospace; font-size: 13px; margin: 0px; padding: 10px; color: rgb(77, 77, 76); background: rgb(247, 247, 247); line-height: 1.6; border: none; width: 737px;">

    Observer<String> observer = new Observer<String>() {

            @Override
    
            public void onSubscribe(Disposable d) {
    
                Log.d(TAG, "来个沙县套餐!!!" + " 线程名: " + Thread.currentThread().getName());
    
            }
    
            @Override
    
            public void onNext(String s) {
    
                Log.d(TAG, "服务员端给顾客  " + s + " 线程名: " + Thread.currentThread().getName());
    
            }
    
            @Override
    
            public void onError(Throwable e) {
    
            }
    
            @Override
    
            public void onComplete() {
    
                Log.d(TAG, "服务员告诉顾客菜上好了" + " 线程名: " + Thread.currentThread().getName());
    
            }
    
        };
    

    </pre>

    |

    </figure>

    建立联系,以及执行其他任务(这里只是打了个 log )

    <figure class="highlight plain" style="display: block; margin: 20px 0px; overflow: auto; padding: 0px; font-size: 13px; color: rgb(77, 77, 76); background: rgb(247, 247, 247); line-height: 1.6; border-radius: 1px; font-family: Lato, "PingFang SC", "Microsoft YaHei", sans-serif; font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: justify; text-indent: 0px; text-transform: none; white-space: normal; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">

    |

    <pre style="overflow: auto; font-family: consolas, Menlo, "PingFang SC", "Microsoft YaHei", monospace; font-size: 13px; margin: 0px; padding: 10px; color: rgb(134, 145, 148); background: rgb(239, 242, 243); line-height: 1.6; border: none; text-align: right;">

    1

    2

    </pre>

    |

    <pre style="overflow: auto; font-family: consolas, Menlo, "PingFang SC", "Microsoft YaHei", monospace; font-size: 13px; margin: 0px; padding: 10px; color: rgb(77, 77, 76); background: rgb(247, 247, 247); line-height: 1.6; border: none; width: 213px;">

    source.subscribe(observer);

    Log.d(TAG, "其他任务执行");

    </pre>

    |

    </figure>

    打印如下:

    image

    可以看到,由于上游耗时,导致主线程中「其他任务」被阻塞了,因此需要新建一个子线程来处理上游的耗时任务,使用 RxJava 的 subscribeOn 就能轻松实现,修改代码:

    <figure class="highlight plain" style="display: block; margin: 20px 0px; overflow: auto; padding: 0px; font-size: 13px; color: rgb(77, 77, 76); background: rgb(247, 247, 247); line-height: 1.6; border-radius: 1px; font-family: Lato, "PingFang SC", "Microsoft YaHei", sans-serif; font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: justify; text-indent: 0px; text-transform: none; white-space: normal; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">

    |

    <pre style="overflow: auto; font-family: consolas, Menlo, "PingFang SC", "Microsoft YaHei", monospace; font-size: 13px; margin: 0px; padding: 10px; color: rgb(134, 145, 148); background: rgb(239, 242, 243); line-height: 1.6; border: none; text-align: right;">

    1

    2

    3

    </pre>

    |

    <pre style="overflow: auto; font-family: consolas, Menlo, "PingFang SC", "Microsoft YaHei", monospace; font-size: 13px; margin: 0px; padding: 10px; color: rgb(77, 77, 76); background: rgb(247, 247, 247); line-height: 1.6; border: none; width: 321px;">

    source.subscribeOn(Schedulers.newThread())

                .subscribe(observer);
    

    Log.e(TAG, "其他任务执行");

    </pre>

    |

    </figure>

    打印如下:

    image

    此时「其他任务」不会被阻塞。从上面的 log 可以看到,创建了 RxNewThreadScheduler-1 的子线程来执行上游的耗时任务,并且此时下游除 onSubscribe 外,所有方法都执行在子线程中,它是怎么做到的?(通常情况下游会调用 observeOn(AndroidSchedulers.mainThread()) 来更新UI,下篇分析)。

    源码分析

    <figure class="highlight plain" style="display: block; margin: 20px 0px; overflow: auto; padding: 0px; font-size: 13px; color: rgb(77, 77, 76); background: rgb(247, 247, 247); line-height: 1.6; border-radius: 1px; font-family: Lato, "PingFang SC", "Microsoft YaHei", sans-serif; font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: justify; text-indent: 0px; text-transform: none; white-space: normal; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">

    |

    <pre style="overflow: auto; font-family: consolas, Menlo, "PingFang SC", "Microsoft YaHei", monospace; font-size: 13px; margin: 0px; padding: 10px; color: rgb(134, 145, 148); background: rgb(239, 242, 243); line-height: 1.6; border: none; text-align: right;">

    1

    2

    </pre>

    |

    <pre style="overflow: auto; font-family: consolas, Menlo, "PingFang SC", "Microsoft YaHei", monospace; font-size: 13px; margin: 0px; padding: 10px; color: rgb(77, 77, 76); background: rgb(247, 247, 247); line-height: 1.6; border: none; width: 321px;">

    source.subscribeOn(Schedulers.newThread())

                .subscribe(observer);
    

    </pre>

    |

    </figure>

    上面的代码简短优雅,其实做了很多事情。基于上篇的分析,在执行完 Observable.create 和 new Observer 后此时主线程应该是下面的样子

    image

    Schedulers.newThread()

    Scheduler 翻译为调度器,RxJava2 中 Scheduler 的一些常用子类如下:

    <figure class="highlight plain" style="display: block; margin: 20px 0px; overflow: auto; padding: 0px; font-size: 13px; color: rgb(77, 77, 76); background: rgb(247, 247, 247); line-height: 1.6; border-radius: 1px; font-family: Lato, "PingFang SC", "Microsoft YaHei", sans-serif; font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: justify; text-indent: 0px; text-transform: none; white-space: normal; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">

    |

    <pre style="overflow: auto; font-family: consolas, Menlo, "PingFang SC", "Microsoft YaHei", monospace; font-size: 13px; margin: 0px; padding: 10px; color: rgb(134, 145, 148); background: rgb(239, 242, 243); line-height: 1.6; border: none; text-align: right;">

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    </pre>

    |

    <pre style="overflow: auto; font-family: consolas, Menlo, "PingFang SC", "Microsoft YaHei", monospace; font-size: 13px; margin: 0px; padding: 10px; color: rgb(77, 77, 76); background: rgb(247, 247, 247); line-height: 1.6; border: none; width: 478px;">

    static final class SingleHolder {

    static final Scheduler DEFAULT = new SingleScheduler();
    

    }

    static final class ComputationHolder {

    static final Scheduler DEFAULT = new ComputationScheduler();
    

    }

    static final class IoHolder {

    static final Scheduler DEFAULT = new IoScheduler();
    

    }

    static final class NewThreadHolder {

    static final Scheduler DEFAULT = new NewThreadScheduler();
    

    }

    </pre>

    |

    </figure>

    Schedulers.newThread() 会初始化 NewThreadScheduler ;

    <figure class="highlight plain" style="display: block; margin: 20px 0px; overflow: auto; padding: 0px; font-size: 13px; color: rgb(77, 77, 76); background: rgb(247, 247, 247); line-height: 1.6; border-radius: 1px; font-family: Lato, "PingFang SC", "Microsoft YaHei", sans-serif; font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: justify; text-indent: 0px; text-transform: none; white-space: normal; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">

    |

    <pre style="overflow: auto; font-family: consolas, Menlo, "PingFang SC", "Microsoft YaHei", monospace; font-size: 13px; margin: 0px; padding: 10px; color: rgb(134, 145, 148); background: rgb(239, 242, 243); line-height: 1.6; border: none; text-align: right;">

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    24

    25

    26

    27

    28

    29

    30

    31

    32

    33

    34

    35

    36

    37

    </pre>

    |

    <pre style="overflow: auto; font-family: consolas, Menlo, "PingFang SC", "Microsoft YaHei", monospace; font-size: 13px; margin: 0px; padding: 10px; color: rgb(77, 77, 76); background: rgb(247, 247, 247); line-height: 1.6; border: none; width: 692px;">

    public final class NewThreadScheduler extends Scheduler {

    final ThreadFactory threadFactory;
    
    //看着很眼熟,原来我们上游的线程名称的一部分就是这么起的"RxNewThreadScheduler-1"
    
    private static final String THREAD_NAME_PREFIX = "RxNewThreadScheduler";
    
    //线程工厂
    
    private static final RxThreadFactory THREAD_FACTORY;
    
    /** The name of the system property for setting the thread priority for this Scheduler. */
    
    //用来设置线程优先级的key
    
    private static final String KEY_NEWTHREAD_PRIORITY = "rx2.newthread-priority";
    
    //静态代码块
    
    static {
    
        //确定线程的优先级,这里初始化为5 NORM_PRIORITY
    
        int priority = Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY,
    
                Integer.getInteger(KEY_NEWTHREAD_PRIORITY, Thread.NORM_PRIORITY)));
    
        //初始化线程工厂,传入线程名称和优先级
    
        THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX, priority);
    
    }
    
    //赋值
    
    public NewThreadScheduler() {
    
        this(THREAD_FACTORY);
    
    }
    
    //赋值
    
    public NewThreadScheduler(ThreadFactory threadFactory) {
    
        this.threadFactory = threadFactory;
    
    }
    
    @NonNull
    
    @Override
    
    //这个方法很重要,很重要,很重要!!!后面会用到
    
    public Worker createWorker() {
    
        return new NewThreadWorker(threadFactory);
    
    }
    

    }

    </pre>

    |

    </figure>

    上面的注释已经解释得很清楚了,在初始化 NewThreadScheduler 的时候会创建 RxThreadFactory,并指明了该线程工厂之后生产线程的名称和默认优先级;RxThreadFactory 是 ThreadFactory 的子类,也没多少代码

    <figure class="highlight plain" style="display: block; margin: 20px 0px; overflow: auto; padding: 0px; font-size: 13px; color: rgb(77, 77, 76); background: rgb(247, 247, 247); line-height: 1.6; border-radius: 1px; font-family: Lato, "PingFang SC", "Microsoft YaHei", sans-serif; font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: justify; text-indent: 0px; text-transform: none; white-space: normal; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">

    |

    <pre style="overflow: auto; font-family: consolas, Menlo, "PingFang SC", "Microsoft YaHei", monospace; font-size: 13px; margin: 0px; padding: 10px; color: rgb(134, 145, 148); background: rgb(239, 242, 243); line-height: 1.6; border: none; text-align: right;">

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    </pre>

    |

    <pre style="overflow: auto; font-family: consolas, Menlo, "PingFang SC", "Microsoft YaHei", monospace; font-size: 13px; margin: 0px; padding: 10px; color: rgb(77, 77, 76); background: rgb(247, 247, 247); line-height: 1.6; border: none; width: 707px;">

    public RxThreadFactory(String prefix, int priority) {

    this(prefix, priority, false);
    

    }

    public RxThreadFactory(String prefix, int priority, boolean nonBlocking) {

    this.prefix = prefix;
    
    this.priority = priority;
    
    this.nonBlocking = nonBlocking;
    

    }

    @Override

    //生产新线程的方法!!!

    public Thread newThread(Runnable r) {

    StringBuilder nameBuilder = new StringBuilder(prefix).append('-').append(incrementAndGet());
    
    String name = nameBuilder.toString();
    
    Thread t = nonBlocking ? new RxCustomThread(r, name) : new Thread(r, name);
    
    t.setPriority(priority);
    
    t.setDaemon(true);
    
    return t;
    

    }

    </pre>

    |

    </figure>

    RxThreadFactory 中的 newThread 方法用来生产新线程。Schedulers.newThread() 到此就完成了它的工作,总结下来就是:

    1.创建线程调度器 NewThreadScheduler;

    2.创建线程工厂 RxThreadFactory ;

    到目前为止这些操作都是在主线程中执行的,子线程还未被创建。

    image

    subscribeOn(Scheduler scheduler)

    <figure class="highlight plain" style="display: block; margin: 20px 0px; overflow: auto; padding: 0px; font-size: 13px; color: rgb(77, 77, 76); background: rgb(247, 247, 247); line-height: 1.6; border-radius: 1px; font-family: Lato, "PingFang SC", "Microsoft YaHei", sans-serif; font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: justify; text-indent: 0px; text-transform: none; white-space: normal; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">

    |

    <pre style="overflow: auto; font-family: consolas, Menlo, "PingFang SC", "Microsoft YaHei", monospace; font-size: 13px; margin: 0px; padding: 10px; color: rgb(134, 145, 148); background: rgb(239, 242, 243); line-height: 1.6; border: none; text-align: right;">

    1

    2

    3

    4

    </pre>

    |

    <pre style="overflow: auto; font-family: consolas, Menlo, "PingFang SC", "Microsoft YaHei", monospace; font-size: 13px; margin: 0px; padding: 10px; color: rgb(77, 77, 76); background: rgb(247, 247, 247); line-height: 1.6; border: none; width: 614px;">

    public final Observable<T> subscribeOn(Scheduler scheduler) {

    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    

    }

    </pre>

    |

    </figure>

    该方法返回 Observable ,创建了 ObservableSubscribeOn ,名字起得又很容易让人头晕…这里就不画关系图了,只关心它的属性即可,它是 Observable(饭店) 的子类,结合我们举的例子,就给它起名黄焖鸡饭店;this 就是上面传过来的沙县小吃(ObservableCreate) ;初始化如下:

    <figure class="highlight plain" style="display: block; margin: 20px 0px; overflow: auto; padding: 0px; font-size: 13px; color: rgb(77, 77, 76); background: rgb(247, 247, 247); line-height: 1.6; border-radius: 1px; font-family: Lato, "PingFang SC", "Microsoft YaHei", sans-serif; font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: justify; text-indent: 0px; text-transform: none; white-space: normal; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">

    |

    <pre style="overflow: auto; font-family: consolas, Menlo, "PingFang SC", "Microsoft YaHei", monospace; font-size: 13px; margin: 0px; padding: 10px; color: rgb(134, 145, 148); background: rgb(239, 242, 243); line-height: 1.6; border: none; text-align: right;">

    1

    2

    3

    4

    5

    6

    7

    8

    9

    </pre>

    |

    <pre style="overflow: auto; font-family: consolas, Menlo, "PingFang SC", "Microsoft YaHei", monospace; font-size: 13px; margin: 0px; padding: 10px; color: rgb(77, 77, 76); background: rgb(247, 247, 247); line-height: 1.6; border: none; width: 664px;">

    public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {

    final Scheduler scheduler;
    
    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
    
        super(source);
    
        this.scheduler = scheduler;
    
    }
    
    //省略其他代码
    

    }

    </pre>

    |

    </figure>

    目前为止这些操作都是在主线程中执行,子线程还未创建

    image

    subscribe(Observer observer)

    通过上篇学习可知,subscribe(observer) 内部会调用 subscribeActual(observer) ,该方法是个抽象方法,具体实现在 Observable(饭店) 的子类,现在是 ObservableSubscribeOn(黄焖鸡饭店)。

    <figure class="highlight plain" style="display: block; margin: 20px 0px; overflow: auto; padding: 0px; font-size: 13px; color: rgb(77, 77, 76); background: rgb(247, 247, 247); line-height: 1.6; border-radius: 1px; font-family: Lato, "PingFang SC", "Microsoft YaHei", sans-serif; font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: justify; text-indent: 0px; text-transform: none; white-space: normal; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">

    |

    <pre style="overflow: auto; font-family: consolas, Menlo, "PingFang SC", "Microsoft YaHei", monospace; font-size: 13px; margin: 0px; padding: 10px; color: rgb(134, 145, 148); background: rgb(239, 242, 243); line-height: 1.6; border: none; text-align: right;">

    1

    2

    3

    4

    5

    6

    7

    8

    </pre>

    |

    <pre style="overflow: auto; font-family: consolas, Menlo, "PingFang SC", "Microsoft YaHei", monospace; font-size: 13px; margin: 0px; padding: 10px; color: rgb(77, 77, 76); background: rgb(247, 247, 247); line-height: 1.6; border: none; width: 578px;">

    public void subscribeActual(final Observer<? super T> s) {

    //注释1
    
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
    
    //注释2
    
    s.onSubscribe(parent);
    
    //注释3
    
    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    

    }

    </pre>

    |

    </figure>

    注释1 又冒出来一个 SubscribeOnObserver,同样只关心它的属性,SubscribeOnObserver 是AtomicReference的子类(保证原子性),同时实现了 Observer(也是个顾客) 和 Disposable(保证一次性操作) 接口;为了方便理解,假设之前传的顾客叫小明,这里的顾客叫小红,小红会持有小明的引用(actual),之后一系列的方法实际上会调用到小明的方法。

    注释2 执行顾客小明的 onSubscribe 方法,我们发现到目前为止还没有创建过子线程,所以解释了上面 log 下游 onSubscribe 打印线程名为 main。

    image

    注释3 分为下面3步

    <figure class="highlight plain" style="display: block; margin: 20px 0px; overflow: auto; padding: 0px; font-size: 13px; color: rgb(77, 77, 76); background: rgb(247, 247, 247); line-height: 1.6; border-radius: 1px; font-family: Lato, "PingFang SC", "Microsoft YaHei", sans-serif; font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: justify; text-indent: 0px; text-transform: none; white-space: normal; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">

    |

    <pre style="overflow: auto; font-family: consolas, Menlo, "PingFang SC", "Microsoft YaHei", monospace; font-size: 13px; margin: 0px; padding: 10px; color: rgb(134, 145, 148); background: rgb(239, 242, 243); line-height: 1.6; border: none; text-align: right;">

    1

    2

    </pre>

    |

    <pre style="overflow: auto; font-family: consolas, Menlo, "PingFang SC", "Microsoft YaHei", monospace; font-size: 13px; margin: 0px; padding: 10px; color: rgb(77, 77, 76); background: rgb(247, 247, 247); line-height: 1.6; border: none; width: 549px;">

               步骤③                    步骤②             步骤①
    

    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));

    </pre>

    |

    </figure>

    步骤① SubscribeTask 是 ObservableSubscribeOn(黄焖鸡饭店) 的内部类,实现了 Runnable 接口

    <figure class="highlight plain" style="display: block; margin: 20px 0px; overflow: auto; padding: 0px; font-size: 13px; color: rgb(77, 77, 76); background: rgb(247, 247, 247); line-height: 1.6; border-radius: 1px; font-family: Lato, "PingFang SC", "Microsoft YaHei", sans-serif; font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: justify; text-indent: 0px; text-transform: none; white-space: normal; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">

    |

    <pre style="overflow: auto; font-family: consolas, Menlo, "PingFang SC", "Microsoft YaHei", monospace; font-size: 13px; margin: 0px; padding: 10px; color: rgb(134, 145, 148); background: rgb(239, 242, 243); line-height: 1.6; border: none; text-align: right;">

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    </pre>

    |

    <pre style="overflow: auto; font-family: consolas, Menlo, "PingFang SC", "Microsoft YaHei", monospace; font-size: 13px; margin: 0px; padding: 10px; color: rgb(77, 77, 76); background: rgb(247, 247, 247); line-height: 1.6; border: none; width: 454px;">

    final class SubscribeTask implements Runnable {

    private final SubscribeOnObserver<T> parent;
    
    SubscribeTask(SubscribeOnObserver<T> parent) {
    
        this.parent = parent;
    
    }
    
    @Override
    
    public void run() {
    
        //这里的 source 就是之前传的 ObservableCreate(沙县小吃)
    
        source.subscribe(parent);
    
    }
    

    }

    </pre>

    |

    </figure>

    如果 run 方法被触发,那么执行顺序是:

    Observable.subscribe() —> Observable.subscribeActual() —> ObservableCreate.subscribeActual(),绕了一圈又回到上篇的那个流程。为了方便理解,SubscribeTask 就是黄焖鸡饭店(ObservableSubscribeOn)的「任务」也就是沙县小吃的「做菜」(ObservableCreate.subscribeActual)。所以现在万事具备,只差子线程了。

    步骤② Scheduler.scheduleDirect()

    <figure class="highlight plain" style="display: block; margin: 20px 0px; overflow: auto; padding: 0px; font-size: 13px; color: rgb(77, 77, 76); background: rgb(247, 247, 247); line-height: 1.6; border-radius: 1px; font-family: Lato, "PingFang SC", "Microsoft YaHei", sans-serif; font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: justify; text-indent: 0px; text-transform: none; white-space: normal; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">

    |

    <pre style="overflow: auto; font-family: consolas, Menlo, "PingFang SC", "Microsoft YaHei", monospace; font-size: 13px; margin: 0px; padding: 10px; color: rgb(134, 145, 148); background: rgb(239, 242, 243); line-height: 1.6; border: none; text-align: right;">

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    </pre>

    |

    <pre style="overflow: auto; font-family: consolas, Menlo, "PingFang SC", "Microsoft YaHei", monospace; font-size: 13px; margin: 0px; padding: 10px; color: rgb(77, 77, 76); background: rgb(247, 247, 247); line-height: 1.6; border: none; width: 578px;">

    /**

    • Schedules the given task on this scheduler non-delayed execution.

    • <p>

    • This method is safe to be called from multiple threads but there are no

    • ordering guarantees between tasks.

    • @param run the task to execute

    • @return the Disposable instance that let's one cancel this particular task.

    • @since 2.0

    */

    //这里调度的时候不保证顺序

    //第二个参数为0,不延时,直接调度

    @NonNull

    public Disposable scheduleDirect(@NonNull Runnable run) {

    return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
    

    }

    </pre>

    |

    </figure>

    注意这个方法的注释,该方法调度的时候不保证顺序,所以平时在配合使用 subscribeOn(子线程)/observeOn(主线程) 会出现上下游输出顺序不确定的情况(比如有时候上游生产了3个后才逐个发送给下游,有时上游生产了2个,就开始发送给下游),这也是多线程的一个特点。当然这里不会出现这个情况,因为从输出来看,此时上下游都在一个子线程里。貌似跑远了…继续分析

    <figure class="highlight plain" style="display: block; margin: 20px 0px; overflow: auto; padding: 0px; font-size: 13px; color: rgb(77, 77, 76); background: rgb(247, 247, 247); line-height: 1.6; border-radius: 1px; font-family: Lato, "PingFang SC", "Microsoft YaHei", sans-serif; font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: justify; text-indent: 0px; text-transform: none; white-space: normal; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">

    |

    <pre style="overflow: auto; font-family: consolas, Menlo, "PingFang SC", "Microsoft YaHei", monospace; font-size: 13px; margin: 0px; padding: 10px; color: rgb(134, 145, 148); background: rgb(239, 242, 243); line-height: 1.6; border: none; text-align: right;">

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    </pre>

    |

    <pre style="overflow: auto; font-family: consolas, Menlo, "PingFang SC", "Microsoft YaHei", monospace; font-size: 13px; margin: 0px; padding: 10px; color: rgb(77, 77, 76); background: rgb(247, 247, 247); line-height: 1.6; border: none; width: 685px;">

    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {

    final Worker w = createWorker();
    
    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    
    DisposeTask task = new DisposeTask(decoratedRun, w);
    
    w.schedule(task, delay, unit);
    
    return task;
    

    }

    // Scheduler 中为抽象方法

    public abstract Worker createWorker();

    </pre>

    |

    </figure>

    前面创建 NewThreadScheduler 的时候说 createWorker() 方法很重要,这里派上用场了:

    <figure class="highlight plain" style="display: block; margin: 20px 0px; overflow: auto; padding: 0px; font-size: 13px; color: rgb(77, 77, 76); background: rgb(247, 247, 247); line-height: 1.6; border-radius: 1px; font-family: Lato, "PingFang SC", "Microsoft YaHei", sans-serif; font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: justify; text-indent: 0px; text-transform: none; white-space: normal; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">

    |

    <pre style="overflow: auto; font-family: consolas, Menlo, "PingFang SC", "Microsoft YaHei", monospace; font-size: 13px; margin: 0px; padding: 10px; color: rgb(134, 145, 148); background: rgb(239, 242, 243); line-height: 1.6; border: none; text-align: right;">

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    </pre>

    |

    <pre style="overflow: auto; font-family: consolas, Menlo, "PingFang SC", "Microsoft YaHei", monospace; font-size: 13px; margin: 0px; padding: 10px; color: rgb(77, 77, 76); background: rgb(247, 247, 247); line-height: 1.6; border: none; width: 642px;">

    //NewThreadScheduler.java

    public Worker createWorker() {

    return new NewThreadWorker(threadFactory);
    

    }

    //NewThreadWorker.java

    public NewThreadWorker(ThreadFactory threadFactory) {

        //实例化 ScheduledExecutorService 对象 executor 管理线程池
    
        executor = SchedulerPoolFactory.create(threadFactory);
    
    }
    

    //SchedulerPoolFactory.java

    public static ScheduledExecutorService create(ThreadFactory factory) {

    //默认线程池大小为1
    
    final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
    
    if (exec instanceof ScheduledThreadPoolExecutor) {
    
        ScheduledThreadPoolExecutor e = (ScheduledThreadPoolExecutor) exec;
    
        POOLS.put(e, exec);
    
    }
    
    return exec;
    

    }

    </pre>

    |

    </figure>

    NewThreadWorker 内部维护一个线程池 ScheduledExecutorService , 主要作用是提供延时调度和周期性调度,默认线程池大小为1,线程池里的线程通过我们传的线程工厂创建。

    image

    之后把 NewThreadWorker 和步骤①中的任务包装成 DisposeTask,又是一个Runnable

    <figure class="highlight plain" style="display: block; margin: 20px 0px; overflow: auto; padding: 0px; font-size: 13px; color: rgb(77, 77, 76); background: rgb(247, 247, 247); line-height: 1.6; border-radius: 1px; font-family: Lato, "PingFang SC", "Microsoft YaHei", sans-serif; font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: justify; text-indent: 0px; text-transform: none; white-space: normal; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">

    |

    <pre style="overflow: auto; font-family: consolas, Menlo, "PingFang SC", "Microsoft YaHei", monospace; font-size: 13px; margin: 0px; padding: 10px; color: rgb(134, 145, 148); background: rgb(239, 242, 243); line-height: 1.6; border: none; text-align: right;">

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    </pre>

    |

    <pre style="overflow: auto; font-family: consolas, Menlo, "PingFang SC", "Microsoft YaHei", monospace; font-size: 13px; margin: 0px; padding: 10px; color: rgb(77, 77, 76); background: rgb(247, 247, 247); line-height: 1.6; border: none; width: 278px;">

    public void run() {

    //注意这里会获取当前所在线程
    
    runner = Thread.currentThread();
    
    try {
    
        //在当前线程中执行
    
        decoratedRun.run();
    
    } finally {
    
        //执行完后断开
    
        dispose();
    
        runner = null;
    
    }
    

    }

    </pre>

    |

    </figure>

    最后会执行 NewThreadWorker.schedule 方法

    <figure class="highlight plain" style="display: block; margin: 20px 0px; overflow: auto; padding: 0px; font-size: 13px; color: rgb(77, 77, 76); background: rgb(247, 247, 247); line-height: 1.6; border-radius: 1px; font-family: Lato, "PingFang SC", "Microsoft YaHei", sans-serif; font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: justify; text-indent: 0px; text-transform: none; white-space: normal; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">

    |

    <pre style="overflow: auto; font-family: consolas, Menlo, "PingFang SC", "Microsoft YaHei", monospace; font-size: 13px; margin: 0px; padding: 10px; color: rgb(134, 145, 148); background: rgb(239, 242, 243); line-height: 1.6; border: none; text-align: right;">

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    24

    25

    </pre>

    |

    <pre style="overflow: auto; font-family: consolas, Menlo, "PingFang SC", "Microsoft YaHei", monospace; font-size: 13px; margin: 0px; padding: 10px; color: rgb(77, 77, 76); background: rgb(247, 247, 247); line-height: 1.6; border: none; width: 735px;">

    public Disposable schedule(@NonNull final Runnable action, long delayTime, @NonNull TimeUnit unit) {

    if (disposed) {
    
        return EmptyDisposable.INSTANCE;
    
    }
    
    return scheduleActual(action, delayTime, unit, null);
    

    }

    public Disposable scheduleDirect(final Runnable run, long delayTime, TimeUnit unit) {

    ScheduledDirectTask task = new ScheduledDirectTask(RxJavaPlugins.onSchedule(run));
    
    try {
    
        Future<?> f;
    
        //不延时,直接调度
    
        if (delayTime <= 0L) {
    
            //此时任务执行在子线程中
    
            f = executor.submit(task);
    
        } else {
    
            f = executor.schedule(task, delayTime, unit);
    
        }
    
        task.setFuture(f);
    
        return task;
    
    } catch (RejectedExecutionException ex) {
    
        RxJavaPlugins.onError(ex);
    
        return EmptyDisposable.INSTANCE;
    
    }
    

    }

    </pre>

    |

    </figure>

    到这里终于看到任务(ObservableCreate.subscribeActual)执行在子线程中。

    步骤③ parent.setDisposable 设置可中断。至此流程如下

    image

    之后所有的事情都是在子线程中进行的,上篇已经分析过了

    <figure class="highlight plain" style="display: block; margin: 20px 0px; overflow: auto; padding: 0px; font-size: 13px; color: rgb(77, 77, 76); background: rgb(247, 247, 247); line-height: 1.6; border-radius: 1px; font-family: Lato, "PingFang SC", "Microsoft YaHei", sans-serif; font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: justify; text-indent: 0px; text-transform: none; white-space: normal; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">

    |

    <pre style="overflow: auto; font-family: consolas, Menlo, "PingFang SC", "Microsoft YaHei", monospace; font-size: 13px; margin: 0px; padding: 10px; color: rgb(134, 145, 148); background: rgb(239, 242, 243); line-height: 1.6; border: none; text-align: right;">

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    </pre>

    |

    <pre style="overflow: auto; font-family: consolas, Menlo, "PingFang SC", "Microsoft YaHei", monospace; font-size: 13px; margin: 0px; padding: 10px; color: rgb(77, 77, 76); background: rgb(247, 247, 247); line-height: 1.6; border: none; width: 598px;">

    protected void subscribeActual(Observer<? super T> observer) {

    //创建服务员,并和顾客联系,这里的顾客是小红
    
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    
    //执行顾客小红的的 onSubscribe ,注意这里不会再回调顾客小明的onSubscribe
    
    //因为顾客小红的 onSubscribe 中只是将接收事件的行为设置成一次性,并没有回调小明方法
    
    observer.onSubscribe(parent);
    
    try {
    
        //厨师做菜,并和服务员联系
    
        source.subscribe(parent);
    
    } catch (Throwable ex) {
    
        Exceptions.throwIfFatal(ex);
    
        parent.onError(ex);
    
    }
    

    }

    </pre>

    |

    </figure>

    后续还有:服务员端菜(CreateEmitter.onNext) —> 顾客小红拿到菜(SubscribeOnObserver.onNext) —> 顾客小明拿到菜(Observer.onNext),模拟如下:

    image

    多次subscribeOn

    <figure class="highlight plain" style="display: block; margin: 20px 0px; overflow: auto; padding: 0px; font-size: 13px; color: rgb(77, 77, 76); background: rgb(247, 247, 247); line-height: 1.6; border-radius: 1px; font-family: Lato, "PingFang SC", "Microsoft YaHei", sans-serif; font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: justify; text-indent: 0px; text-transform: none; white-space: normal; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">

    |

    <pre style="overflow: auto; font-family: consolas, Menlo, "PingFang SC", "Microsoft YaHei", monospace; font-size: 13px; margin: 0px; padding: 10px; color: rgb(134, 145, 148); background: rgb(239, 242, 243); line-height: 1.6; border: none; text-align: right;">

    1

    2

    3

    </pre>

    |

    <pre style="overflow: auto; font-family: consolas, Menlo, "PingFang SC", "Microsoft YaHei", monospace; font-size: 13px; margin: 0px; padding: 10px; color: rgb(77, 77, 76); background: rgb(247, 247, 247); line-height: 1.6; border: none; width: 335px;">

    source.subscribeOn(Schedulers.newThread())

        .subscribeOn(Schedulers.newThread())
    
        .subscribe(observer);
    

    </pre>

    |

    </figure>

    上面我先把任务从一个线程切换到另一个线程,但是只有最先指定的有效(可以用 io 线程更容易看出差别),这是为啥呢?通过上面的分析我们知道,subscribeOn() 每次会返回一个 Observable ,为了方便理解,把先指定返回的Observable 叫黄焖鸡1号店,后指定返回的 Observable 叫黄焖鸡2号店,第一个 subscribeOn() 执行:

    image

    黄焖鸡1号店创建的时候会持有沙县小吃的引用,接着第二个 subscribeOn() 执行:

    image

    黄焖鸡2号店创建的时候会持有黄焖鸡1号店的引用,接着执行 subscribe(observer) 方法,会先调用黄焖鸡2号店的 subscribeActual() 方法:

    image

    接着调用黄焖鸡2号店的 subscribeActual() 方法 :

    image

    可以看到此时黄焖鸡1号店的 Worker 和小红是创建在子线程2的,并在子线程2中把当前线程切到了新的线程,后面的操作就和上面一样了,这就是为啥多次通过 subscribeOn 指定线程,只有最先指定的有效。

    最后

    多次用 subscribeOn 指定上游线程真的只有第一次有效吗?其实不然,具体可以看Dávid Karnok 的这篇博客,其中涉及到一些 Rx 操作符操作,本篇只是介绍 subscribeOn 的使用和原理,就不引入其他内容,mark 下日后再捡起来看。

    感谢

    When multiple subscribeOn()s do have effect

    SubscribeOn and ObserveOn

    相关文章

      网友评论

          本文标题:图解RxJava(二)

          本文链接:https://www.haomeiwen.com/subject/vylrtftx.html