美文网首页RxJavaAndroid知识
RxJava 2.0 线程切换简单分析

RxJava 2.0 线程切换简单分析

作者: 喝那个东南西北风 | 来源:发表于2017-03-02 11:21 被阅读118次

    简单介绍一下RxJava 2.0的多种流:

    Paste_Image.png
    1.Completable

    特性:这个流没有数据,只会收到error或者complete
    示例:

    Completable.complete()
    .subscribe(() -> printThread("on complete 1"));
    
    Completable.error(new Callable<Throwable>() {
    
        @Override
        public Throwable call() throws Exception {
            // TODO Auto-generated method stub
            return new NullPointerException();
        }
    })
    .subscribe(() -> printThread("on complete 2"),
            (e) -> printThread("on error 2 [" + e.getMessage() + "]"));
    
    输出:
    on complete 1[main]
    on error 2 [null][main]
    
    2.Single

    特性:这个流只会收到一个数据或者一个error,也就是要不然执行onSuccess要不然就执行onError
    示例:

    Single.just(1)
        .subscribe(i -> printThread(String.valueOf(i)));
    
    System.out.println("==============");
    
    Single.fromCallable(new Callable<Integer>() {
    
        @Override
        public Integer call() throws Exception {
            // TODO Auto-generated method stub
            return 1 / 0;
        }
    }).subscribe((i, error) -> {
        printThread(String.valueOf(i));
        printThread(String.valueOf(error));
    });
    
    输出:
    1[main]
    ==============
    null[main]
    java.lang.ArithmeticException: / by zero[main]
    

    可以看到,正常数据下,收到了数据1,出错的时候,只会收到一个error。
    我们看Single的订阅接口SingleObserver,如下:

    public interface SingleObserver<T> {
        void onSubscribe(Disposable d);
        void onSuccess(T value);
        void onError(Throwable e);
    }
    

    只会存在2种回调,符合我们的打印输出。(上面示例代码仅仅调用的是简单的单个情况订阅,查看源码,最终都封装成了SingleObserver)

    3.Maybe

    特性:和Single类似正常流程也是只执行onSuccess,但在出现错误的时候,可以选择是执行onError还是onComplete
    示例(正常流程):

    Maybe.just(1)
        .subscribe(i -> printThread("success " + i), 
                (e) -> printThread("error " + e), 
                () -> printThread("complete"));
    
    输出:
    success 1[main]
    

    示例(错误):

    Maybe.fromCallable(() -> {
        return 1 / 0;
    }).subscribe(i -> printThread("success " + i), 
                (e) -> printThread("error " + e), 
                () -> printThread("complete"));
    输出:
    error java.lang.ArithmeticException: / by zero[main]
    
    我们调用onErrorComplete干预:
    Maybe.fromCallable(() -> {
        return 1 / 0;
    })
    .onErrorComplete()
    .subscribe(i -> printThread("success " + i), 
                (e) -> printThread("error " + e), 
                () -> printThread("complete"));
    输出:
    complete[main]
    
    4.Flowable

    和Observable功能几乎一模一样,区别在于:
    1.定义的类功能不一样

    Observable 的订阅者是:
    
    public interface Observer<T> {
        void onSubscribe(Disposable d);
        void onNext(T value);
        void onError(Throwable e);
        void onComplete();
    }
    
    public interface Disposable {
        void dispose();
        boolean isDisposed();
    }
    
    Flowable的订阅者是:
    
    public interface Subscriber<T> {
        public void onSubscribe(Subscription s);
        public void onNext(T t);
        public void onError(Throwable t);
        public void onComplete();
    }
    
    public interface Subscription {
        public void request(long n);
        public void cancel();
    }
    

    2.Flowable可以通过Subscription对象,调用request(n),响应式拉取数据,来支持背压特性

    示例代码:

    private static int count = 1;
    
    private static boolean isDataEnd() {
        return count > 1000;
    }
    
    private static void test() {
        Flowable.create(new FlowableOnSubscribe<Integer>() {
    
            @Override
            public void subscribe(FlowableEmitter<Integer> e) throws Exception {
    
                while (!isDataEnd() && !e.isCancelled()) {// 生产数据条件
                    while (e.requested() <= 0) {// 如果e.request值是0,说明消费者还没有消费完毕,我们就休息
                        Thread.sleep(1000);
                    }
                    printThread(String.format("OUT生产数据[%d]", count));
                    e.onNext(count++);
                }
                e.onComplete();
            }
        }, BackpressureStrategy.BUFFER)
    
                .subscribe(new Subscriber<Integer>() {
    
                    private Subscription mSub;
    
                    @Override
                    public void onComplete() {
                        // TODO Auto-generated method stub
                        printThread("消费完毕");
                        unloopMain();
                    }
    
                    @Override
                    public void onError(Throwable arg0) {
                        // TODO Auto-generated method stub
                        printThread(arg0.getMessage());
                        unloopMain();
                    }
    
                    @Override
                    public void onNext(Integer value) {
                        // TODO Auto-generated method stub
                        try {
                            Thread.sleep(100);
                            printThread(String.format("IN消费数据[%d]", value));
                            mSub.request(1);
                        } catch (InterruptedException e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                        }
    
                    }
    
                    @Override
                    public void onSubscribe(Subscription arg0) {
                        // TODO Auto-generated method stub
                        mSub = arg0;
                        mSub.request(1);
                    }
                });
        loopMain();
    }
    
    private static void loopMain() {
        do {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            if (needBreak)
                break;
        } while (true);
        System.out.println("END");
    }
    
    private static void unloopMain() {
        needBreak = true;
    }
    
    输出如下:
    OUT生产数据[1][main]
    IN消费数据[1][main]
    OUT生产数据[2][main]
    IN消费数据[2][main]
    OUT生产数据[3][main]
    IN消费数据[3][main]
    OUT生产数据[4][main]
    IN消费数据[4][main]
    OUT生产数据[5][main]
    

    流程分析:
    1.调用Flowable.create生产我们的数据流,里面有个新的类型,如下:

    public interface FlowableEmitter<T> extends Emitter<T> {
        void setDisposable(Disposable s);
        void setCancellable(Cancellable c);
        long requested();
        boolean isCancelled();
        FlowableEmitter<T> serialize();
    }
    

    其他方法和Observable发射器的功能类似,我们主要需要requested()方法,获取当前请求个数,如果为0代表还在消费数据,不需要新的数据,我们就休息。
    2.BackpressureStrategy,代表支持背压的策略,如下:

    public enum BackpressureStrategy {
        //结合onBackpressureXXX()才会生效
        MISSING,//在onSubcription时候,s.request(Long.MAX_VALUE);设置最大值。导致一直生产数据。
        //直接抛出异常,如果数据溢出
        ERROR,
        //所有数据会保存到缓存里面
        BUFFER,
        //丢弃最新的
        DROP,
       //保存最新的,覆盖老的
        LATEST
    }
    

    3.背压策略,一定要在线程变换之前去调用,线程变换后,收到的订阅者发生了变化,不是同一个。上面代码,因为是同一个线程,调用request(1),将直接设置上游数据生产者FlowableEmitter的值为1,但如果是切换线程了,将无法直接影响,增加代码如下:

    Flowable.create(...)
    .observeOn(Schedulers.computation())//切换线程
    .subscribe(...);
    
    打印如下:
    ...
    OUT生产数据[127][requested = 2][main]
    OUT生产数据[128][requested = 1][main]
    IN消费数据[1][RxComputationThreadPool-1]
    IN消费数据[2][RxComputationThreadPool-1]
    IN消费数据[3][RxComputationThreadPool-1]
    IN消费数据[4][RxComputationThreadPool-1]
    ...
    

    我们可以看到,下游的request(1)并不会影响上游的值,上游使用了默认值128的缓存大小。先生产了128个数据,再开始消费。两个疑问解答:
    1.128怎么来的,在调用BackpressureStrategy.Buffer时候,生成的FlowableEmitter实际类型是BufferAsyncEmitter,它默认值就是128

    2.我们怎么去控制这个值呢,既然下游影响不到这个大小,可以通过如下代码:

    ...
    .observeOn(Schedulers.computation(), false, 4)
    ...
    
    打印输出,如下:
    ...
    OUT生产数据[1][requested = 4][main]
    OUT生产数据[2][requested = 3][main]
    OUT生产数据[3][requested = 2][main]
    OUT生产数据[4][requested = 1][main]
    IN消费数据[1][RxComputationThreadPool-1]
    IN消费数据[2][RxComputationThreadPool-1]
    IN消费数据[3][RxComputationThreadPool-1]
    IN消费数据[4][RxComputationThreadPool-1]
    OUT生产数据[5][requested = 3][main]
    OUT生产数据[6][requested = 2][main]
    OUT生产数据[7][requested = 1][main]
    IN消费数据[5][RxComputationThreadPool-1]
    IN消费数据[6][RxComputationThreadPool-1]
    IN消费数据[7][RxComputationThreadPool-1]
    ...
    

    这里看到,我们只生产了4个数据,就消费了,以后就是生产3个,这是因为在我们调用observeOn()生成的内部对象FlowableObserveOn里面有个limit = prefetch - (prefetch >> 2); prefetch实际就是传入的buffersize。如下:

    void runAsync(){
      ...
      e++;
      if (e == limit) {
        if (r != Long.MAX_VALUE) {
            r = requested.addAndGet(-e);
        }
        s.request(e);
        e = 0L;
      }
    }
    

    这个方法在内部类 FlowableObserveOn$ObserveOnSubscriber,我们异步调用sub.request(n)将最终触发到runAsync(),它会去设置requested也就是上游的数据,所以这个n将无法直接反应到上游,而同步的n是直接设置给上游了

    分析线程变换:

    使用最简单的Single来进行探究,代码如下:

    private static void syncRx(){
        Single.fromCallable(() -> {
            printThread("生产数据");
            return "s";
        })
        .subscribe((s) -> {
            printThread("消费数据");
        }); 
    }
    
    private static void printThread(String msg){
        System.out.println(String.format("[%s][%s]", msg, Thread.currentThread().getName()));
    }
    
    打印输出:
    [生产数据][main]
    [消费数据][main]
    

    查看源码,Single.fromCallable 生成的就是SingleFromCallable对象,订阅表达式生成的是ConsumerSingleObserver对象,代码变换如下:

    private static void syncChangeRx() {
        new SingleFromCallable<String>(() -> {
            printThread("生产数据");
            return "s";
        }).subscribe(new ConsumerSingleObserver<>((s) -> {
            printThread("消费数据");
        }, Functions.ERROR_CONSUMER));
    }
    

    输出打印和上面一模一样,所以点开方法subscribe(),如下:

    public final void subscribe(SingleObserver<? super T> subscriber) {
        ...
        try {
            subscribeActual(subscriber);
        } catch (NullPointerException ex) {
            throw ex;
        } catch (Throwable ex) {
            ...
        }
    }
    

    最终调用的是抽象方法 subscribeActual(),而我们知道我们来源于SingleFromCallable,所以实际实现在SingleFromCallable.subscribeActual()方法里面,如下:

    @Override
    protected void subscribeActual(SingleObserver<? super T> s) {
        s.onSubscribe(EmptyDisposable.INSTANCE);
        try {
            T v = callable.call();
            if (v != null) {
                s.onSuccess(v);
            } else {
                s.onError(new NullPointerException("The callable returned a null value"));
            }
        } catch (Throwable e) {
            ...
        }
    }
    

    里面也很简单,参数 s,就是我们自己定义的后生产的ConsumerSingleObserver对象,callable就是我们定义的生产对象,所以下游的订阅动作,如下:

    1.订阅触发流程
    2.进入真实SingleFromCallable.subscribeActual()
    3.调用callable.call()生产数据或者异常
    4.回调给ConsumerSingleObserver,我们自己的消费者
    5.完成结束
    

    以上就是最简单的单线程调用了,在以上的基础上,我们增加一个线程切换,如下:

    private static void asyncChangeRx() {
        new SingleFromCallable<String>(() -> {
            printThread("生产数据");
            return "s";
        })
        .observeOn(Schedulers.computation())
        .subscribe(new ConsumerSingleObserver<>((s) -> {
            printThread("消费数据");
        }, Functions.ERROR_CONSUMER));
    }
    
    打印输出:
    [生产数据][main]
    [消费数据][RxComputationThreadPool-1]
    

    查看源码,我们可以知道,observeOn也生产了一个新的包装流SingleObserveOn,变换,如下:

    private static void asyncChangeRx() {
        SingleSource<String> producer = new SingleFromCallable<>(() -> {
                printThread("生产数据");
                return "s";
            });
        SingleObserver<String> consumer = new ConsumerSingleObserver<>((s) -> {
            printThread("消费数据");
        }, Functions.ERROR_CONSUMER);
        
        new SingleObserveOn<>(producer, Schedulers.computation())
        .subscribe(consumer);
    }
    

    这样也就是具有线程变换功能的SingleObserveOn,包裹起了原始的生产者SingleFromCallable,其他不变,因此我们先认为是单一线程模型可以大概推出:

    consumer订阅  --->
    SingleObserveOn.subscribeActual()   --->
    SingleFromCallable.subscribeActual()
    

    所以真正线程变换就在SingleObserveOn.subscribeActual()里面,实现如下:

    @Override
    protected void subscribeActual(final SingleObserver<? super T> s) {
      source.subscribe(new ObserveOnSingleObserver<T>(s, scheduler));
    }
    
    
    

    这里SingleObserver<? super T> s就是我们自己的 consumer, 而source就是我们的 producer,一定要注意,上面这个订阅过程会导致上游产生数据,因此将触发ObserveOnSingleObserver.onSuccess(T t),我们再看具体实现代码,如下:

    static final class ObserveOnSingleObserver<T> extends AtomicReference<Disposable>
    implements SingleObserver<T>, Disposable, Runnable {
    
        ...
        final SingleObserver<? super T> actual;//我们实际的订阅者
        final Scheduler scheduler;//我们设定的线程执行类
    
        T value;//保存上游的数据
        Throwable error;//保存上游的错误
    
    
        @Override
        public void onSuccess(T value) {
            this.value = value;//拿到上游数据
            Disposable d = scheduler.scheduleDirect(this);//要求线程执行自己
            DisposableHelper.replace(this, d);
        }
    
        @Override
        public void onError(Throwable e) {
            this.error = e;
            Disposable d = scheduler.scheduleDirect(this);
            DisposableHelper.replace(this, d);
        }
    
        @Override
        public void run() {//这里异步执行原有的调用流程
            Throwable ex = error;
            if (ex != null) {
                actual.onError(ex);
            } else {
                actual.onSuccess(value);
            }
        }
        
        ...
    }
    

    生产数据后,ObserveOnSingleObserver本身继承了Runnable,将同步调用的流程封装在了run()方法里面,再叫scheduler去执行自己,完成了线程切换。

    线程切换最简单的整个流程,就是以上调用,如果加上生产者也要切换线程,也是一样的,它有个对象,SingleSubscribeOn来包装流,包装过程伪代码如下:

    从调用链最后开始,往上包装:
    SingleObserveOn  --包含-->
    SingleSubscribeOn --包含-->
    producer
    
    订阅过程,又最后往上 被包装:
    consumer  --被包含-->
    ObserveOnSingleObserver--被包含-->  负责消费者切换
    SubscribeOnObserver   负责生产者切换
    
    

    SubscribeOnObserver 里面也很简单,也是run方法,如下:

    @Override
    public void run() {
      source.subscribe(this);//订阅就是生产数据
    }
    

    线程变换总结:
    1.订阅将会触发生产
    2.将上游的订阅过程,封装到runnable,再交由scheduler去执行
    3.将下游的消费过程,封装到runnable,再交由scheduler去执行

    其他思考:
    多次线程变换,生产数据会在哪次里面?,而消费过程会在哪次里面?
    示例代码:

    public static void main(String[] args) {
        asyncChangeRx2();
        loopMain();
    }
    
        
    private static void asyncChangeRx2() {
        Single.fromCallable(() -> {
                printThread("生产数据");
                return "s";
            })
            .subscribeOn(myScheduler("生产包装线程1"))
            .subscribeOn(myScheduler("生产包装线程2"))
            .observeOn(myScheduler("消费包装线程1"))
            .observeOn(myScheduler("消费包装线程2"))
            .subscribe((s) -> {
                printThread("消费数据");
                unloopMain();
            });
        
    }
    
    private static Scheduler myScheduler(String name) {
        return new Scheduler() {
    
            @Override
            public Worker createWorker() {
                // TODO Auto-generated method stub
                return new NewThreadWorker(new ThreadFactory() {
    
                    @Override
                    public Thread newThread(Runnable r) {
                        // TODO Auto-generated method stub
    
                        return new Thread(new HookRun(r), name);
                    }
                });
            }
        };
    }
    
    //简单打印活动线程
    private static class HookRun implements Runnable {
    
        private Runnable mRealRun;
    
        public HookRun(Runnable r) {
            // TODO Auto-generated constructor stub
            mRealRun = r;
        }
    
        @Override
        public void run() {
            // TODO Auto-generated method stub
            System.out.println(String.format("执行Run[%s]", Thread.currentThread().getName()));
            mRealRun.run();
        }
    
    }
    
    
    private static void printThread(String msg) {
        System.out.println(String.format("[%s][%s]", msg, Thread.currentThread().getName()));
    }
    
    private static void loopMain() {
        do {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            if (needBreak)
                break;
        } while (true);
        System.out.println("END");
    }
    
    private static void unloopMain() {
        needBreak = true;
    }
    
    private static boolean needBreak;
    }
    
    输出:
    执行Run[生产包装线程2]
    执行Run[生产包装线程1]
    [生产数据][生产包装线程1]
    执行Run[消费包装线程1]
    执行Run[消费包装线程2]
    [消费数据][消费包装线程2]
    END
    

    说明生产最终在第一次包装里面,消费在最后一次包装里面,符合我们刚才分析的包装过程伪代码的方向。离活动(生产或者消费)最近的一次线程切换包装负责执行

    相关文章

      网友评论

        本文标题:RxJava 2.0 线程切换简单分析

        本文链接:https://www.haomeiwen.com/subject/nyzxgttx.html