RxJava入门(1)

作者: tmyzh | 来源:发表于2018-01-29 16:46 被阅读38次

    RxJava的很多介绍中的开篇会介绍这样一堆关系,Observable(被观察者),Observer(观察者),subscribe(订阅) observable.subscribe(observer)(建立关系)。 这样观察者就能收到被观察者的通知作出响应,实现了一个RxJava的流程。讲真,不太理解为什么要这么介绍这个流程,特别是按语法设计被观察者订阅观察者也不符合逻辑思维,还有一个直观的介绍方式是Observable(上游),subscribe(管道),Observer(下游)。其实可以理解为页面上某些操作(需要异步或者说在子线程中处理)发生,完成之后,通知到ui线程做一些响应。RxJava比之AsyncTask ,Handler优势在与,代码结构简单明了,在逻辑复杂的时候易于修改,那么下面我们介绍怎么实现RxJava.

    加入依赖
    compile 'io.reactivex.rxjava2:rxjava:2.0.1'
    compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
    
    创建Observable
    Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    emitter.onNext(1);
                    emitter.onNext(2);
                    emitter.onNext(3);
                    emitter.onComplete();
                }
            });
    

    另种创建Observable的两种方式效果与上面代码一样

    Observable observable = Observable.just(1,2,3);
    --------------------------------------------------------------------
    int[] words = {1, 2, 3};
    Observable observable = Observable.from(words);
    

    ObservableEmitter 发射器,定义需要发送的事件并且向观察者发送事件,可以发出三种类型的事件onNext(T value),onComplete()和onError(Throwable error)
    注意
    1.Observable可以发送无数个onNext,Observer可以接收无数个onNext。
    2.Observable发送一个onComplete/onError之后可以继续发送事件,但是Observer接收onComplete/onError事件之后将不再继续接收其他事件
    3.最为关键的是onComplete和onError必须唯一并且互斥

    创建Observer

    方式一

     Observer<Integer> observer = new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
                    //开始采用subscribe连接
                    Log.d(TAG, "subscribe");
                }
    
                @Override
                public void onNext(Integer value) {
                    Log.d(TAG, "" + value);
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.d(TAG, "error");
                }
    
                @Override
                public void onComplete() {
                    Log.d(TAG, "complete");
                }
            };
    

    方式二

    Subscriber<String> subscriber = new Subscriber<Integer>() {
    
                // 观察者接收事件前,默认最先调用复写 onSubscribe()
                @Override
                public void onSubscribe(Subscription s) {
                    Log.d(TAG, "开始采用subscribe连接");
                }
    
                // 当被观察者生产Next事件 & 观察者接收到时,会调用该复写方法 进行响应
                @Override
                public void onNext(Integer value) {
                    Log.d(TAG, "对Next事件作出响应" + value);
                }
    
                // 当被观察者生产Error事件& 观察者接收到时,会调用该复写方法 进行响应
                @Override
                public void onError(Throwable e) {
                    Log.d(TAG, "对Error事件作出响应");
                }
    
                // 当被观察者生产Complete事件& 观察者接收到时,会调用该复写方法 进行响应
                @Override
                public void onComplete() {
                    Log.d(TAG, "对Complete事件作出响应");
                }
            };
    

    两种方式的不同在于Subscriber增加了两个方法
    onStart() 在订阅时候并且在call()方法之前(即事件序列依照设定依次被触发之前)调用,所以可以做一些初始化操作

    <-- Observable.subscribe(Subscriber) 的内部实现 -->
    
    public Subscription subscribe(Subscriber subscriber) {
        subscriber.onStart();
        // 步骤1中 观察者  subscriber抽象类复写的方法,用于初始化工作
        onSubscribe.call(subscriber);
        // 通过该调用,从而回调观察者中的对应方法从而响应被观察者生产的事件
        // 从而实现被观察者调用了观察者的回调方法 & 由被观察者向观察者的事件传递,即观察者模式
        // 同时也看出:Observable只是生产事件,真正的发送事件是在它被订阅的时候,即当 subscribe() 方法执行时
    }
    

    unsubscribe()用于取消订阅。在该方法被调用后,观察者将不再接收 & 响应事件。调用该方法前,先使用 isUnsubscribed() 判断状态,确定被观察者Observable是否还持有观察者Subscriber的引用,如果引用不能及时释放,就会出现内存泄露

    建立连接

    observable.subscribe(observer);
    

    运行结果
    subcrible
    1
    2
    3
    complete

    Observer中onSubscribe方法中的参数Disposable ,可以看成Observable与Observer之间的一个开关,当调用dispose()
    方法后,Observable可以继续发送事件,Observer会接收不到事件。

    Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    Log.d(TAG, "emit 1");
                    emitter.onNext(1);
                    Log.d(TAG, "emit 2");
                    emitter.onNext(2);
                    Log.d(TAG, "emit 3");
                    emitter.onNext(3);
                    Log.d(TAG, "emit complete");
                    emitter.onComplete();
                    Log.d(TAG, "emit 4");
                    emitter.onNext(4);
                }
            })
         Observer<Integer> observer =new Observer<Integer>() {
                private Disposable mDisposable;
                private int i;
    
                @Override
                public void onSubscribe(Disposable d) {
                    Log.d(TAG, "subscribe");
                    mDisposable = d;
                }
    
                @Override
                public void onNext(Integer value) {
                    Log.d(TAG, "onNext: " + value);
                    i++;
                    if (i == 2) {
                        Log.d(TAG, "dispose");
                        mDisposable.dispose();
                        Log.d(TAG, "isDisposed : " + mDisposable.isDisposed());
                    }
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.d(TAG, "error");
                }
    
                @Override
                public void onComplete() {
                    Log.d(TAG, "complete");
                }
            });
    observable.subscrible(observer);
    打印如下
    subscribe
    emit 1
    onNext: 1
    emit 2
    onNext: 2
    dispose
    isDisposed : true
    emit 3
    emit complete
    emit 4
    

    另外subscribe()有多个重载的方法:

      public final Disposable subscribe() {}
      public final Disposable subscribe(Consumer<? super T> onNext) {}
      public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {} 
      public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
      public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}
      public final void subscribe(Observer<? super T> observer) {}
    

    不带参数的subcribe()表示Observer中不处理任何发送过来的事件,带consumer参数的对应处理响应的onNext,onError,onComplete。

    最后演示一下RxJava的链式调用方式,与上面的作用一样不过会看着很清爽,特别是现在androidStudio都会进行代码缩进。

            // RxJava的流式操作
            Observable.create(new ObservableOnSubscribe<Integer>() {
            // 1. 创建被观察者 & 生产事件
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    emitter.onNext(1);
                    emitter.onNext(2);
                    emitter.onNext(3);
                    emitter.onComplete();
                }
            }).subscribe(new Observer<Integer>() {
                // 2. 通过通过订阅(subscribe)连接观察者和被观察者
                // 3. 创建观察者 & 定义响应事件的行为
                @Override
                public void onSubscribe(Disposable d) {
                    Log.d(TAG, "开始采用subscribe连接");
                }
                // 默认最先调用复写的 onSubscribe()
    
                @Override
                public void onNext(Integer value) {
                    Log.d(TAG, "对Next事件"+ value +"作出响应"  );
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.d(TAG, "对Error事件作出响应");
                }
    
                @Override
                public void onComplete() {
                    Log.d(TAG, "对Complete事件作出响应");
                }
    
            });
    
    

    相关文章

      网友评论

        本文标题:RxJava入门(1)

        本文链接:https://www.haomeiwen.com/subject/izuxzxtx.html