美文网首页
RxJava(三、Basic realization)

RxJava(三、Basic realization)

作者: 敲代码的本愿 | 来源:发表于2016-08-25 00:27 被阅读84次

    目录

    一、Is what 是什么
    二、Concept 概念
    三、Basic realization 基本实现
    四、Scheduler 线程控制(上)
    五、Scheduler 线程控制(下)
    六、变换

    因个人学习需要,故文章内容均为网上摘抄整理,感谢创作者的辛勤,源文章地址请看文末。

    基本实现

    主要分为三点:
    1. 创建Observer(观察者)
    2. 创建Observable(被观察者)
    3. Subscribe(订阅)

    一、创建Observer(观察者)

    职责:决定了事件触发时将有怎样的行为

    /**
      * 1. observer接口实现方式
      */
    Observer<String> observer = new Observer<String>() { 
        @Override 
        public void onNext(String s) { 
            Log.d(tag, "Item: " + s); 
        } 
        @Override 
        public void onCompleted() { 
            Log.d(tag, "Completed!"); 
        } 
        @Override 
        public void onError(Throwable e) { 
            Log.d(tag, "Error!"); 
        }
    };
    
    /**
      * 2. subscriber抽象类实现方式
      */
    Subscriber<String> subscriber = new Subscriber<String>() { 
        @Override 
        public void onNext(String s) { 
            Log.d(tag, "Item: " + s); 
        } 
        @Override 
        public void onCompleted() { 
            Log.d(tag, "Completed!"); 
        } 
        @Override 
        public void onError(Throwable e) { 
            Log.d(tag, "Error!"); 
        }
    };
    

    注意:

    1. subscriber是实现了observer接口的抽象类, 扩展了部分方法。
    2. RxJavasubscribe过程中,Observer 也总是先被转换成一个 Subscriber 再使用。所以若使用基本功能,选择 ObserverSubscriber是一样的。

    Observer 和 Subscriber 的区别

    1. **onStart(): **Subscriber增加的方法。
      subscribe 刚开始,事件尚未发送之前被调用,可用于做一些准备工作,如数据的清零或重置。可选方法,默认实现为空。

    注意:onStart() 是在 subscribe所发生的线程被调用,不能指定线程(在指定线程做准备工作,用 doOnSubscribe()),故不应做对线程有要求的操作(如 涉及主线程操作)。

    1. **unsubscribe(): **是 Subscriber实现的Subscription接口的方法,用于取消订阅。调用该方法后,Subscriber 将不再接收事件。可用 isUnsubscribed() 判断状态。

    注意:在 subscribe() 之后, Observable 会持有 Subscriber 的引用,该引用如果不能及时释放,会有内存泄露的风险。所以,不再使用时及时取消订阅(可在 onPause()onStop()中)。

    二、创建Observable(被观察者)

    职责:决定什么时候触发事件及触发怎样的事件。
    涉及方法:

    • create()
    • just(T...)
    • from(T[]) / from(Iterable<? extends T>)
    /**
      *  使用 create() 方法创建Observable,并定义事件触发规则
      */
    Observable observable = Observable.create(new Observable.OnSubscribe<String>() { 
        @Override 
        public void call(Subscriber<? super String> subscriber) {
            subscriber.onNext("Hello"); 
            subscriber.onNext("Hi"); 
            subscriber.onNext("Aloha"); 
            subscriber.onCompleted(); 
        }
    });
    

    其中,传入一个 OnSubscribe对象作为参数。OnSubscribe会被存储在返回的 Observable对象中,作用相当于一个计划表,当 Observable被订阅的时候,OnSubscribecall() 方法会自动被调用,事件序列就会依照设定依次触发(即:观察者Subscriber 调用三次 onNext()、一次 onCompleted())。

    由被观察者调用了观察者的回调方法,就实现了由被观察者向观察者的事件传递,即观察者模式。

    /**
      *  just(T...): 将传入的参数依次发送出来。
      */
    Observable observable = Observable.just("Hello", "Hi", "Aloha");
        // 将会依次调用:
        // onNext("Hello");
        // onNext("Hi");
        // onNext("Aloha");
        // onCompleted();
    
    /**
      *  from(T[]) / from(Iterable<? extends T>): 
      *  将传入的数组或 Iterable拆分成具体对象后,依次发送出来。
      */
    String[] words = {"Hello", "Hi", "Aloha"};
    Observable observable = Observable.from(words);
        // 将会依次调用:
        // onNext("Hello");
        // onNext("Hi");
        // onNext("Aloha");
        // onCompleted();
    

    上面 just(T...)from(T[]) 的例子,都和 create(OnSubscribe) 的例子是等价的。

    三、Subscribe(订阅)

    创建了 ObservableObserver 后,再用 subscribe() 方法将它们联结起来,就可以工作了。如下:

    observable.subscribe(observer);
    // 或
    observable.subscribe(subscriber);
    

    Observable.subscribe(Subscriber) 的内部实现(仅核心代码):

    // 注意:这不是 subscribe() 的源码,而是将源码中与性能、兼容性、扩展性有关的代码剔除后的核心代码。
    // 看源码到 RxJava 的 GitHub 仓库下载。
    public Subscription subscribe(Subscriber subscriber) {
        subscriber.onStart(); 
        onSubscribe.call(subscriber); 
        return subscriber;}
    

    其中,subscriber() 做了3件事:

    1. 调用 Subscriber.onStart()。该方法在前面介绍过,是可选的准备方法。
    2. 调用 Observable 中的 OnSubscribe.call(Subscriber)。这里,事件发送的逻辑开始运行。

    注意:Observable 并不是在创建的时就立即开始发送事件,而是在它被订阅的时候,即当 subscribe() 方法执行的时候。

    1. 将传入的 Subscriber作为 Subscription返回。为了方便 unsubscribe()
    流程图

    除了 subscribe(Observer)subscribe(Subscriber)subscribe() 还支持不完整定义的回调,RxJava 会自动根据定义创建出 Subscriber。如下:

    Action1<String> onNextAction = new Action1<String>() { 
        // onNext() 
        @Override 
        public void call(String s) { 
            Log.d(tag, s); 
        }
    };
    Action1<Throwable> onErrorAction = new Action1<Throwable>() { 
        // onError() 
        @Override 
        public void call(Throwable throwable) { 
            // Error handling 
        }
    };
    Action0 onCompletedAction = new Action0() { 
        // onCompleted() 
        @Override 
        public void call() { 
            Log.d(tag, "completed"); 
        }
    };
    // 自动创建 Subscriber,使用 onNextAction 来定义 onNext()
    observable.subscribe(onNextAction);
    // 自动创建 Subscriber,使用 onNextAction 和 onErrorAction 来定义 onNext() 和 onError()
    observable.subscribe(onNextAction, onErrorAction);
    // 自动创建 Subscriber,使用 onNextAction、 onErrorAction 和 onCompletedAction 来定义 onNext()、 onError() 和 onCompleted()
    observable.subscribe(onNextAction, onErrorAction, onCompletedAction);
    
    解释 Action1 和 Action0。

    Action0:
    RxJava 的一个接口,只有一个方法 call(),该方法无参无返回值;由于 onCompleted()方法也是无参无返回值,因此 Action0 可当成一个包装对象,将 onCompleted() 的内容打包起来将自己作为一个参数传入 subscribe() 以实现不完整定义的回调。这样其实也可以看做将 onCompleted()方法作为参数传进了 subscribe(),相当于其他某些语言中的『闭包』。
    Action1:
    RxJava 的一个接口,只有一个方法 call(T param) ,该方法有一个参数,无返回值;与 Action0 同理,由于 onNext(T obj)onError(Throwable error)也是单参数无返回值,因此 Action1可以将 onNext(obj)onError(error)打包起来传入 subscribe()以实现不完整定义的回调。

    注意:虽然 Action0Action1API 中使用最广泛,但 RxJava 提供了多个 ActionX 形式的接口 (如 Action2、Action3) ,它们可以被用以包装不同的无返回值的方法。

    举两个栗子

    a. 打印字符串数组

    /**
      *  将字符串数组 names中的所有字符串依次打印出来
      */
    String[] names = ...;
    Observable.from(names).subscribe(new Action1<String>() { 
        @Override 
        public void call(String name) { 
            Log.d(tag, name); 
         } 
    });
    

    b. 由 id 取得图片并显示

    /**
      *  由指定的一个 drawable 文件 id drawableRes 取得图片,
      *  并显示在 ImageView 中,在出现异常的时候打印 Toast 报错
      */
    int drawableRes = ...;
    ImageView imageView = ...;
    Observable.create(new OnSubscribe<Drawable>() { 
        @Override 
        public void call(Subscriber<? super Drawable> subscriber) { 
            Drawable drawable = getTheme().getDrawable(drawableRes));
            subscriber.onNext(drawable); 
            subscriber.onCompleted(); 
        }
    }).subscribe(new Observer<Drawable>() { 
        @Override 
        public void onNext(Drawable drawable) {
            imageView.setImageDrawable(drawable); 
        } 
        @Override 
        public void onCompleted() { 
        } 
        @Override 
        public void onError(Throwable e) { 
            Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show();   
        }
    });
    

    注意:上面的两个例子均是同步的观察者模式。

    参考
    给 Android 开发者的 RxJava 详解`

    相关文章

      网友评论

          本文标题:RxJava(三、Basic realization)

          本文链接:https://www.haomeiwen.com/subject/gpposttx.html