RXJava

作者: Tony__Ren | 来源:发表于2019-04-22 15:06 被阅读0次

    传统的观察者模式

    观察者模式.png
    类图.png

    RxJava 四个要素

    • 被观察者
    • 观察者
    • 订阅
    • 事件
      • 创建被观察者
        subscriber就是观察者
    创建被观察者.png
    创建被观察者.png
      • 创建观察者
    创建观察者.png 创建观察者.png
      • 订阅
    订阅.png

    核心

    核心.png

    操作符

    map

    map操作符.png
    image.png
    //调用lift方法,创建一个OperatorMap对象作为参数
       public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
            return lift(new OperatorMap<T, R>(func));
        }
    
    public final class OperatorMap<T, R> implements Operator<R, T> {
    
        final Func1<? super T, ? extends R> transformer;
    
        public OperatorMap(Func1<? super T, ? extends R> transformer) {
            this.transformer = transformer;
        }
    
        @Override
        public Subscriber<? super T> call(final Subscriber<? super R> o) {
    //创建新的观察者对象
            return new Subscriber<T>(o) {
    
                @Override
                public void onCompleted() {
                    o.onCompleted();
                }
    
                @Override
                public void onError(Throwable e) {
                    o.onError(e);
                }
    
                @Override
                public void onNext(T t) {
                    try {
    //使用传入的观察者调用onNext方法回调Func1的方法
                        o.onNext(transformer.call(t));
                    } catch (Throwable e) {
                        Exceptions.throwOrReport(e, this, t);
                    }
                }
    
            };
        }
    
    }
    //lift方法中创建新的被观察者对象,相当于代理,负责接收原始的被观察者的事件
     public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
            return new Observable<R>(new OnSubscribe<R>() {
                @Override
                public void call(Subscriber<? super R> o) {
                    try {
    //拿到map方法中创建的观察者对象
                        Subscriber<? super T> st = hook.onLift(operator).call(o);
                        try {
                            // new Subscriber created and being subscribed with so 'onStart' it
                            st.onStart();
                            onSubscribe.call(st);
                        } catch (Throwable e) {
                            // localized capture of errors rather than it skipping all operators 
                            // and ending up in the try/catch of the subscribe method which then
                            // prevents onErrorResumeNext and other similar approaches to error handling
                            Exceptions.throwIfFatal(e);
                            st.onError(e);
                        }
                    } catch (Throwable e) {
                        Exceptions.throwIfFatal(e);
                        // if the lift function failed all we can do is pass the error to the final Subscriber
                        // as we don't have the operator available to us
                        o.onError(e);
                    }
                }
            });
    

    总结:
    map操作符是通过创建一个Observable<R>响应实现的是Observable<T>中的观察者Subscriber<? super T>

    这里有点绕,hook.onLift(operator).call(o)这行代码是调用OperatorMap方法中的call方法传入新创建的Subscriber观察者,在新创建的观察者中的onNext犯法中执行的是原观察者的Func1中的call回调。

    flatMap

    这个其实就是把多个map串联起来统一处理

      public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func) {
            if (getClass() == ScalarSynchronousObservable.class) {
                return ((ScalarSynchronousObservable<T>)this).scalarFlatMap(func);
            }
            return merge(map(func));
        }
    

    RxJava 线程调度

    Schedulers


    Schedulers.png 线程控制.png

    subscribeOn 被观察者处理观察者的call回调(发出事件)
    observeOn 观察者的回调处理

    subscribeOn.png

    总结:
    subscribeOn只能调用一次,因为subscribeOn作用域是全局的每次创建新的Observable,subscribeOn指定Observable的线程执行位置。Observable只有一个。

    observeOn指定是它之后的subscriber观察者回调线程执行位置。subscriber是多个。

    相关文章

      网友评论

          本文标题:RXJava

          本文链接:https://www.haomeiwen.com/subject/efoegqtx.html