美文网首页
RxJava入门解析(二)

RxJava入门解析(二)

作者: tiloylc | 来源:发表于2018-03-22 19:00 被阅读0次

    续上一篇文章

    一、RxJava的变换。

    引用别人的一句话,变换就是将事件序列中的对象或整个序列进行加工处理,转换成不同的事件或事件序列。那么何为加工,下面开始以API举例

    1、map()

     Observable.just("abc")
                    .map(new Func1() {
                        @Override
                        public Object call(Object o) {
                            return o.toString()+"def";
                        }
                    }).subscribe(new Subscriber() {
                        @Override
                        public void onCompleted() {
            
                        }
            
                        @Override
                        public void onError(Throwable e) {
            
                        }
            
                        @Override
                        public void onNext(Object o) {
                            System.out.println(o.toString());
                        }
                    });
    
    

    由上方代码可以看出,在map方法里将传入的字符串进行了变化,这里出现了一个新的类叫Func1。它和 Action1 非常相似,也是 RxJava 的一个接口,用于包装含有一个参数的方法。 Func1 和 Action 的区别在于, Func1 包装的是有返回值的方法。另外,和 ActionX 一样, FuncX 也有多个,用于不同参数个数的方法。FuncX 和 ActionX 的区别在 FuncX 包装的是有返回值的方法。
    另外可以看出经过map变化后,数据直接传入了onNext中进行使用,另外还可以将数据类型进行转变。这就由读者自行修改了。

    2、flatMap()

    这是一个很好用的方法,类似于map()方法对要发射的数据进行转换,但是与map()不同的是flatMap()方法并不是直接作用于SubScriber中,而是返回一个Observable的对象,然后由这个对象来执行:

     Observable.just("abc","bcd","cde")
                    .flatMap(new Func1() {
                        @Override
                        public Object call(Object o) {
                            return Observable.just(o.toString()+"---");
                        }
                    }).subscribe(new Subscriber() {
                        @Override
                        public void onCompleted() {
    
                        }
    
                        @Override
                        public void onError(Throwable e) {
    
                        }
    
                        @Override
                        public void onNext(Object o) {
                            System.out.println(o.toString());
                        }
                    });
    

    map和flatmap的区别有点像just和from,一个是对数据直接传入,一个是对数组的数据进行分割处理。

    /**
         * Returns an Observable that emits items based on applying a function that you supply to each item emitted
         * by the source Observable, where that function returns an Observable, and then merging those resulting
         * Observables and emitting the results of this merger.
    */
    

    这是源码中官方的解释。对Observable发射的数据都应用一个函数,这个函数返回一个Observable,然后合并这些Observables,并且发送合并的结果。 flatMap和map操作符很相像,flatMap发送的是合并后的Observables,map操作符发送的是应用函数后返回的结果集。

    3、lift()

    在RxJava中,有各种的变换方式,但是原理都是基于lift方法的原理来做的,什么是lift方法,先看一下源码:

    public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
    //1
            return create(new OnSubscribeLift<T, R>(onSubscribe, operator));
      } 
    
    
    public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>> {
            // cover for generics insanity
        }
    
    
    
    public final class OnSubscribeLift<T, R> implements OnSubscribe<R> {
    
        final OnSubscribe<T> parent;
    
        final Operator<? extends R, ? super T> operator;
    
        public OnSubscribeLift(OnSubscribe<T> parent, Operator<? extends R, ? super T> operator) {
            this.parent = parent;
            this.operator = operator;
        }
    
        @Override
        public void call(Subscriber<? super R> o) {
            try {
                Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o);
                try {
                    // new Subscriber created and being subscribed with so 'onStart' it
                    st.onStart();
                    parent.call(st);
                } catch (Throwable e) {
                    // localized capture of errors rather than it skipping all operators
                    // and ending up in the try/catch of the subscribe method which then
                    // prevents onErrorResumeNext and other similar approaches to error handling
                    Exceptions.throwIfFatal(e);
                    st.onError(e);
                }
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                // if the lift function failed all we can do is pass the error to the final Subscriber
                // as we don't have the operator available to us
                o.onError(e);
            }
        }
    
    

    看一下源码可以得出几个结论
    1、lift变换是new了一个新的被观察者来继续之前的操作
    2、Operator对象可以视为一个带返回值的Subscriber对象
    3、在new新的Observable对象时,创建了一个新的OnSubscribeLift对象作为OnSubscribe传入
    4、新的OnSubscribeLift使用的OnSubscribe为原始的OnSubscribe,Operator为新的Subscriber
    5、Operator的构造方法中已经传入了原始的Subscriber,可以直接使用
    6、在subscribe方法调用时,其实是进入到了新的OnSubscribeLift的call方法中,然后由原始的onSubscribe对象调用新的Subscriber对象。举个例子:

         Observable.create(new Observable.OnSubscribe<Object>() {
                        @Override
                        public void call(Subscriber<? super Object> subscriber) {
    
                            System.out.println("====OnSubscribe===call=");
                            subscriber.onNext("123");
                            subscriber.onCompleted();
                        }
                    })
                    .lift(new Observable.Operator<Object, Object>() {
    
                        @Override
                        public Subscriber<? super Object> call(Subscriber<? super Object> subscriber) {
    
                            System.out.println("====lift===call=");
                            return new Subscriber<Object>() {
                                @Override
                                public void onCompleted() {
    
                                }
    
                                @Override
                                public void onError(Throwable e) {
    
                                }
    
                                @Override
                                public void onNext(Object o) {
                                    System.out.println("====lift===onNext=");
                                    subscriber.onNext(Integer.decode(o.toString()));
                                }
    
                                @Override
                                public void onStart() {
                                    super.onStart();
                                    System.out.println("====lift===onStart=");
                                }
                            };
                        }
                    }).subscribe(new Subscriber() {
                        @Override
                        public void onCompleted() {
                            subscriber.onCompleted();
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            subscriber.onError(e);
                        }
    
                        @Override
                        public void onNext(Object o) {
                            System.out.println("====Subscriber===onNext=");
                            System.out.println(o.toString());
                        }
                        @Override
                        public void onStart() {
                            super.onStart();
                            System.out.println("====Subscriber===onStart=");
                        }
                    });
    

    打印结果为

    ====Subscriber===onStart=
    ====lift===call=
    ====lift===onStart=
    ====OnSubscribe===call=
    ====lift===onNext=
    ====Subscriber===onNext=
    123
    

    先由subscribe方法中调用原始的Subscriber的start方法,然后调用到lift的call方法,在call方法中调用了新的Subscriber的start方法,然后原始的OnSubscribe(parent)调用了call方法,传入了新的Subscriber,然后顺序调用onNext方法。
    我们可以看一下map()方法的实现:

      public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
            return create(new OnSubscribeMap<T, R>(this, func));
        }
    
     
    public final class OnSubscribeMap<T, R> implements OnSubscribe<R> {
    
        final Observable<T> source;
    
        final Func1<? super T, ? extends R> transformer;
    
        public OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer) {
            this.source = source;
            this.transformer = transformer;
        }
    
        @Override
        public void call(final Subscriber<? super R> o) {
            MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
            o.add(parent);
            source.unsafeSubscribe(parent);
        }
    
        static final class MapSubscriber<T, R> extends Subscriber<T> {
    
            final Subscriber<? super R> actual;
    
            final Func1<? super T, ? extends R> mapper;
    
            boolean done;
    
            public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
                this.actual = actual;
                this.mapper = mapper;
            }
    
            @Override
            public void onNext(T t) {
                R result;
    
                try {
                    result = mapper.call(t);
                } catch (Throwable ex) {
                    Exceptions.throwIfFatal(ex);
                    unsubscribe();
                    onError(OnErrorThrowable.addValueAsLastCause(ex, t));
                    return;
                }
    
                actual.onNext(result);
            }
    
            @Override
            public void onError(Throwable e) {
                if (done) {
                    RxJavaHooks.onError(e);
                    return;
                }
                done = true;
    
                actual.onError(e);
            }
    
    
            @Override
            public void onCompleted() {
                if (done) {
                    return;
                }
                actual.onCompleted();
            }
    
            @Override
            public void setProducer(Producer p) {
                actual.setProducer(p);
            }
        }
    
    }
    

    逻辑基本是一样的,就不多做赘述。

    Subject

    RxJava中常见的Subject有4种,分别是AsyncSubject、 BehaviorSubject、 PublishSubject、 ReplaySubject
    Subject既可以做观察者也可以做被观察者。
    1、AsyncSubject :AsyncSubject无论输入多少参数,永远只输出最后一个参数。
    2、BehaviorSubject:会发送离订阅最近的上一个值,没有上一个值的时候会发送默认值。
    3、PublishSubject:发送订阅起到Completed之间所有的值。
    4、ReplaySubject:无论何时订阅,都会将所有历史订阅内容全部发出。
    使用方法:

    //        PublishSubject sb = PublishSubject.create();
            BehaviorSubject sb = BehaviorSubject.create();//配置默认值为BehaviorSubject.create("defaultData");
    //        AsyncSubject sb = AsyncSubject .create();
    //        ReplaySubject sb = ReplaySubject.create();
            sb.subscribe(
                    new Action1<Object>() {
                        @Override
                            public void call(Object o) {
                            System.out.println(o.toString());
                        }
                    });
    
            sb.onNext(1);
            sb.onNext(2);
            sb.onNext(3); 
            sb.onCompleted();
    

    相关文章

      网友评论

          本文标题:RxJava入门解析(二)

          本文链接:https://www.haomeiwen.com/subject/yduaqftx.html