这可能是最好的RxJava 2.x 教程(完结版)
Android RxJava 2 的用法 just 、from、map、subscribe、flatmap、Flowable、Function、Consumer ...
RxJava2 使用解析——常见的使用场景
大佬们,一波RxJava 3.0来袭,请做好准备~
RxJava2.x
依赖的几个接口:
-
Publisher
:被观察者,发出一系列的事件 -
Subscriber
:观察者, 负责处理这些事件. Subscription
-
Processor
image.png
package org.reactivestreams;
public interface Publisher<T> {
public interface Subscriber<T> {
public interface Subscription {
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
-
Observable
(被观察者)/Observer
(观察者) -
Flowable
(被观察者)/Subscriber
(观察者)
Observeable
用于订阅Observer
,是不支持背压的.Flowable
用于订阅Subscriber
,是支持背压(Backpressure
)的。
public abstract class Observable<T> implements ObservableSource<T> {}
public abstract class Flowable<T> implements Publisher<T> {...}
public interface Observer<T> {
void onSubscribe(@NonNull Disposable var1);
void onNext(@NonNull T var1);
void onError(@NonNull Throwable var1);
void onComplete();
}
public interface Subscriber<T> {
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}
借用的图片:https://www.jianshu.com/p/0cd258eecf60
实用操作符
1、ObserveOn
指定观察者的线程,例如在Android访问网络后,数据需要主线程消费,那么将观察者的线程切换到主线就需要ObserveOn操作符。每次指定一次都会生效。
2、subscribeOn
指定被观察者的线程,即数据源发生的线程。例如在Android访问网络时,需要将线程切换到子线程。多次指定只有第一次有效。
3、doOnEach
数据源(Observable)每发送一次数据,就调用一次。
4、doOnNext
数据源每次调用onNext() 之前都会先回调该方法。
5、doOnError
数据源每次调用onError() 之前会回调该方法。
6、doOnComplete
数据源每次调用onComplete() 之前会回调该方法
7、doOnSubscribe
数据源每次调用onSubscribe() 之后会回调该方法
8、doOnDispose
数据源每次调用dispose() 之后会回调该方法
背压:当被观察者快速发送大量数据时,下游不会做其他处理,即使数据大量堆积,调用链也不会报MissingBackpressureException
消耗内存过大只会OOM
。
创建 Obeservable
的时候,传入ObservableOnSubscribe
对象作为参数。当 Observable
被订阅的时候,ObservableOnSubscribe
的subscribe()
方法会自动
被调用,事件序列就会依照设定依次触发,比如观察者 Observer
将会被调用一次onNext()
.
采用 OkHttp3
配合 map
, doOnNext
, 线程切换做简单的网络请求
- 1、通过
Observable.create()
方法,调用OkHttp
网络请求; - 2、通过
map
操作符结合Gson/JSONObject
, 将Response
转换为bean/entity
类; - 3、通过
doOnNext()
方法,解析bean
中的数据,并进行数据库存储等操作; - 4、调度线程,在子线程进行耗时操作任务,在主线程更新
UI
; - 5、通过
subscribe()
根据请求成功或者失败来更新UI
。
Concat
使用场景:
Concat
先读取缓存数据并展示UI再获取网络数据刷新UI。
- 1、
concat
可以做到不交错的发射两个甚至多个Observable
的发射物; - 2、并且只有前一个
Observable
终止(onComplete
)才会订阅下一个Observable
,也就是在操作符concat
中,只有调用onComplete
之后才会执行下一个Observable
,如果缓存数据不为空,则直接读取缓存数据,而不读取网络数据。 - 3、两个
Observable
的泛型应当保持一致
concatMap
concatMap
作用和flatMap
几乎一模一样,唯一的区别是它能保证事件的顺序。
interval( rx2在用)、timer(过时)方式
间隔执行操作,默认在新线程,当需要更新UI则记得切换到UI线程。在Rxjava
中timer
操作符既可以延迟执行一段逻辑,也可以间隔执行一段逻辑
- [注意] 但在
RxJava 2.x
已经过时了,现在用interval
操作符来间隔执行. -
timer
和interval
都默认执行在一个新线程上。
Observable observable =Observable.interval(1, TimeUnit.SECONDS);
Disposable mDisposable = Flowable.interval(1, TimeUnit.SECONDS).doOnNext(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.i(TAG, "accept: doOnNext:" + aLong);
}
}).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
mTextView.setText("心跳间隔任务" + aLong);
}
});
创建一个按固定时间间隔发射整数序列的Observable
,可用作定时器。即按照固定2秒一次调用onNext()
方法。即时通讯等需要轮训的任务在如今的 APP
中已是很常见,而RxJava 2.x
的 interval
操作符即可实现。
map方法
map
操作符可以将一个 Observable
对象通过某种关系转换为另一个Observable
对象,传入的参数为Function
类型。作用是对上游发送的每一个事件应用一个函数,使得每一个事件都按照指定的函数去变化。
- 1.
map
回调中,能直接拿到传进来得参数,并且返回一个值。处理完传进来的参数后发射给下一级。 - 2.传进来得参数可以随意操作转换,参数与返回值的类型由
Function
的泛型控制。 - 3.
Function
泛型第一个是传进来的参数,第二个是返回值的类型
flatMap方法
作用等同于map
方法。将传递进来的参数进行转换后返回。
区别在于返回值类型为Publisher
或其实现类。Flowable
,flowable
是publisher
的实现类,少写一个for
循环,用了forArray()
方法。 FlatMap
将一个发送事件的上游Observable
变换成多个发送事件的Observables
,然后将它们发射的事件合并
后放进一个单独的Observable
里。需要注意flatMap
并不保证事件的顺序。
使用场景:
多个网络请求依次依赖,比如:
-
1、注册用户前先通过接口A获取当前用户是否已注册,再通过接口B注册;
-
2、注册后自动登录,先通过注册接口注册用户信息,注册成功后马上调用登录接口进行自动登录。
-
zip
分别从两个上游事件中各取出一个组合,一个事件只能被使用一次,顺序严格按照事件发送的顺序最终下游事件收到的是和上游事件最少的数目相同(必须两两配对,多余的舍弃)
使用场景:结合多个接口的数据再更新 UI。
需求:九宫格菜单+角标,两个接口返回,但是要一一对应上
实现方案:GridView+zip
。先返回菜单的list集合,再将这个集合跟角标集合结合,如果id相等,则给菜单集合的角标属性赋值并返回终极的数据源list。
Observable observable1 = Observable.create(new ObservableOnSubscribe<List<MenuEntity>>() {
@Override
public void subscribe(ObservableEmitter<List<MenuEntity>> observableEmitter) throws Exception {
observableEmitter.onNext(parseMenuListFromLocal(FileTools.getFileStremFromRaw(ZipActivity.this, R.raw.menu_grid_json)));
}
});
Observable observable2 = Observable.create(new ObservableOnSubscribe<List<MenuEntity>>() {
@Override
public void subscribe(ObservableEmitter<List<MenuEntity>> observableEmitter) throws Exception {
observableEmitter.onNext(parseMenuNumListFromLocal(FileTools.getFileStremFromRaw(ZipActivity.this, R.raw.menu_grid_num_json)));
}
});
Observable.zip(observable1, observable2, new BiFunction<List<MenuEntity>, List<MenuEntity>, List<MenuEntity>>() {
@Override
public List<MenuEntity> apply(List<MenuEntity> menuEntities, List<MenuEntity> menuEntities2) throws Exception {
for (int i = 0; i < menuEntities.size(); i++) {
MenuEntity menuEntity = menuEntities.get(i);
for (int j = 0; j < menuEntities2.size(); j++) {
if (menuEntity.getId().equals(menuEntities2.get(j).getId())) {
menuEntity.setNum(menuEntities2.get(j).getNum());
}
}
}
return menuEntities;
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<List<MenuEntity>>() {
@Override
public void accept(List<MenuEntity> menuEntities) throws Exception {
menuGridList.clear();
menuGridList.addAll(menuEntities);
menuGridviewAdapter.notifyDataSetChanged();
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
}
});
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {
Log.i(TAG, TAG+"-subscribe: start emit data");
observableEmitter.onNext("cc");
observableEmitter.onNext("qq");
observableEmitter.onComplete();
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable disposable) {
Log.i(TAG, TAG+"-onSubscribe: start subscribe ");
}
@Override
public void onNext(String s) {
Log.i(TAG, TAG+"-onNext: " + s);
}
@Override
public void onError(Throwable throwable) {
Log.i(TAG, TAG+"-onError: ");
}
@Override
public void onComplete() {
Log.i(TAG, TAG+"-onComplete: ");
}
});
输出结果:
RxJavaDemoActivity-onSubscribe: start subscribe //先执行订阅
RxJavaDemoActivity-subscribe: start emit data
RxJavaDemoActivity-onNext: cc
RxJavaDemoActivity-onNext: qq
RxJavaDemoActivity-onComplete:
debounce
debounce
操作符可以过滤掉发射频率过快的数据项,减少频繁的网络请求。
使用情景:
搜索时过滤掉输入过快不停的做网络请求,结合switchMap
使用,避免后面的请求比前面的请求快先返回数据更新后,被之前的请求后返回来覆盖掉。
- 1、输入框数据变化时就要进行网络请求,这样会产生大量的网络请求。这时候可以通过
debounce
进行处理 - 2、点击一次按钮就进行一次网络请求.
RxView.clicks(mRxOperatorsBtn)
.debounce(1,TimeUnit.SECONDS) // 过滤掉发射频率小于1秒的发射事件
.subscribe(new Consumer<Object>() {
@Override
public void accept(@NonNull Object o) throws Exception {
clickBtn();
}
});
just方法、fromArray方法
传入一个数组参数
Observable observable = Observable.just("hi");
传入若干个相同参数,和fromArray
作用一样,使用just( )
,将为你创建一个Observable
订阅后并自动为你调用onNext( )
发射数据。通过just( )
方式 直接触发onNext()
,just
中传递的参数将直接在Observer
的onNext()
方法中接收到。
fromIterable()方式
fromIterable()
,遍历集合,发送每个item。相当于多次回调onNext()
方法,每次传入一个item.
@CheckReturnValue
@SchedulerSupport("none")
public static <T> Observable<T> fromIterable(Iterable<? extends T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableFromIterable(source));
}
...
Observable.fromIterable(new ArrayList<String>()).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e(TAG,s);
}
});
subscribe方法
传入一个Consumer
对象,也就是被观察者,一串异步操作,最后在accept
方法中显示UI.
public final <R> Flowable<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper) {
return flatMap(mapper, false, bufferSize(), bufferSize());
}
subscribeOn方法
subscribeOn
改变调用它之前代码的线程,传入的是一个Schedule
对象,指一系列操作在哪个线程,用于指定Observeable被观察者在哪个线程中运行。一般为Schedule.io
.
observeOn方法
observeOn
改变调用它之后代码的线程,传入Schedule
对象,回调在哪个线程中执行,一般在主线程执行,Observe
观察者。指定的subscribe
在哪个线程中执行。
retryWhen
我们在app里发起网络请求时,可能会因为各种问题导致失败。如何利用RxJava来实现出现错误后重试若干次,并且可以设定重试的时间间隔。
当.retry()
接收到.onError()
事件后触发重订阅。
/**
*
* @param client okhttpclient
* @param method get post
* @param urlArg url路径
* @param params 参数
* @param headers http头信息
* @param maxRetryCount 网络异常重试次数
* @param retryDelayTime 重试的延迟时间
* @return
*/
public static Observable<String> execute(final OkHttpClient client, final String method, final String urlArg, final Map<String,String> params, final Map<String,String> headers, final int maxRetryCount, final int retryDelayTime){
Observable<String> httpObservable = Observable.create(new ObservableOnSubscribe<Response>() {
@Override
public void subscribe(ObservableEmitter<Response> emitter) throws Exception {
Log.d(TAG,"httpObservable》》》开始执行");
String url = urlArg;
Request.Builder builder = new Request.Builder();
//builder.cacheControl(new CacheControl.Builder().noCache().build());
if("GET".equalsIgnoreCase(method.toString())){
if(params!=null){
for (Map.Entry<String, String> entry : params.entrySet()) {
String name = entry.getKey();
String value = entry.getValue();
if(!TextUtils.isEmpty(name)){
name = name.trim();
if(url.contains("?")){
url=url+"&"+name+"="+(TextUtils.isEmpty(value)?"":value.trim());
}else{
url=url+"?"+name+"="+(TextUtils.isEmpty(value)?"":value.trim());
}
}
}
}
}else{
FormBody.Builder formBuidler = new FormBody.Builder();
if(params!=null){
for (Map.Entry<String, String> entry : params.entrySet()) {
String name = entry.getKey();
String value = entry.getValue();
if(!TextUtils.isEmpty(name)){
name = name.trim();
//formBuidler.addEncoded(name,(TextUtils.isEmpty(value)?"":value.trim()));
formBuidler.add(name,(TextUtils.isEmpty(value)?"":value.trim()));
}
}
}
FormBody formBody = formBuidler.build();
builder.post(formBody);
}
builder.url(url);
//添加自定义header
if(headers!=null){
for (Map.Entry<String, String> entry : headers.entrySet()) {
String name = entry.getKey();
String value = entry.getValue();
if(!TextUtils.isEmpty(name) && !TextUtils.isEmpty(value)){
name = name.trim();
value = value.trim();
builder.header(name,value);
}
}
}
Request request = builder.build();
Response response = client.newCall(request).execute();
emitter.onNext(response);
emitter.onComplete();
}
}).retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
private int retryNum = 0;
@Override
public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Throwable throwable) throws Exception {
if (++retryNum <= maxRetryCount && (throwable instanceof UnknownHostException || throwable instanceof IOException)) {
Log.d(TAG,"retry("+retryNum+")》》》"+throwable.getMessage());
return Observable.timer(retryDelayTime, TimeUnit.MILLISECONDS);
} else{
return Observable.error(throwable);
}
}
});
}
}).flatMap(new Function<Response, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Response response) throws Exception {
Log.d(TAG,"httpObservable》》》处理Response,读取body数据");
if(response.isSuccessful()){
return Observable.just(response.body().string());
}else{
Log.d(TAG,"请求失败,状态码:"+response.code());
return Observable.error(new RuntimeException("请求失败,状态码:"+response.code()));
}
}
});
return httpObservable;
}
http://www.jcodecraeer.com/a/anzhuokaifa/androidkaifa/2016/0206/3953.html
网友评论