注:本文仅为本人阅读原文章所做笔记,如有理解错误,欢迎指正
链接
一、API介绍和原理简析
-
观察者模式
A(观察者)在B(被观察者)发生了需要被监控到的某个变化时的一瞬间做出反应。
Android 开发中一个比较典型的例子是点击监听器 OnClickListener 。
对设置 OnClickListener 来说, View 是被观察者, OnClickListener 是观察者,二者通过 setOnClickListener() 方法达成订阅关系。
订阅之后用户点击按钮的瞬间,Android Framework 就会将点击事件发送给已经注册的 OnClickListener 。
Button -> 被观察者、OnClickListener -> 观察者、setOnClickListener() -> 订阅,onClick() -> 事件。
-
RxJava的观察者模式
RxJava 有四个基本概念:Observable (可观察者,即被观察者)、 Observer (观察者)、 subscribe (订阅)、事件。
Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Observer。
但是在RxJava代码中却是Observable.subscribe(Observer)
即被观察者订阅了观察者。
可以理解为被观察者通知观察者:你可以观察我了,我做出的变化将会让你知道。
二、基本实现
1.创建Observer(观察者)
-
Observer即观察者,它决定事件被触发的时候会有怎么样的行为。
-
RxJava内置了一个实现了Observer的抽象类:Subscriber(英文单词:订阅者,也即RxJava中的观察者)。
-
Subscriber使用方式与Observer相同,不过有以下两个区别:
(1) Subscriber的实现中多了一个可选的OnStart方法,此方法可以用作流程开始时的一些初始化操作;此方法默认在Subscribe(订阅)时调用,无法指定所执行的线程,因此不能用于改变UI,例如显示ProgressBar;如果需要,可以使用Observable.doOnSubscribe()
,默认情况下doOnSubscribe方法执行在subscribe方法发生的线程,但是如果在doOnSubscribe方法之后有subscribeOn方法的话,它将执行在里它最近的subscribeOn方法指定的线程内。代码如下:
private void toSubscribe(Observable o, Subscriber s) {
o.subscribeOn(Schedulers.io())
.unsubscribeOn(Schedulers.io())
.doOnSubscribe(new Action0() {
@Override
public void call() {
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(s);
}
(2) Subscriber还实现了另外一个接口Subscription中的unSubscribe方法,用于取消订阅防止内存泄漏,一般在这个方法调用之前会先调用isSubscribed方法判断一下状态。
2.创建Observable(被观察者)
- Observable即被观察者,它决定触发事件以及触发怎样的事件。它可以通过
Observable.create()
、Observable.just()
、Observable.from()
来创建。
3.Subscribe(订阅)
- 在创建了观察者与被观察者之后,使用subscribe方法将它们联合起来:
observable.subscribe(observer)
或者observable.subscribe(subscriber)
- subscribe方法还支持不完整定义的回调,RxJava会自动根据定义创建出Subscriber。如下:
Action1<String> onNextAction = new Action1<String>() {
// onNext()
@Override
public void call(String s) {
Log.d(tag, s);
}
};
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
// onError()
@Override
public void call(Throwable throwable) {
// Error handling
}
};
Action0 onCompletedAction = new Action0() {
// onCompleted()
@Override
public void call() {
Log.d(tag, "completed");
}
};
// 自动创建 Subscriber ,并使用 onNextAction 来定义 onNext()
observable.subscribe(onNextAction);
// 自动创建 Subscriber ,并使用 onNextAction 和 onErrorAction 来定义 onNext() 和 onError()
observable.subscribe(onNextAction, onErrorAction);
// 自动创建 Subscriber ,并使用 onNextAction、 onErrorAction 和 onCompletedAction 来定义 onNext()、 onError() 和 onCompleted()
observable.subscribe(onNextAction, onErrorAction, onCompletedAction);
实例
- 依次打印字符串:
String[] names = ...;
Observable.from(names)
.subscribe(new Action1<String>() {
@Override
public void call(String name) {
Log.d(tag, name);
}
});
- 由指定的一个Drawable文件的id取的图片,显示在ImageView中,并出现异常的时候打印Toast报错:
int drawableRes = ...;
ImageView imageView = ...;
Observable.create(new OnSubscribe<Drawable>() {
@Override
public void call(Subscriber<? super Drawable> subscriber) {
Drawable drawable = getTheme().getDrawable(drawableRes));
subscriber.onNext(drawable);
subscriber.onCompleted();
}
}).subscribe(new Observer<Drawable>() {
@Override
public void onNext(Drawable drawable) {
imageView.setImageDrawable(drawable);
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show();
}
});
三、线程控制:Schedule(一)
Scheduler的API
-
Schedulers.immediate()
:直接运行在当前线程,默认。 -
Schedulers.newThread()
:启动新线程,并在新线程中执行操作。 -
Schedulers.io()
:I/O操作(读写文件、读写数据库、网络信息交互等)所使用的的Scheduler。 -
Schedulers.computation()
:计算所使用的Scheduler。 -
Android专用的
AndroidSchedulers.mainThread()
,它指定的操作在Android主线程运行(需要添加RxAndroid的依赖)。
SubscribeOn()
指定subscribe()
所发生的线程;ObserveOn()
指定Subscriber所发生的线程。
四、变换
1.API
RxJava 提供了对事件序列进行变换的支持,所谓变换,就是将事件序列中的对象或整个序列进行加工处理,转换成不同的事件或事件序列。概念说着总是模糊难懂的,来看 API。
-
map()
:
Observable.just("images/logo.png") // 输入类型 String
.map(new Func1<String, Bitmap>() {
@Override
public Bitmap call(String filePath) { // 参数类型 String
return getBitmapFromPath(filePath); // 返回类型 Bitmap
}
})
.subscribe(new Action1<Bitmap>() {
@Override
public void call(Bitmap bitmap) { // 参数类型 Bitmap
showBitmap(bitmap);
}
});
-
flatMap()
:
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
}
};
Observable.from(books)
.flatMap(new Func1<Book, Observable<String>>() {
@Override
public Observable<String> call(Book book) {
return getBookName(book);
}
}).subscribe(subscriber);
Student[] students = ...;
Subscriber<Course> subscriber = new Subscriber<Course>() {
@Override
public void onNext(Course course) {
Log.d(tag, course.getName());
}
...
};
Observable.from(students)
.flatMap(new Func1<Student, Observable<Course>>() {
@Override
public Observable<Course> call(Student student) {
return Observable.from(student.getCourses());
}
})
.subscribe(subscriber);
-
map()
与flatMap()
:
flatMap()
和map()
有一个相同点:它也是把传入的参数转化之后返回另一个对象。但需要注意,和map()
不同的是,flatMap()
中返回的是个 Observable 对象,并且这个 Observable 对象并不是被直接发送到了 Subscriber 的回调方法中。flatMap()
的原理是这样的:1. 使用传入的事件对象创建一个 Observable 对象;2. 并不发送这个 Observable, 而是将它激活,于是它开始发送事件;3. 每一个创建出来的 Observable 发送的事件,都被汇入同一个 Observable ,而这个 Observable 负责将这些事件统一交给 Subscriber 的回调方法。
2.变换的原理
变换的功能虽然各有不同,但是实质上都是针对事件序列的处理和发送。在RxJava的内部,它们是基于同一个基础的变化方法:lift(Operator)
。
精简来讲:在Observable执行了lift(Operator)方法之后,会返回一个新的Observable,这个新的Observable会像一个代理一样,负责接收原始的Observable发出的事件,并在处理之后发送给Subscriber。
- 示例:
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String string) {
// 接收处理之后的事件
}
};
Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(1);
}
});
// 返回一个Observable
observable.lift(new Observable.Operator<String, Integer>() {
@Override
public Subscriber<? super Integer> call(final Subscriber<? super String> subscriber) {
return new Subscriber<Integer>() {
@Override
public void onCompleted() {
subscriber.onCompleted();
}
@Override
public void onError(Throwable e) {
subscriber.onError(e);
}
@Override
public void onNext(Integer integer) {
// 处理事件并发送给Subscriber
subscriber.onNext(integer + "");
}
};
}
}).subscribe(subscriber);
也可以使用compse
对Observable整体进行转换:
observable.compose(new Observable.Transformer<Integer, String>() {
@Override
public Observable<String> call(Observable<Integer> integerObservable) {
return null;
}
});
五、线程控制:Scheduler(二)
- 多次切换线程:
Observable.just(1, 2, 3, 4) // IO 线程,由 subscribeOn() 指定
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.map(mapOperator) // 新线程,由 observeOn() 指定
.observeOn(Schedulers.io())
.map(mapOperator2) // IO 线程,由 observeOn() 指定
.observeOn(AndroidSchedulers.mainThread)
.subscribe(subscriber); // Android 主线程,由 observeOn() 指定
通过对observeOn()
的多次调用实现程序线程的多次切换;但是,不同于observeOn()
,subscribeOn()
的位置放在那里都可以,但是它只能被调用一次。
六、其它
与Retrofit的结合使用、RxBinding、RxBus请阅读原文
网友评论