前言
什么是函数式编程?RxJava是什么?什么是观察者模式?巴拉巴拉要了解的东西太多了。不过不用过于纠结,这些可以当你掌握了RxJava的基本用法再去深究,现在不用过于追求理论上的,太过于形而上学的东西,先实践出真知。
使用RxJava2.0之前是不是需要学习RxJava1.0?
我认为:如果你想「找不同」的话可以反过去学习RxJava1.0,比较他们的区别。否则的话,直接使用RxJava2.0就好了。如果你学RxJava1.0这一部分就不用看啦。
使用之前需要在build.gradle添加依赖:
//具体版本以github上的为依据
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
compile 'io.reactivex.rxjava2:rxjava:2.1.2'
正文
看了很多关于RxJava的教程,一开始都会长篇大论的跟你讲解观察者模式,什么是Observable,什么是Observer...一堆理论,绕老绕去,总能成功的把我绕晕。
下面是我个人的理解是这样的:
上游下游我的理解:被观察者 和 观察者都是两条流动的水管的关系,如果尝试着用GUI的方式来表达,应该是这样子的:
对应的『上游』指的是『事件的产生』,『下游』指的是『事件的响应』。『上游』和『下游』通过一个『渠道』来连接。连接后的『上游』产生的事件 1,2,3,4对应产生『下游』事件1,2,3,4的反应。
在RxJava中『上游』『下游』所对应的是『Observable』和『Observer』,它们之间的『链接管道』就是『subscribe()』方法,因此在RxJava中用代码来表示他们的上下游关系就是:
第一步:创建一个『上游』---->Observable
//创建一个『上游』
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
Log.i(TAG, "emitter"+1);
e.onNext(1);
Log.i(TAG, "emitter"+2);
e.onNext(2);
Log.i(TAG, "emitter"+3);
e.onNext(3);
Log.i(TAG, "emitter"+4);
e.onNext(4);
Log.i(TAG, "emitter"+"complete");
e.onComplete();
}
});
第二步:穿件一个『下游』---->Observer
//创建一个『下游』
Observer<Integer> observer = new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe..."+d.isDisposed());
}
@Override
public void onNext(Integer integer) {
Log.i(TAG,"onNext..." + integer);
}
@Override
public void onError(Throwable e) {
Log.i(TAG,"onError..."+e.getMessage());
}
@Override
public void onComplete() {
Log.i(TAG,"onComplete");
}
};
第三步:通过『链接渠道』进行连接起来
//通过『连接管道』连接起来
observable.subscribe(observer);
最后:看一下结果:
08-22 15:13:53.552 17237 17237 I RxTag : onSubscribe...false
08-22 15:13:53.552 17237 17237 I RxTag : onNext...1
08-22 15:13:53.552 17237 17237 I RxTag : onNext...2
08-22 15:13:53.552 17237 17237 I RxTag : onNext...3
08-22 15:13:53.552 17237 17237 I RxTag : onNext...4
08-22 15:13:53.552 17237 17237 I RxTag : onComplete
然后也可以用超级炫酷无敌的链式编程写下来就是这样:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
Log.i(TAG, "emitter"+1);
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onNext(4);
e.onComplete();
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe..." + d.isDisposed());
}
@Override
public void onNext(Integer integer) {
Log.i(TAG, "onNext..." + integer);
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError..." + e.getMessage());
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete");
}
});
接下来,我们就可以慢慢了解这几个东东ObservableEmitter
Disposable
以及onSubscribe
onNext
onComplete
onError
的关系
ObservableEmitter: Emitter在翻译过来是『发射』的意思.
我们稍微扒一扒源码就会发现 这是一个Interface
继承自Emitter
,Emitter
这个接口有三个抽象方法:
public interface Emitter<T> {
/**
* Signal a normal value.
* @param value the value to signal, not null
*/
void onNext(@NonNull T value);
/**
* Signal a Throwable exception.
* @param error the Throwable to signal, not null
*/
void onError(@NonNull Throwable error);
/**
* Signal a completion.
*/
void onComplete();
}
也就是说Emitter
可以『发射』onNext
onError
onComplete
三个事件给『下游』观察者。
但是,你发射事件也有一些规则需要遵循:
- 『上游』通过
Emitter
可以无限制的『发射』onNext
事件,『下游』可以无限的接受并且响应onNext
事件. - 『上游』的
onComplete
onError
方法的『发射』是唯一并且互斥的.- 如果在『上游』调用
emitter.onComplete
之后仍然『发射』了其他事件,那么『下游』在onComplete
事件之后就不会有任何响应,但是『上游』在emitter.onComplete
后的代码依然会执行。 -
emitter.onError
和emitter.onComplete
一样。
如果在『上游』调用emitter.onComplete
之后仍然『发射』了其他事件,那么『下游』在onComplete
事件之后就不会有任何响应,但是『上游』在emitter.onComplete
后的代码依然会执行。 - 『上游』可以选择不发送
onError
onComplete
事件. - 最为关键的事
onError
和onComplete
只能发送一个事件,也就是说只能发送一个onError
或者onComplete
,不能先发一个onError
再发一个onComplete
,也不能先发一个onComplete
再发一个onError
- 如果在
emitter.onError
或者emitter.onComplete
之后再发射emitter.onError
这会导致app Crash哦。但是多次发射emitter.onComplete
不会导致app crash
- 如果在『上游』调用
代码中验证这个规则:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
for(int i =0;i<8;i++){
Log.i(TAG, "emitter"+i);
if(i == 3){
Log.i(TAG, "emitter "+"complete");
e.onComplete();
}
e.onNext(i);
}
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe..." + d.isDisposed());
}
@Override
public void onNext(Integer integer) {
Log.i(TAG, "onNext..." + integer);
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError..." + e.getMessage());
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete");
}
});
可以看到打印结果是这样的:
08-22 15:59:58.320 19476 19476 I RxTag : onSubscribe...false
08-22 15:59:58.320 19476 19476 I RxTag : emitter0
08-22 15:59:58.321 19476 19476 I RxTag : onNext...0
08-22 15:59:58.321 19476 19476 I RxTag : emitter1
08-22 15:59:58.321 19476 19476 I RxTag : onNext...1
08-22 15:59:58.321 19476 19476 I RxTag : emitter2
08-22 15:59:58.321 19476 19476 I RxTag : onNext...2
08-22 15:59:58.321 19476 19476 I RxTag : emitter3
08-22 15:59:58.322 19476 19476 I RxTag : emitter complete
08-22 15:59:58.322 19476 19476 I RxTag : onComplete
08-22 15:59:58.322 19476 19476 I RxTag : emitter4
08-22 15:59:58.322 19476 19476 I RxTag : emitter5
08-22 15:59:58.322 19476 19476 I RxTag : emitter6
08-22 15:59:58.322 19476 19476 I RxTag : emitter7
如果用图形表示的话可以这样:
onNext:
onCompleted:
onError:
显而易见:在『下游』接收到onComplete事件后就不再接受任何事件,但是『上游』依然执行emitter.onComplete之后的代码。`onError`也是这样,这里就不进行验证了
说完ObservableEmitter
和onNext
onComplete
onError
之间的关系之后再来说说Disposable
这个玩意儿。
Disposable
:翻译过来不是『一次性的』,在这里我们可以理解为这个是『链接管道的开关』。
按照惯例扒一扒源码发现Disposable
也是一个interface
:
public interface Disposable {
/**
* Dispose the resource, the operation should be idempotent.
*/
void dispose();
/**
* Returns true if this resource has been disposed.
* @return true if this resource has been disposed
*/
boolean isDisposed();
}
disposable.dispose
:就代表你你关上了『链接管道』的开关。当你关上开关了就代表『下游』不在响应『上游』的任何事件。
disposable.isDisposed
:返回你这个『链接管道』目前是开还是关的状态。返回True --> 代表关上了,返回Flase --> 代表还是开着呢。
毛主席说过实践出真知,那么在代码里面验证一下呗:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
for(int i =0;i<8;i++){
Log.i(TAG, "emitter"+i);
e.onNext(i);
}
}
}).subscribe(new Observer<Integer>() {
private Disposable mDisposable;
@Override
public void onSubscribe(Disposable d) {
mDisposable = d;
Log.i(TAG, "onSubscribe..." + d.isDisposed());
}
@Override
public void onNext(Integer integer) {
Log.i(TAG, "onNext..." + integer);
if(integer == 3){
Log.i(TAG, "Disposable..." + mDisposable.isDisposed());
mDisposable.dispose();
Log.i(TAG, "Disposable..." + mDisposable.isDisposed());
}
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError..." + e.getMessage());
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete");
}
});
再来看一看输出结果:
08-22 16:26:22.832 21387 21387 I RxTag : onSubscribe...false
08-22 16:26:22.832 21387 21387 I RxTag : emitter0
08-22 16:26:22.832 21387 21387 I RxTag : onNext...0
08-22 16:26:22.832 21387 21387 I RxTag : emitter1
08-22 16:26:22.832 21387 21387 I RxTag : onNext...1
08-22 16:26:22.832 21387 21387 I RxTag : emitter2
08-22 16:26:22.832 21387 21387 I RxTag : onNext...2
08-22 16:26:22.833 21387 21387 I RxTag : emitter3
08-22 16:26:22.833 21387 21387 I RxTag : onNext...3
08-22 16:26:22.833 21387 21387 I RxTag : Disposable...false
08-22 16:26:22.833 21387 21387 I RxTag : Disposable...true
08-22 16:26:22.833 21387 21387 I RxTag : emitter4
08-22 16:26:22.833 21387 21387 I RxTag : emitter5
08-22 16:26:22.833 21387 21387 I RxTag : emitter6
08-22 16:26:22.833 21387 21387 I RxTag : emitter7
再次证明了Disposable控制着这个『链接管道』开关,一旦你关上开关了就代表『下游』不在响应『上游』的任何事件,但是『上游』依然会继续执行。
讲到这,可能你并没有发现RxJava的好处,却觉得除了链式会酷一点完全没啥优势啊。那么我们可以慢慢来先看看Observable.subscribe()这方法的几个重载方法吧。
public final Disposable subscribe(){}
public final Disposable subscribe(Consumer<? super T> onNext){}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,Action onComplete){}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,Action onComplete, Consumer<? super Disposable> onSubscribe){}
public final void subscribe(Observer<? super T> observer)
你可以只接受onNext() 或者选择性接受onNext()和onError()....
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
for(int i =0;i<8;i++){
Log.i(TAG, "emitter"+i);
e.onNext(i);
}
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.i(TAG, "onNext..." + integer);
}
});
是不是瞬间简单起来了。
要想更加简单那么我们来一个kotlin的版本,简洁的到怀疑人生:
Observable.create(ObservableOnSubscribe<Int> { e ->
for (i in 0..7) {
Log.i(TAG, "emitter" + i)
e.onNext(i)
}
}).subscribe { integer -> Log.i(TAG, "onNext..." + integer) }
总结
这一部分主要讲解RxJava中关于『上游』『下游』『链接管道』『开关』的一些理解,只是浅显的最基本的进行一个被观察者和观察者的订阅。下一部分将深一步的介绍RxJava在线程调度的作用。关于管道的理解是参考了http://www.jianshu.com/p/464fa025229e 他的讲解,觉得特别好就借鉴来学习。
网友评论