一、配置
要在Android中使用RxJava2, 在app的build.gradle中添加依赖:
implementation 'io.reactivex.rxjava2:rxjava:2.2.1'
implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'
二、框架结构
Rxjava整体是一条链其中:
1.链的上游是生产者:Observable
2.链的下游是观察者:Observer
3.链的中间:各个中介节点,既是下游的Observable,又是上游的Observer
三、基本使用
第一步:创建被观察者 Observable
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onComplete();// 发送完onComplete后,下游将不会再接收消息
}
});
第二步:创建观察者Observer
Observer observer = new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
// 开始订阅调用的方法
}
@Override
public void onNext(Integer integer) {
}
@Override
public void onError(Throwable e) {
Toast.makeText(TestAcitivity.this, "Error!", Toast.LENGTH_SHORT).show();
}
@Override
public void onComplete() {
}
};
第三步:建立订阅关系,被观察者需要去订阅观察者
observable
.subscribeOn(Schedulers.io()) // 在线程中执行上游操作
.observeOn(AndroidSchedulers.mainThread())// 在主线程中执行下游操作
.subscribe(observer);
<meta charset="utf-8">
3.2 线程的类型
subscribeOn/observeOn
都要求传入一个Schedulers
的子类,它就代表了运行线程类型,下面我们来看一下都有哪些选择:
-
Schedulers.computation()
:用于计算任务,默认线程数等于处理器的数量。 -
Schedulers.from(Executor executor)
:使用Executor
作为调度器,关于Executor
框架可以参考这篇文章:多线程知识梳理(5) - 线程池四部曲之 Executor 框架。 -
Schedulers.io( )
:用于IO
密集型任务,例如访问网络、数据库操作等,也是我们最常使用的。 -
Schedulers.newThread( )
:为每一个任务创建一个新的线程。 -
Schedulers.trampoline( )
:当其它排队的任务完成后,在当前线程排队开始执行。 -
Schedulers.single()
:所有任务共用一个后台线程。
以上是在io.reactivex.schedulers
包中,提供的Schedulers
,而如果我们导入了下面的依赖,那么在io.reactivex.android.schedulers
下,还有额外的两个Schedulers
可选:
四、关键词和方法解释
一.Emitter
就是发射器的,这个就是用来发出事件的,它可以发出三种类型的事件,通过调用emitter的onNext(T value)、onComplete()和onError(Throwable error)就可以分别发出next事件、complete事件和error事件。
这里需要注意的是:
1、上游可以发送无限个onNext, 下游也可以接收无限个onNext。当上游发送了一个onComplete后, 上游onComplete之后的事件将会继续发送, 而下游收到onComplete事件之后将不再继续接收事件。
2、当上游发送了一个onError后, 上游onError之后的事件将继续发送, 而下游收到onError事件之后将不再继续接收事件。上游可以不发送onComplete或onError。
3、发送多个onComplete是可以正常运行的, 但是收到第一个onComplete就不再接收了, 但若是发送多个onError, 则收到第二个onError事件会导致程序会崩溃。
二.subscribeOn()
1.在Scheduler制定的线程启动subscribe()
2.切换起源的Observable的线程
3.当多次调用subscribeOn() 的时候,只有最上面的会对起源Observable起作用
三:ObserveOn
1.在内部创建Observer的onNext() onError()
onSuccess()等回调方法里,通过Scheduler指定的线程来调用下级Observer的对应回调方法
2.切换observeOn()下面的Oberver的回调所在的线程
3.当多次调用observerOn()的时候,每个都会进行一次线程的切换,影响范围是它下面的每个Oberver(除非又遇到新的observeOn())
四.Scheduler的原理
1.Schedulers.newThread() Schedulers.io();
· 当scheduleDirect()被调用时候,会创建一个 Worker,Worker的内部会有一个Excutor,由Executor来完成实际的线程切换;
· scheduleDirect() 还会创建出一个Disposable对象,交给外层的Observer,让它执行dispose()操作,取消订阅链;
· newThread()和io()的区别在于,io()可能会对Excutor进行重用.
2.AndroidSechedulers.mainThread();
通过内部的Handler把任务post到主线程去做
网友评论