RxAndroid 2.0 学习笔记

作者: 初见破晓 | 来源:发表于2016-12-24 14:26 被阅读7066次

    Rxjava 2.x正式版出来已经快两个月了。在之前的项目中也在使用Rx。但却一直没有时间对整个的知识进行梳理,恰好今天抽出时间,也系统的再学习一遍RxJava/RxAndroid


    RxJava的使用

    一、观察者/被观察者

    1、前奏:
    在观察者之前就要先提下backpressure这个概念。简单来说,backpressure是在异步场景中,被观察者发送事件速度远快于观察者的处理速度时,告诉被观察者降低发送速度的策略。

    2、在2.0中有以下几种观察者

    • Observable/Observer
    • Flowable/Subscriber
    • Single/SingleObserver
    • Completable/CompletableObserver
    • Maybe/MaybeObserver

    依次的来看一下:

    Observable

    Observable
    .just(1, 2, 3)
    .subscribe(new Observer < Integer > () {
    @Override public void onSubscribe(Disposable d) {}
    @Override public void onNext(Integer value) {}
    @Override public void onError(Throwable e) {}
    @Override public void onComplete() {}
    });
    
    

    这里要提的就是onSubscribe(Disposable d),disposable用于取消订阅。
    就用简单的just这个操作符来分析一下。

    @SuppressWarnings("unchecked")
    @SchedulerSupport(SchedulerSupport.NONE) 
    public static < T > Observable < T > just(T item1, T item2, T item3, T item4) {
        ObjectHelper.requireNonNull(item1, "The first item is null");
        ObjectHelper.requireNonNull(item2, "The second item is null");
        ObjectHelper.requireNonNull(item3, "The third item is null");
        ObjectHelper.requireNonNull(item4, "The fourth item is null");
    
        return fromArray(item1, item2, item3, item4);
    }
    
    @SchedulerSupport(SchedulerSupport.NONE) 
    public static < T > Observable < T > fromArray(T...items) {
        ObjectHelper.requireNonNull(items, "items is null");
        if (items.length == 0) {
            return empty();
        } else if (items.length == 1) {
            return just(items[0]);
        }
        return RxJavaPlugins.onAssembly(new ObservableFromArray < T > (items));
    }
    
    @Override 
    public void subscribeActual(Observer < ?super T > s) {
        FromArrayDisposable < T > d = new FromArrayDisposable < T > (s, array);
        s.onSubscribe(d);
        if (d.fusionMode) {
            return;
        }
        d.run();
    }
    
    @Override 
    public void dispose() {
        disposed = true;
    }
    
    @Override 
    public boolean isDisposed() {
        return disposed;
    }
    
    void run() {
        T[] a = array;
        int n = a.length;
        for (int i = 0; i < n && !isDisposed(); i++) {
            T value = a[i];
            if (value == null) {
                actual.onError(new NullPointerException("The " + i + "th element is null"));
                return;
            }
            actual.onNext(value);
        }
        if (!isDisposed()) {
            actual.onComplete();
        }
    }
    

    just实际调用了fromArray方法,中创建了ObservableFromArray的实例,在这个实例中实现了Observable这个接口,在调用subscribe方法进行绑定之后,首先调用了subscribeActual方法,onSubscribe就会回调。

    在取消绑定是我们可以将Disposable添加到CompositeDisposable中或者直接调用Disposable的dispose() 方法在流的任意位置取消。

    此外, 为了简化代码,我使用了Consumer作为观察者(可以当成1.0时候的Action1 、ActionX)subscribe的返回值就是一个Disposable (subscribe 的返回值根据传入的参数不同,也有不同)我把这个对象添加到CompositeDisposable,并在中途取消,但发射器仍然会把所有的数据全都发射完。因为LambdaSubscriber(也就是传入Consumer 所构造的观察者)的disposeisDispose 略有不同,并不是简简单单的true/false, 说实话,我没看懂Consumer的这两个方法干了什么...........尴尬

    LambdaSubscriber 瞅瞅
    
    @Override
    public void dispose() { 
    cancel();
    }
    
    @Override
    public boolean isDisposed() {  
      return get() == SubscriptionHelper.CANCELLED;
    }
    

    Flowable

    是2.0之后用的最多的观察者了,他与上一个的区别在于支持背压,也就是说,下游会知道上游有多少数据,所以他Subscriber会是这样

    Flowable
    .just(1, 2, 3, 4)
    .subscribe(new Subscriber < Integer > () {
    @Override public void onSubscribe(Subscription s) {
      s.request(Long.MAX_VALUE);
    }
    @Override public void onNext(Integer integer) {}
    @Override public void onError(Throwable t) {}
    @Override public void onComplete() {}
    });
    

    onSubscribe 这个回调传出了一个Subscription, 我们要指定他传出数据的大小, 调用他的request() 方法。如没有要求可以传入一个Long的最大数值Long.MAX_VALUE
    要说明一下,request这个方法若不调用,下游的onNext与OnComplete都不会调用;若你写的数量小于,只会传你的个数,但是不会调用onComplete方法,可以看下FlowableFromArrayslowPath方法

    @Override void slowPath(long r) {
        long e = 0;
        T[] arr = array;
        int f = arr.length;
        int i = index;
        Subscriber < ?super T > a = actual;
        for (;;) {
            while (e != r && i != f) {
                if (cancelled) {
                    return;
                }
                T t = arr[i];
                if (t == null) {
                    a.onError(new NullPointerException("array element is null"));
                    return;
                } else {
                    a.onNext(t);
                }
                e++;
                i++;
            }
            if (i == f) {
                if (!cancelled) {
                    a.onComplete();
                }
                return;
            }
            r = get();
            if (e == r) {
                index = i;
                r = addAndGet( - e);
                if (r == 0L) {
                    return;
                }
                e = 0L;
            }
        }
    }
    }
    

    需要if (i == f) f 是这个数据的大小,i是当前发送数据的个数,所以不会调用onComplete

    休息一下

    这是几种被观察者实现的接口

    • Observable 接口 ObservableSource
    • Flowable 接口 Publisher
    • Single 接口 SingleSource
    • Completable 接口 CompletableSource
    • Maybe 接口 MaybeSource

    梳理完了两个被观察者,发现Flowable支持背压,父类是Publisher;Observable不支持背压,父类是ObservableSource,他们的实现方式,与其的操作符,到最后的观察者,都有些不同,他们是完全独立开的。各自之间也互不影响。

    Single

    单值相应的模式: 就是只有一个值呗?

    Completable

    表示没有任何值但仅指示完成或异常的延迟计算。

    Maybe

    maybe 可以当成上面两个的合体吧!

    后面的三种,就不细掰了,我就是这么不求甚解。

    二、操作符

    操作符基本就没有改变,但还是会发现,我擦,from没了,可以使用fromIterable
    之前的actionx 也替换了Action \ Consumer \ BiConsumer
    Func也跟action一样, 名字改变了Function

    感觉这样是正经Rx了。

    三、线程切换

    当然现场切换没有发生改变,用法还是一样,但是之前没有看过源码,不知道怎样神奇的把线程切换了,难道是来自东方的神秘力量。趁着还有时间,撸一下代码。
    在调用subscribeOn(Schedulers.io())之后,会创建ObservableSubscribeOn

    parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
    @Override
    public void run() {
            source.subscribe(parent);
        }
    }
    ));
    

    在这个过程中,会把source也就是ObservableSource在线程中订阅,同时也把把传入的Observer变成SubscribeOnObserver。若指定的是io线程,可以在IoScheduler中看见对线程的管理
    在调用observeOn(AndroidSchedulers.mainThread())时,会产生一个ObservableObserveOn,同时还会把Observer变成ObserveOnObserver,可以发现在HandlerScheduler,在ui线程调用了ObserveOnObserver的run方法

    四、Rxjava的数据传递

    Rxjava是我在工作这几个月最喜欢的框架,没有之一。完全解决了我这个有洁癖的人在打代码时的玻璃心。
    虽然重复造轮轮子是没有必要的(我也造不出来),但是为了全面的了解Rxjava,我也准备简单的实现一下,数据在每个操作符之中传输的整个过程。

    在实现之前先猜想一下大概的过程吧:
    我的需求是在一个static方法中产生一个数值,并且通过一层层的接口传递下去,下面的操作符的人参是上一个的返回值,最后输出,我就模仿着Rx的 Maybe 的名字实现吧。

    • 首先我要一直‘点’下去的话Maybe 一定要返回自己
    • 接口要一层层的传进去,这样的话就可以在发射数据时,发原始数据传入这个一堆的接口,然后每个接口计算自己的实现。
    • 最后返回结果

    之后就是仿造源码完成这段需求了,当然这些方法也都简单写了,就是为了弄清楚思路:

    1、创建一个MaybeSource,我们的Maybe 和 各个操作符都会实现它。

    public interface MaybeSource {
         void subscribe(MaybeObserver observer);
    }
    

    2、创建一个MaybeObserver, 这就是最后绑定的时候的接口

    public interface MaybeObserver {
        void onSuccess(int value);
    }
    

    3、创建Function, 这个在操作符中用于实现

    public interface Function {
        int apply(int t);
    }
    

    4、当然少不了Maybe, 这里就实现just和map两个方法吧

    public abstract class Maybe implements MaybeSource {
        public static Maybe just(int item) {
            return new MaybeJust(item);
        }
    
        public final Maybe map(Function mapper) {
            return new MaybeMap(this, mapper);
        }
    }
    

    5、just实际返回的对象是MaybeJust,他的父类是Maybe

    public class MaybeJust extends Maybe {
        final int value;
    
        public MaybeJust(int value) {
            this.value = value;
        }
    
        @Override
        public void subscribe(MaybeObserver observer) {
            observer.onSuccess(value);
        }
    }
    

    6、map实际返回的对象是MaybeMap,他的父类是Maybe

    public class MaybeMap extends Maybe {
        final Function mapper;
        final MaybeSource source;
    
        public MaybeMap(MaybeSource source, Function mapper) {
            this.source = source;
            this.mapper = mapper;
        }
    
        @Override
        public void subscribe(MaybeObserver observer) {
            source.subscribe(new MapMaybeObserver(observer, mapper));
        }
    
        static final class MapMaybeObserver implements MaybeObserver {
            final MaybeObserver actual;
    
            final Function mapper;
    
            MapMaybeObserver(MaybeObserver actual, Function mapper) {
                this.actual = actual;
                this.mapper = mapper;
            }
    
            @Override
            public void onSuccess(int value) {
                this.actual.onSuccess(this.mapper.apply(value));
            }
        }
    }
    

    7、在main中可以这么运行

    Maybe
    .just(1)
    .map(new Function() {
    
        @Override
        public int apply(int t) {
            return t + 1;
        }
    }).map(new Function() {
    
        @Override
        public int apply(int t) {
            return t * 4;
        }
    }).subscribe(new MaybeObserver() {
    
        @Override
        public void onSuccess(int value) {
            System.out.println(value);
        }
    });
    

    8、运行结果,传入1,先+1, 在 * 4,最后结果应该是8

    Paste_Image.png

    得到了期望的结果


    RxJava 2.0 + Retrofit 2 .0

    之前做过一个项目,没用什么架构,也没什么封装。但对我帮助最大的是,之前是不能接受这样的代码的,感觉看上去脑袋都大了。但看习惯了, 也就习惯了。
    但平时自己弄个小项目还是使用mvp,自己的洁癖可能更加强烈一点

    在Retrofit 中选择了Flowable作为返回值,支持背压,在2.0之后应该最为常用

    @GET("/")  
    Flowable<ResponseBody> getText();
    

    在RxJava 2.0 中使用CompositeDisposable做解除绑定的操作, Consumer 回调中使用了三个Consumer,作为成功、失败、完成的回调

        public <T> void addSubscription(Flowable flowable,
            final RxSubscriber<T> subscriber) {
            if (mCompositeDisposable == null) {
                mCompositeDisposable = new CompositeDisposable();
            }
    
            if (subscriber == null) {
                Log.e(TAG, "rx callback is null");
    
                return;
            }
    
            Disposable disposable = flowable.subscribeOn(Schedulers.io())
                                            .observeOn(AndroidSchedulers.mainThread())
                                            .subscribe(new Consumer<T>() {
                        @Override
                        public void accept(T o) throws Exception {
                            subscriber.onNext(o);
                        }
                    },
                    new Consumer<Throwable>() {
                        @Override
                        public void accept(Throwable throwable)
                            throws Exception {
                            subscriber.onError(throwable);
                        }
                    },
                    new Action() {
                        @Override
                        public void run() throws Exception {
                            subscriber.onComplete();
                        }
                    });
    

    此外,之前的项目后台接口也是奇葩,同一个人写的接口,接口的返回格式更是多种多样,还不改,没办法,客户端只能将就着服务端,谁叫我们是新来的呢。遇到这种问题,就不直接转成对象格式了,先转成ResponseBody得到Body,再拿出string来。
    okhttp中response的body对象就是这个ResponseBody,他的string() 方法就可以获得整个body,然后再做json解析吧

    Paste_Image.png Paste_Image.png

    相关文章

      网友评论

      • Wing_Li:最后这个例子,简直太棒了。

      本文标题:RxAndroid 2.0 学习笔记

      本文链接:https://www.haomeiwen.com/subject/dzmlvttx.html