美文网首页
一个例子弄清RxJava线程切换

一个例子弄清RxJava线程切换

作者: ea9fd46b3c71 | 来源:发表于2018-04-10 14:41 被阅读33次

RxJava提供了Reactive Programming for Java,个人在Android开发中用的相当多,尤其线程切换和链式的数据处理,给码农们提供了极大的便利。在线程切换方面,之前一直用subscribeOn和observeOn配合,不过最近工作需要对线程更加细致的考量,比如zipWith的线程执行由谁决定,于是再重新编码理一遍。

Code

package com.opticalix.theme.zepp;

import com.opticalix.base.BaseRunner;
import com.opticalix.theme.synchronize.PrefixThreadFactory;
import com.opticalix.util.Logger;
import rx.Observable;
import rx.Scheduler;
import rx.Subscriber;
import rx.functions.Action0;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.schedulers.Schedulers;

import java.util.concurrent.Executors;

public class RxRunner implements BaseRunner {
    private final Scheduler mSchedulerIO;
    private final Scheduler mSchedulerIO2;
    private final Scheduler mSchedulerDisplay;
    private final Scheduler mSchedulerCompute;
    private final Scheduler mSchedulerCompute2;

    public RxRunner() {
        mSchedulerDisplay = Schedulers.from(Executors.newSingleThreadExecutor(new PrefixThreadFactory("DISPLAY")));
        PrefixThreadFactory ioFactory = new PrefixThreadFactory("IO");
        mSchedulerIO = Schedulers.from(Executors.newSingleThreadExecutor(ioFactory));
        mSchedulerIO2 = Schedulers.from(Executors.newSingleThreadExecutor(ioFactory));
        PrefixThreadFactory computeFactory = new PrefixThreadFactory("COMPUTE");
        mSchedulerCompute = Schedulers.from(Executors.newSingleThreadExecutor(computeFactory));
        mSchedulerCompute2 = Schedulers.from(Executors.newSingleThreadExecutor(computeFactory));
    }

    @Override
    public void run(String[] args) {
        Observable<Integer> src1 = getIntegerObservable(1);
        Observable<Integer> src2 = getIntegerObservable(2).subscribeOn(mSchedulerIO2);
        src1
                .map(new Func1<Integer, String>() {
                    @Override
                    public String call(Integer integer) {
                        printThreadInfo("map1");
                        return "map-1, num=" + integer;
                    }
                })
                .map(new Func1<String, String>() {
                    @Override
                    public String call(String s) {
                        printThreadInfo("map2");
                        return "map-2, s=" + s;
                    }
                })
                .zipWith(src2, new Func2<String, Integer, String>() {
                    @Override
                    public String call(String s, Integer integer) {
                        printThreadInfo("zipWith");
                        return "zipWith s=" + s + ", num=" + integer;
                    }
                })
                //第一个subscribeOn指定整个流程的主要IO操作线程
                .subscribeOn(mSchedulerIO)
                .doOnSubscribe(new Action0() {
                    @Override
                    public void call() {
                        printThreadInfo("doOnSubscribe");
                    }
                })
                //不能替代第一个subscribeOn,但是它的作用在于指定doOnSubscribe操作的线程
                .subscribeOn(mSchedulerCompute)
                .doOnSubscribe(new Action0() {
                    @Override
                    public void call() {
                        printThreadInfo("doOnSubscribe");
                    }
                })
                .subscribeOn(mSchedulerCompute2)
                //指定最终subscriber的线程
                .observeOn(mSchedulerDisplay)
                .subscribe(new Subscriber<String>() {

                    @Override
                    public void onStart() {
                        //onStart线程不能指定,始终在subscribe的调用线程
                        printThreadInfo("onStart");
                    }

                    @Override
                    public void onCompleted() {
                        printThreadInfo("onCompleted");
                        System.exit(0);
                    }

                    @Override
                    public void onError(Throwable throwable) {
                        throwable.printStackTrace();
                    }

                    @Override
                    public void onNext(String s) {
                        printThreadInfo("onNext, s=" + s);
                    }
                });
    }

    private Observable<Integer> getIntegerObservable(int num) {
        return Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onStart();
                subscriber.onNext(num);
                subscriber.onCompleted();
                printThreadInfo("create, num=" + num);
            }
        });
    }

    private void printThreadInfo(String tag) {
        String name = Thread.currentThread().getName();
        Logger.p("threadName=[%s], tag=%s", name, tag);
    }

}

其中ThreadFactory类如下:

package com.opticalix.theme.synchronize;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class PrefixThreadFactory implements ThreadFactory {
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String prefix;

    public PrefixThreadFactory(String prefix) {
        this.prefix = prefix + "-";
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread thread = new Thread(r, this.prefix + threadNumber.getAndIncrement());
        return thread;
    }
}

剩下的BaseRunner和Logger只是项目工具类,可以忽略。
执行结果如下:

threadName=[main], tag=onStart
threadName=[COMPUTE-1], tag=doOnSubscribe
threadName=[COMPUTE-2], tag=doOnSubscribe
threadName=[IO-1], tag=map1
threadName=[IO-1], tag=map2
threadName=[IO-1], tag=create, num=1
threadName=[IO-2], tag=zipWith
threadName=[DISPLAY-1], tag=onNext, s=zipWith s=map-2, s=map-1, num=1, num=2
threadName=[IO-2], tag=create, num=2
threadName=[DISPLAY-1], tag=onCompleted

分析

  1. doOnSubscribe执行顺序最靠前,不属于该链式处理过程,而如名所述在subscribe时就调用。它的执行线程取决于后面的第一个subscribeOn。如果它后面没有指定就使用当前线程。
  2. subscribeOn控制observable的创建和转换过程时的线程。subscribeOn在一条链式过程中只第一次指定有用。注意每一个创建的observable都要对应指定一个subscribeOn。
  3. observeOn可以指定多次,可控制其后的过程(从当前调用起到下一个observeOn)的线程。但是用多个observeOn要小心每个的范围。另外observeOn不能完全替代subscribeOn,因为create过程的线程只由第一个subscribeOn指定。
  4. zipWith,执行在zipWith参数中的observable的subscribeOn的线程。
  5. onStart的线程无法指定。

BestPractice

由于多个observeOn需要小心控制线程控制范围,所以个人倾向于demo code所示的用法,在链最后使用subscribeOn & observeOn。
小心多个创建的情形(每个observable都要指定subscribeOn)。
如果使用doOnSubscribe,它的线程取决于其后第一个subscribeOn,但在这之前要调用subscribeOn指定一个线程作为其他数据处理的主要线程。

相关文章

  • 一个例子弄清RxJava线程切换

    RxJava提供了Reactive Programming for Java,个人在Android开发中用的相当多...

  • RxJava源码分析-线程切换

    RxJava源码分析-线程切换 接着上篇分析,本篇我们来揭开RxJava线程切换的神秘面试,先上一段代码 这段代码...

  • RxJava的线程切换

    RxJava 线程切换 前言 在上篇文章对RxJava 的工作流程进行的简单的分析,今天来分享一下线程切换的流程。...

  • Rxjava2 操作符原理(2)

    Rxjava2 基本用法(1) Rxjava2 操作符原理(2) Rxjava2 线程切换(3) Rxjava2 ...

  • Rxjava2 线程切换(3)

    Rxjava2 基本用法(1) Rxjava2 操作符原理(2) Rxjava2 线程切换(3) Rxjava2 ...

  • Rxjava2 基本用法(1)

    Rxjava2 基本用法(1) Rxjava2 操作符原理(2) Rxjava2 线程切换(3) Rxjava2 ...

  • Rxjava2 简析Flowable背压(4)

    Rxjava2 基本用法(1) Rxjava2 操作符原理(2) Rxjava2 线程切换(3) Rxjava2 ...

  • RxJava源码分析之线程调度(一)

    RxJava强大的地方之一是他的链式调用,轻松地在线程之间进行切换。这几天也大概分析了一下RxJava的线程切换的...

  • RxJava:线程切换

    上一篇:RxJava:基本订阅流程 我们在Rxjava中最常用的两个方法: subscribeOn(Schedul...

  • Rxjava 切换线程

    根据自己的理解,公司用的还是Rxjava 1.x,不过很久以前使用过,没有过多注意这一块,不过还是想弄一下,周末的...

网友评论

      本文标题:一个例子弄清RxJava线程切换

      本文链接:https://www.haomeiwen.com/subject/qxtnhftx.html