深入理解 RxJava2:从 observeOn 到作用域(4)

作者: 蝶翼的罪 | 来源:发表于2018-08-26 23:45 被阅读30次

    前言

    欢迎来到深入理解 RxJava2 系列第四篇。前一篇中我们认识了线程操作符,并详细介绍了 subscribeOn 操作符,最后一个例子给大家介绍使用该操作符的注意事项,由于篇幅问题就戛然而止了。本文将继续介绍 observeOn,并用这两者做一些比较帮助大家深刻理解它们。

    observeOn

    前文我们提过subscribeOn是对上游起作用的,而observeOn恰恰相反是作用于下游的,因此从某种意义上说observeOn的功能更加强大与丰富。

    方法描述

    public final Flowable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize)
    

    scheduler

    image

    如上图所示,scheduler在这里起的作用就是调度任务,下游消费者的onNext / onComplete / onError均会在传入目标scheduler中执行。

    delayError

    delayError 顾名思义,当出现错误时,是否会延迟onError的执行。

    为什么会出现这样的情况,因为消费的方法均是在Scheduler中执行的,因此会有生产和消费速率不一致的情形。那么当出现错误时,可能队列里还有数据未传递给下游,因此delayError这个参数就是为了解决这个问题。

    delayEror默认为false, 当出现错误时会直接越过未消费的队列中的数据,在下游处理完当前的数据后会立即执行onError,如下图所示:

    image

    如果为true则会保持和上游一致的顺序向下游调度onNext,最后执行onError

    bufferSize

    这里着重强调一下bufferSize这个参数,在FlowableObservableobserveOn中都有这个参数,但是在两者中bufferSize的效果是完全不一样的,因为选择的数据结构不一样:

    • Flowable:queue = new SpscArrayQueue<T>(bufferSize)
    • Observable:queue = new SpscLinkedArrayQueue<T>(bufferSize)
    SpscXXXQueue

    上述的两种队列均是 RxJava 中提供的无锁的单生产者单消费者的队列,是 Fast Flow 和 BQueue 在 Java 中的实现,用以提升 RxJava 数据流的吞吐量。关于细节我们不再赘述,有兴趣的读者可以自己去搜寻。

    但是在上面两个队列中,SpscArrayQueue是一个固定长度缓存的队列,当队列满了时继续入队,Flowable 会抛出MissingBackpressureException。此外还有一个小细节,实际缓存的长度大于等于传入值的 2 的幂。例如传入 20 会变成 32,而传入 32 则还是 32,大家使用时请注意。

    SpscLinkedArrayQueueSpscArrayQueue相似,但当队列满后会自动扩容,因此永远也不会导致 MBE,但是可能会因为消费和生产的速度不一致导致 OOM。

    这里也呼应了笔者在《深入理解 RxJava2:前世今生(1)》 中提到过的FlowableObservable的差别。

    作用域

    上面我们提过,observeOn是对下游生效的,一个简单的例子:

    Flowable.just(1).observeOn(Schedulers.io())
            .subscribe(i -> {
                System.out.println(Thread.currentThread().getName());
            });
            
    输出:
    RxCachedThreadScheduler-1
    

    但是当有多个操作符,且存在多次observeOn时,每个方法都是执行在什么线程呢?

    Flowable.just(1).observeOn(Schedulers.io())
            .map(i -> {
                System.out.println(Thread.currentThread().getName());
                return i;
            })
            .observeOn(Schedulers.computation())
            .subscribe(i -> {
                System.out.println(Thread.currentThread().getName());
            });
            
    输出:
    RxCachedThreadScheduler-1
    RxComputationThreadPool-1
    

    这里就涉及到一些 RxJava 实现的细节,多数操作符是基于上游调用onNext / onComplete / onError 的进一步封装,在不涉及包含Scheduler的操作符的情况下,在上游调用了observeOn后,后续操作符的方法都是执行在上游调度的线程。因此每个操作符所执行的线程都是由上游最近的一个observeOnScheduler决定。

    因此笔者称之为最近生效原则,但是请注意,observeOn是影响下游的,因此操作符所执行的线程受的是最近上游observeOn影响,切莫记反了。

    示例

    因此在实际使用中灵活的使用observeOn,使得代码的效率最大化。这里笔者再举个例子:

    Flowable.just(new File("input.txt"))
            .map(f -> new BufferedReader(new InputStreamReader(new FileInputStream(f))))
            .observeOn(Schedulers.io())
            .flatMap(r -> Flowable.<String, BufferedReader>generate(() -> r, (br, e) -> {
                String s = br.readLine();
                if (s != null) {
                    e.onNext(s);
                } else {
                    System.out.println(Thread.currentThread().getName());
                    e.onComplete();
                }
            }, BufferedReader::close))
            .observeOn(Schedulers.computation())
            .map(Integer::parseInt)
            .reduce(0, (total, item) -> {
                System.out.println(item);
                return total + item;
            })
            .subscribe(s -> {
                System.out.println("total: " + s);
                System.out.println(Thread.currentThread().getName());
            });
            
    输出:
    RxCachedThreadScheduler-1
    1
    2
    3
    4
    5
    total: 15
    RxComputationThreadPool-1
    

    如上代码所示,我们从 input.txt 读出每行的字符串,然后转成一个 int, 最后求和。这里我们灵活地使用了两次observeOn,在读文件时,调度至IoScheduler,随后做计算工作时调度至ComputationScheduler,从控制台的输出可以见线程的的确确是我们所期望的。当然这里求和只是一个示例,读者们可以举一反三。

    事实上上面的代码还不是最优的:

    Flowable.just(new File("input.txt"))
            .map(f -> new BufferedReader(new InputStreamReader(new FileInputStream(f))))
            .observeOn(Schedulers.io())
            .flatMap(r -> Flowable.<String, BufferedReader>generate(() -> r, (br, e) -> {
                String s = br.readLine();
                if (s != null) {
                    e.onNext(s);
                } else {
                    System.out.println(Thread.currentThread().getName());
                    e.onComplete();
                }
            }, BufferedReader::close))
            .parallel()
            .runOn(Schedulers.computation())
            .map(Integer::parseInt)
            .reduce((i, j) -> {
                System.out.println(Thread.currentThread().getName());
                return i + j;
            })
            .subscribe(s -> {
                System.out.println("total: " + s);
                System.out.println(Thread.currentThread().getName());
            });
    输出:
    RxCachedThreadScheduler-1
    RxComputationThreadPool-1
    RxComputationThreadPool-2
    RxComputationThreadPool-4
    RxComputationThreadPool-4
    total: 15
    RxComputationThreadPool-4
    

    如上代码所示我们可以充分利用多核的性能,通过parallel来并行运算,当然这里用在求和就有点杀鸡用牛刀的意思了,笔者这里只是一个举例。更多 parallel 相关的内容,留待后续分享。

    subscribeOn

    回到正题,事实上subscribeOn同样遵循最近生效原则,但是与observeOn恰恰相反。操作符会被最近的下游的subscribeOn调度,因为subscribeOn影响的是上游。

    但是和observeOn又有一些微妙的差别在于,我们通常调用subscribeOn更加关注最上游的数据源的线程。因此通常不会在中间过程中调用多次,任意的调用一次subscribeOn均会影响上游所有操作符的subscribe所在的线程,且不受observeOn的影响。这是由于这两者机制的不同,subscribeOn是将整个上游的subscribe方法都调度到目标线程了。

    多数据源

    但是在一些特别的情况下subscribeOn多次的使用也是有意义的,尤其是上游有多个数据源时。多数据源也就是存在超过一个Publisher的操作符,如:zipWith / takeUntil / amb,如果此类操作符如果在subscribeOn作用域内,则对应的多个数据源均会受到影响,望大家注意。

    交叉对比

    最后我们再用一个例子,将observeOnsubscribeOn混合使用,验证我们上面的结论:

    Flowable.<Integer>create(t -> {
        System.out.println(Thread.currentThread().getName());
        t.onNext(1);
        t.onComplete();
    }, BackpressureStrategy.BUFFER)
            .observeOn(Schedulers.io())
            .map(i -> {
                System.out.println(Thread.currentThread().getName());
                return i;
            })
            .subscribeOn(Schedulers.newThread())
            .observeOn(Schedulers.computation())
            .subscribe(i -> {
                System.out.println(Thread.currentThread().getName());
        });
    
    输出:
    RxNewThreadScheduler-1
    RxCachedThreadScheduler-1
    RxComputationThreadPool-1
    

    数据流的线程如下图所示:

    image

    结语

    observeOn作为 RxJava2 的核心实现自然不只是笔者上面说的那些内容。笔者有意的避开了源码,不希望同时将过多的概念灌输给大家。事实上observeOn的源码中深度实现了所谓的Fusion这个隐晦的概念,这些深层次的源码分析留到这个系列的后期,笔者也会一一分享。

    感觉大家的阅读,欢迎关注笔者公众号,可以第一时间获取更新,同时欢迎留言沟通。

    image

    相关文章

      网友评论

        本文标题:深入理解 RxJava2:从 observeOn 到作用域(4)

        本文链接:https://www.haomeiwen.com/subject/wvbaiftx.html