美文网首页
【Android】【框架】【RxJava】

【Android】【框架】【RxJava】

作者: 徐乙_ | 来源:发表于2019-08-18 15:13 被阅读0次

    本文探索2个方向

    1. 发布/订阅机制的实现架构
    2. 如何实现的一行代码切换线程

    此外对于操作符的使用,我建议结合其他库一起学习,比如Retrofit+RxJava使用flatMap解决嵌套请求,比如结合Room、LifeCycle,比如RxBus

    发布/订阅

    Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) {
                    emitter.onNext(1);
                    emitter.onNext(2);
                    emitter.onNext(3);
                    emitter.onComplete();
                }
            }).subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {}
    
                @Override
                public void onNext(Integer value) {}
    
                @Override
                public void onError(Throwable e) {}
    
                @Override
                public void onComplete() {}
            });
    

    当调用了Observable.create后,目前的Observable结构是这样的

    image.png

    可以把它理解为发布者,它接受订阅者的参数
    当我们调用了subscribe方法后,他会执行获取数据的逻辑,然后传递数据给订阅者

    image.png

    总结来说就是装饰者模式+观察者模式的灵活使用,每一级都返回Observable,并对之前的callback进行保存或执行

    线程切换

    Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) {
                    emitter.onNext(1);
                    emitter.onNext(2);
                    emitter.onNext(3);
                    emitter.onComplete();
                }
            }).observeOn(Schedulers.io())
              .subscribeOn(AndroidSchedulers.mainThread())
              .subscribe(new Observer<Integer>() {
                  @Override
                  public void onSubscribe(Disposable d) {}
    
                  @Override
                  public void onNext(Integer value) {}
    
                  @Override
                  public void onError(Throwable e) {}
    
                  @Override
                  public void onComplete() {}
              });
    

    observeOn后,现在Observable的封装如下

    image.png

    subscribeOn同理,封装后如下

    image.png

    ObservableObserveOn是Observable,他对于subscribe有自己的实现
    它的实现就是根据具体传入的Scheduler在对应的线程执行逻辑
    可以参考如下代码

        @Override
        protected void subscribeActual(Observer<? super T> observer) {
            if (scheduler instanceof TrampolineScheduler) {
                source.subscribe(observer);
            } else {
                Scheduler.Worker w = scheduler.createWorker(); // 创建对应的Worker
    
                source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
            }
        }
    

    举个例子,newThread的对应实现

    @NonNull
        public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
            Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    
            ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
    
            if (parent != null) {
                if (!parent.add(sr)) {
                    return sr;
                }
            }
    
            Future<?> f;
            try {
                if (delayTime <= 0) {
                    f = executor.submit((Callable<Object>)sr);
                } else {
                    f = executor.schedule((Callable<Object>)sr, delayTime, unit);
                }
                sr.setFuture(f);
            } catch (RejectedExecutionException ex) {
                if (parent != null) {
                    parent.remove(sr);
                }
                RxJavaPlugins.onError(ex);
            }
    
            return sr;
        }
    

    可以看到内置了一个线程池,在线程池里执行实际逻辑

    总结

    RxJava的函数式编程核心就是通过装饰者+观察者实现的,十分巧妙
    这一点很像JS的Promise

    后记

    有什么写得错误、让人费解或遗漏的地方,希望可以不吝赐教,我会马上更改

    学习自

    https://www.jianshu.com/p/88aacbed8aa5
    http://ju.outofmemory.cn/entry/358094

    相关文章

      网友评论

          本文标题:【Android】【框架】【RxJava】

          本文链接:https://www.haomeiwen.com/subject/xibrsctx.html