呕心沥血:RxJava2.x基础流程分析

作者: Burning燃烧 | 来源:发表于2019-11-27 22:21 被阅读0次

    RxJava

    一、RxJava的优势以及基本使用

    1、优势

    1、代码逻辑清晰 优雅

    2、避免回调订阅

    3、线程调度

    2、使用示例

        Observable observable = Observable.create(new ObservableOnSubscribe() {
            @Override
            public void subscribe(ObservableEmitter e) throws Exception {
                e.onNext("RxText");
            }
        });
        Observer observer = new Observer() {
            @Override
            public void onSubscribe(Disposable d) {
    
            }
    
            @Override
            public void onNext(Object o) {
                LogUtils.showLog("message == "+(String)o);
            }
    
            @Override
            public void onError(Throwable e) {
    
            }
    
            @Override
            public void onComplete() {
    
            }
        };
        observable.subscribe(observer);
    

    上面最基础的RxJava使用流程,创建被观察者Observable并在subscribe方法中进行处理->创建被观察者Observer实现回调方法->通过subscribe方法将被观察者和观察者进行订阅关联。

    二、RxJava基础原理分析

    1、观察者模式浅析

    1.1、观察者模式的4个角色

    (1) 抽象被观察者角色

    (2) 抽象观察者角色

    (3) 具体被观察者角色

    (4) 具体观察者角色

    1.2、利用观察者模式的的4个角色构建示例

    场景:微信服务号发送服务消息,普通用户订阅公众号后接受消息;

    (1)定义抽象的被观察者Observable,构建register、remove、notify几个通用接口

    public interface Observable {
        void registerObserver(Observer o);
    
        void removeObserver(Observer o);
    
        void notifyObserver();
    }
    

    (2)定义抽象的观察者Observer,构建update(String message)用来更新消息的通用接口

    public interface Observer {
        void update(String message);
    }
    

    (3)定义具体的被观察者WechatServer实现Observable;在具体的被观察者中会创建一个List用来保存或者删除订阅的用户(观察者)以及逐一通知用户(观察者)消息更新;最后实现一个具体的发送消息方法共外层调用。

    public class WechatServer implements Observable {
        private List<Observer> observerList;
        private String message;
    
        public WechatServer() {
            observerList = new ArrayList<>();
        }
    
        @Override
        public void registerObserver(Observer o) {
            observerList.add(o);
        }
    
        @Override
        public void removeObserver(Observer o) {
            if (!observerList.isEmpty())
                observerList.remove(o);
        }
    
        @Override
        public void notifyObserver() {
            for (Observer observer : observerList) {
                observer.update(message);
            }
        }
    
        /**
         * 外层调用 用于发送消息
         * @param msg
         */
        public void sendMessage(String msg) {
            this.message = msg;
            System.out.println("微信服务发送一条消息:" + msg);
            notifyObserver();
        }
    }
    

    (4)定义具体的观察者Person并实现Observer接口,构造方法中传入人员名称;在update方法中接收微信服务发送的消息

    public class Person implements Observer {
        private String userName;
        private String message;
    
        public Person(String name) {
            this.userName = name;
        }
    
        @Override
        public void update(String message) {
            this.message = message;
            System.out.println(userName + "收到了一条消息:" + message);
        }
    
    

    最后再main函数的调用以及输出:

    public class MainTest {
        public static void main(String[] args) {
            WechatServer wechatServer = new WechatServer();
            Person personZhangSan = new Person("张三");
            Person personLs = new Person("李四");
            Person personWangwu = new Person("王五");
            wechatServer.registerObserver(personZhangSan);
            wechatServer.registerObserver(personLs);
            wechatServer.registerObserver(personWangwu);
            wechatServer.sendMessage("肉又涨价了");
    
            System.out.println("=============================================");
    
            wechatServer.removeObserver(personZhangSan);
            wechatServer.sendMessage("股票又赔了");
        }
    }
    

    输出结果如下:

    微信服务发送一条消息:肉又涨价了
    张三收到了一条消息:肉又涨价了
    李四收到了一条消息:肉又涨价了
    王五收到了一条消息:肉又涨价了
    =============================================
    微信服务发送一条消息:股票又赔了
    李四收到了一条消息:股票又赔了
    王五收到了一条消息:股票又赔了
    

    2、RxJava Observable创建订阅流程(未加线程切换操作)

    2.1、Observable.create流程

    Observable observable = Observable.create(new ObservableOnSubscribe() {
                @Override
                public void subscribe(ObservableEmitter e) throws Exception {
                    if (!e.isDisposed()) {
                        e.onNext("RxText");
                    }
                }
            });
    

    源码分析:

    @CheckReturnValue
        @SchedulerSupport(SchedulerSupport.NONE)
        public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
            ObjectHelper.requireNonNull(source, "source is null");
            //RxJavaPlugins.onAssembly见分析1 实际返回的还是参数中的ObservableCreate对象
            //new ObservableCreate见分析2
            return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
        }
        
        ==>分析1
        @SuppressWarnings({ "rawtypes", "unchecked" })
        @NonNull
        public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
            //实际上RxJavaPlugins.onAssembly就是做了一个hook操作(但我们默认平时用RxJava是没有hook操作的)
            //所以这个方法直接返回参数中的source对象
            Function<? super Observable, ? extends Observable> f = onObservableAssembly;
            if (f != null) {
                return apply(f, source);
            }
            return source;
        }
        
        ==>分析2
        public ObservableCreate(ObservableOnSubscribe<T> source) {
            //ObservableCreate的构造方法就是在当前类中保存了ObservableOnSubscribe对象
            //ObservableOnSubscribe是一个接口,内部只有一个subscribe方法
            this.source = source;
        }
    

    Observable.create方法实际上就是创建ObservableCreate对象并返回;

    我们刚才分析了观察者模式,这里的Observable其实就是一个抽象被观察者角色,而ObservableCreate就是具体的被观察者对象

    2.2、observable.subscribe(observer)订阅分析

    点击Observable.subscribe源码中可以看到其中调用了subscribeActual(observer)方法;而Observable.create创建的是ObservableCreate这个具体的被观察者对象,因此去ObservableCreate中看subscribeActual(observer)的具体实现:

    @Override
        protected void subscribeActual(Observer<? super T> observer) {
            //创建CreateEmitter发射器
            CreateEmitter<T> parent = new CreateEmitter<T>(observer);
            //调用observer中的onSubscribe,说明此时被观察者和观察者关联上了
            observer.onSubscribe(parent);
    
            try {
                //这个source就是ObservableCreate构造中传的ObservableOnSubscribe对象
                //此时我们上层代码中的subscribe中的业务逻辑被执行
                source.subscribe(parent);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                //如果发生异常 通过发射器调用onError方法
                parent.onError(ex);
            }
        }
    

    可以看到subscribeActual中主要做的就是:创建CreateEmitter发射器(有点类似于我们上面观察者模式示例中用来保存Observer的List,通过CreateEmitter可以更加的解耦实现更多的操作)->执行onSubscribe关联方法->执行subscribe中我们自己写的业务逻辑->有异常执行onError方法。

    当我们在subscribe中调用e.onNext时,CreateEmitter中的onNext会被执行,在CreateEmitter的onNext中又会执行Observer的onNext;至此我们Observer中的onNext方法被执行。(CreateEmitter实现了ObserverableEmitter)CreateEmitter在ObservableCreate中

    3、“冷、热”Observable是个什么鬼?

    (1)“冷”Observable

    只有当观察者订阅了才会开始执行发射数据流的代码;Observable和Observer是一对一的关系;有多个Observer的时候,他们各自的事件是各自独立的。(即使订阅时间不同,每一个observer都是从头开始接收)

    private void coldAndHotObservable() {
            Observable<Long> observable = Observable.create(new ObservableOnSubscribe<Long>() {
                @Override
                public void subscribe(final ObservableEmitter<Long> e) throws Exception {
                    Observable.interval(1, TimeUnit.SECONDS
                            , Schedulers.computation())
                            .take(Integer.MAX_VALUE)
                            .subscribe(new Observer<Long>() {
                                @Override
                                public void onSubscribe(Disposable d) {
    
                                }
    
                                @Override
                                public void onNext(Long aLong) {
                                    e.onNext(aLong);
                                }
    
                                @Override
                                public void onError(Throwable e) {
    
                                }
    
                                @Override
                                public void onComplete() {
    
                                }
                            });
                }
            }).observeOn(Schedulers.newThread());//.publish();
    
    //        ((ConnectableObservable) observable).connect();
    
            observable.subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    LogUtils.showLog("aLong1 : " + aLong);
                }
            });
            observable.subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    LogUtils.showLog("  aLong2 : " + aLong);
                }
            });
    
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            observable.subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    LogUtils.showLog("      aLong3 : " + aLong);
                }
            });
        }
    
    冷热Observable1.png

    (2)“热”Observable

    无论被观察与观察者的订阅与否,事件始终都会发生(RxBus等原理类似);多个观察者进行订阅的时候信息是共享的(订阅时间不同,每个observer接收的数据一致)

    private void coldAndHotObservable() {
            Observable<Long> observable = Observable.create(new ObservableOnSubscribe<Long>() {
                @Override
                public void subscribe(final ObservableEmitter<Long> e) throws Exception {
                    Observable.interval(1, TimeUnit.SECONDS
                            , Schedulers.computation())
                            .take(Integer.MAX_VALUE)
                            .subscribe(new Observer<Long>() {
                                @Override
                                public void onSubscribe(Disposable d) {
    
                                }
    
                                @Override
                                public void onNext(Long aLong) {
                                    e.onNext(aLong);
                                }
    
                                @Override
                                public void onError(Throwable e) {
    
                                }
    
                                @Override
                                public void onComplete() {
    
                                }
                            });
                }
            }).observeOn(Schedulers.newThread()).publish();
    
            ((ConnectableObservable) observable).connect();
            
            //通过publish和connect将冷Observable转换成为热Observable
    
            observable.subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    LogUtils.showLog("aLong1 : " + aLong);
                }
            });
            observable.subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    LogUtils.showLog("  aLong2 : " + aLong);
                }
            });
    
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            observable.subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    LogUtils.showLog("      aLong3 : " + aLong);
                }
            });
        }
    
    冷热Observable2.png

    上面的LOG可以看出,被延迟订阅2s后的观察者并不是从头开始接收的信息,每个观察者所接受的数据一致。

    4、RxJava几种抽象被观察者对比

    几种Observable.png

    三、RxJava线程调度分析

    1、加入线程切换的简单使用

    Observable.create(new ObservableOnSubscribe<String>() {
                @Override
                public void subscribe(ObservableEmitter<String> e) throws Exception {
                    if (!e.isDisposed()){
                        e.onNext("Test");
                    }
                }
            })
                    .subscribeOn(Schedulers.newThread())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Observer<String>() {
                        @Override
                        public void onSubscribe(Disposable d) {
    
                        }
    
                        @Override
                        public void onNext(String s) {
                            LogUtils.showLog("s == "+s);
                        }
    
                        @Override
                        public void onError(Throwable e) {
    
                        }
    
                        @Override
                        public void onComplete() {
    
                        }
                    });
    

    2、subscribeOn方法分析

     @CheckReturnValue
        @SchedulerSupport(SchedulerSupport.CUSTOM)
        public final Observable<T> subscribeOn(Scheduler scheduler) {
            ObjectHelper.requireNonNull(scheduler, "scheduler is null");
            //这里跟之前创建ObservableCreate直接返回一个ObservableSubscribeOn(也是一个被观察者)对象
            //第一个参数:this其实就是上一个Observable对象 即ObservableCreate
            return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
        }
        
        //ObservableSubscribeOn的构造方法 保存了上一个Observable对象(这里就是ObservableCreate对象)见分析1
        public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
            super(source);
            this.scheduler = scheduler;
        }
        
        
        ===>分析1
        //ObservableSubscribeOn继承了AbstractObservableWithUpstream类
        public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T>
        
        //AbstractObservableWithUpstream是一个抽象类其实也是一个Observable,只不过内部保存了上一个调用的Observable对象 即source
        abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {
    
        /** The source consumable Observable. */
        protected final ObservableSource<T> source;
    
        /**
         * Constructs the ObservableSource with the given consumable.
         * @param source the consumable Observable
         */
        AbstractObservableWithUpstream(ObservableSource<T> source) {
            this.source = source;
        }
    
        @Override
        public final ObservableSource<T> source() {
            return source;
        }
    

    subscribeOn就是创建并返回一个ObservableSubscribeOn被观察者对象;
    而ObservableSubscribeOn继承了AbstractObservableWithUpstream类并通过构造函数将上游的Observable传给AbstractObservableWithUpstream;AbstractObservableWithUpstream本质是一个抽象类将ObservableSubscribeOn包了一层,内部保存了上游的Observable对象;这种设计模式是装饰器模式

    几个被观察者之间类似于一个“洋葱”

    洋葱模型.png

    注意:此时ObservableSubscribeOn的订阅流程还未走到,目前也就是初始化并创建了一个ObservableSubscribeOn被观察者对象

    3、observerOn方法分析

    observerOn方法创建了新的观察者ObservableObserverOn,并保存了上游ObservableSubscribeOn

    @CheckReturnValue
        @SchedulerSupport(SchedulerSupport.CUSTOM)
        public final Observable<T> observeOn(Scheduler scheduler) {
            return observeOn(scheduler, false, bufferSize());
        }
        @CheckReturnValue
        @SchedulerSupport(SchedulerSupport.CUSTOM)
        public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
            ObjectHelper.requireNonNull(scheduler, "scheduler is null");
            ObjectHelper.verifyPositive(bufferSize, "bufferSize");
            return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
        }
        public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
            super(source);
            this.scheduler = scheduler;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }
    

    4、subscribe订阅分析(重点)

    上面分析的从上到下的各种操作符总结来说其实就是不断创建新的被观察者对象并保存其上游的被观察者;而线程切换以及实际的订阅源开始执行是在此时开始的。

    observeOn().subscribe,在操作符的最后是通过observeOn生成的被观察者进行订阅的,因此从ObservableObserveOn中的subscribeActual方法开始分析

    @Override
        protected void subscribeActual(Observer<? super T> observer) {
            if (scheduler instanceof TrampolineScheduler) {
                source.subscribe(observer);
            } else {
                //创建了worker 分析1
                Scheduler.Worker w = scheduler.createWorker();
                
                //分析2
                source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
            }
        }
        
        ===>分析1 createWorker是拿到我们传入的AndroidSchedulers.mainThread后进行调用
        跟源码,但调用mainThread时实际创建的是HandlerScheduler
        static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
        
        HandlerScheduler中的createWorker 初始化创建了HandlerWorker
        @Override
        public Worker createWorker() {
            return new HandlerWorker(handler);
        }
        
        ===>分析2
        创建初始化了ObserveOnObserver观察者对象后,调用上游(ObservableSubscribeOn)的subscribe方法,并将
        ObserveOnObserver这个观察者传进去
        
        ObservableSubscribeOn中的订阅方法,subscribeActual
        
        @Override
        public void subscribeActual(final Observer<? super T> s) {
            final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
            s.onSubscribe(parent);
            
            //分析3
            parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
        }
        
            
            
            ===>分析3
            parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
            
            new SubscribeTask本质就是创建了一个Runnable对象;其中的run方法就是调用ObservableSubscribeOn的上游
            ObservableCreate的Subscribe方法(实际发生线程切换就是在这个runnable中)
            
            scheduler.scheduleDirect;这个scheduler就是我们调用subscribeOn传入的scheduler对象
            这里以subscribeOn(Schedulers.newThread())为例
            
            在Schedulers中,当调用newThread时会创建一个NewThreadTask
            NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
            static final class NewThreadTask implements Callable<Scheduler> {
            @Override
            public Scheduler call() throws Exception {
                return NewThreadHolder.DEFAULT;
            }
        }
            然后再创建一个NewThreadScheduler
            static final class NewThreadHolder {
            static final Scheduler DEFAULT = new NewThreadScheduler();
        }
            此时创建成功了NewThreadScheduler;看一下Scheduler中的scheduleDirect方法:
            
            public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
            //创建worker,这个createWorker实际就是调用子类NewThreadScheduler中的createWorker方法  分析4
            final Worker w = createWorker();
    
            //runnable就是subscribeTask(本质是一个Runnable)
            final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
            //封装成一个Task对象 DisposeTask中的run方法调用了SubscribeTask中的run
            DisposeTask task = new DisposeTask(decoratedRun, w);
            //通过worker进行调度  分析5
            w.schedule(task, delay, unit);
    
            return task;
        }
            
        ===>分析4     NewThreadScheduler中的createWorker方法
        @Override
        public Worker createWorker() {
            return new NewThreadWorker(threadFactory);
        }
        public NewThreadWorker(ThreadFactory threadFactory) {
            //创建了一个线程池
            executor = SchedulerPoolFactory.create(threadFactory);
        }
        
        ===>分析5 NewThreadScheduler中的schedule方法
        public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
            Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    
            ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
    
            if (parent != null) {
                if (!parent.add(sr)) {
                    return sr;
                }
            }
    
            Future<?> f;
            try {
                //通过线程池执行我们的任务;任务被切换到子线程中执行
                if (delayTime <= 0) {
                    f = executor.submit((Callable<Object>)sr);
                } else {
                    f = executor.schedule((Callable<Object>)sr, delayTime, unit);
                }
                sr.setFuture(f);
            } catch (RejectedExecutionException ex) {
                if (parent != null) {
                    parent.remove(sr);
                }
                RxJavaPlugins.onError(ex);
            }
    
            return sr;
        }
            
    

    至此将subscribeTask切至其他线程中调用,run方法里的订阅关系也就移到了其他线程中执行;

    @Override
            public void run() {
                source.subscribe(parent);
            }
    

    上面分析源码时提到了,这个source就是ObservableSubscribe的上游ObservableCreate对象,看一下ObservableCreate的订阅部分代码

    protected void subscribeActual(Observer<? super T> observer) {
            //创建发射器
            CreateEmitter<T> parent = new CreateEmitter<T>(observer);
            //调用observer的onSubscribe进行关联
            observer.onSubscribe(parent);
    
            try {
                //这个就是我们实际创建的任务,在这里被执行(这里加上线程切换之后已经被切换到子线程了)
                source.subscribe(parent);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                parent.onError(ex);
            }
        }
    

    注意:这里的observer是我们上面提到的ObserveOnObserver(ObservableObserveOn的内部类)

    因此这里通过CreateEmitter发射器的onNext等方法实际调用的是ObserveOnObserver的onNext

    ObserveOnObserver的onNext方法:

    @Override
            public void onNext(T t) {
                if (done) {
                    return;
                }
    
                if (sourceMode != QueueDisposable.ASYNC) {
                    //把T放在队列中;这个队列后面会用到
                    queue.offer(t);
                }
                schedule();
            }
            void schedule() {
                if (getAndIncrement() == 0) {
                    //这里的worker是observeOn操作符后面参数Schedulers创建的;
                    //我们这里是HandlerScheduler里面的HandlerWorker
                    //这里的this是ObserveOnObserver本身;ObserveOnObserver除了是观察者以外还是一个Runnable
                    worker.schedule(this);
                }
            }
            @Override
            public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
                if (run == null) throw new NullPointerException("run == null");
                if (unit == null) throw new NullPointerException("unit == null");
    
                if (disposed) {
                    return Disposables.disposed();
                }
    
                run = RxJavaPlugins.onSchedule(run);
    
                ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
                //最后把ObserveOnObserver封装成一个Message通过Handler发送
                Message message = Message.obtain(handler, scheduled);
                message.obj = this; // Used as token for batch disposal of this worker's runnables.
                
                //分析1
                handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
    
                // Re-check disposed state for removing in case we were racing a call to dispose().
                if (disposed) {
                    handler.removeCallbacks(scheduled);
                    return Disposables.disposed();
                }
    
                return scheduled;
            }
            
            
            ===>分析1
            handler.sendMessageDelayed把message发送出去了;这里由于message.obtain被赋值了
            因此msg.callback不为空;上面将message发送至MessageQueue;然后通过主线程的looper.loop
            方法不断从MessageQueue中拿到Message并调用handler.dispatchMessage进行处理
            
            public void dispatchMessage(Message msg) {
            if (msg.callback != null) {
                //走这个判断
                handleCallback(msg);
            } else {
                if (mCallback != null) {
                    if (mCallback.handleMessage(msg)) {
                        return;
                    }
                }
                handleMessage(msg);
            }
        }
        
            private static void handleCallback(Message message) {
            //这里的callback.run就是执行ObserveOnObserver里面的run方法
            //分析2
            message.callback.run();
        }
        
            ===>分析2
            ObserveOnObserver的run方法
             @Override
            public void run() {
                if (outputFused) {
                    drainFused();
                } else {
                    drainNormal();
                }
            }
            void drainNormal() {
                int missed = 1;
    
                //拿到队列
                final SimpleQueue<T> q = queue;
                final Observer<? super T> a = actual;
    
                for (;;) {
                    if (checkTerminated(done, q.isEmpty(), a)) {
                        return;
                    }
    
                    for (;;) {
                        boolean d = done;
                        T v;
    
                        try {
                            //从队列把我们发送的事件取出
                            v = q.poll();
                        } catch (Throwable ex) {
                            Exceptions.throwIfFatal(ex);
                            s.dispose();
                            q.clear();
                            a.onError(ex);
                            worker.dispose();
                            return;
                        }
                        boolean empty = v == null;
    
                        if (checkTerminated(d, empty, a)) {
                            return;
                        }
    
                        if (empty) {
                            break;
                        }
                        //调用最终Observer的回调方法onNext
                        a.onNext(v);
                    }
    
                    missed = addAndGet(-missed);
                    if (missed == 0) {
                        break;
                    }
                }
            }
            
            
        
        
    

    至此被观察者是如何一层一层创建的(装饰器模式、洋葱模型),subscrib的流程(从下至上),如何切换到子线程的又是如何切换回主线程的就全部讲完了,最后附上一张图(注意:被观察者创建是由上至下,订阅是由下至上

    Observable事件流程.png

    5、多次调用subscribeOn和observeOn生效问题

    Rx的订阅流程从下至上的,因此最终的业务场景是在最上方也就是首次调用的subscribeOn所制定的线程中执行;observeOn每一次切换都会生效

    observeOn多次调用问题

    private void rxjavaTest(){
            Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    emitter.onNext(1);
                }
            })
                    .subscribeOn(Schedulers.newThread())
                    .observeOn(AndroidSchedulers.mainThread())
                    .doOnNext(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) throws Exception {
                            LogTestUtils.showLog("currentThread111 == "+Thread.currentThread().getName());
                        }
                    })
                    .observeOn(Schedulers.newThread())
                    .doOnNext(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) throws Exception {
                            LogTestUtils.showLog("currentThread222 == "+Thread.currentThread().getName());
                        }
                    })
                    .subscribe(new Observer<Integer>() {
                        @Override
                        public void onSubscribe(Disposable d) {
    
                        }
    
                        @Override
                        public void onNext(Integer integer) {
                        }
    
                        @Override
                        public void onError(Throwable e) {
    
                        }
    
                        @Override
                        public void onComplete() {
    
                        }
                    });
        }
    

    结果为:

    2019-11-27 21:59:41.497 12313-12313/com.hzf.test D/hzfTag1127: currentThread111 == main
    2019-11-27 21:59:41.498 12313-12372/com.hzf.test D/hzfTag1127: currentThread222 == RxNewThreadScheduler-2
    

    证明每次observeOn的调用都会生效

    多次调用subscribeOn

    private void rxjavaTest(){
            Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    emitter.onNext(1);
                }
            })
                    .subscribeOn(Schedulers.newThread())
                    .doOnNext(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) throws Exception {
                            LogTestUtils.showLog("currentThread111 == "+Thread.currentThread().getName());
                        }
                    })
                    .subscribeOn(AndroidSchedulers.mainThread())
                    .doOnNext(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) throws Exception {
                            LogTestUtils.showLog("currentThread222 == "+Thread.currentThread().getName());
                        }
                    })
                    .subscribe(new Observer<Integer>() {
                        @Override
                        public void onSubscribe(Disposable d) {
    
                        }
    
                        @Override
                        public void onNext(Integer integer) {
                        }
    
                        @Override
                        public void onError(Throwable e) {
    
                        }
    
                        @Override
                        public void onComplete() {
    
                        }
                    });
        }
    

    结果为:

    2019-11-27 22:09:03.989 12883-12911/? D/hzfTag1127: currentThread111 == RxNewThreadScheduler-1
    2019-11-27 22:09:03.990 12883-12911/? D/hzfTag1127: currentThread222 == RxNewThreadScheduler-1
    
    

    证明subscribeOn只有第一次调用时生效

    相关文章

      网友评论

        本文标题:呕心沥血:RxJava2.x基础流程分析

        本文链接:https://www.haomeiwen.com/subject/eieabctx.html