概述
Rxjava有海量的操作符,很多操作符还有多个重载方法。所以这篇笔记就基于RxJava2尽可能地分类汇总一下,方便以后需要使用的时候查看一下。另外,写了一个RxJava与Retrofit搭配使用的例子,也一并码住。操作符按功能分主要六大类:创建操作符、变换操作符、组合/合并操作符、功能性操作符、过滤操作符、条件/布尔操作符。
一、分类汇总
/********************************************************/
*创建操作符:
create():完整创建1个被观察者对象(Observable)
just():快速创建1个被观察者对象(Observable)、直接发送,传入的事件。
fromArray():快速创建1个被观察者对象(Observable),直接发送,传入****的数组数据。
fromIterable():快速创建1个被观察者对象(Observable),直接发送,传入的集合List数据。
defer():直到有观察者(Observer )订阅时,才动态创建被观察者对象(Observable) & 发送事件。
timer():快速创建1个被观察者对象(Observable),延迟指定时间后,发送1个数值0(Long类型)。
interval():快速创建1个被观察者对象(Observable),每隔指定时间 就发送 事件。
intervalRange():快速创建1个被观察者对象(Observable),发送事件的特点:每隔指定时间 就发送 事件,可指定发送的数据的数量。
range():快速创建1个被观察者对象(Observable),连续发送 1个事件序列,可指定范围。
rangeLong():类似于range(),区别在于该方法支持数据类型 = Long
/********************************************************/
*变换操作符:
Map():对 被观察者发送的每1个事件都通过 指定的函数 处理,从而变换成另外一种事件。
FlatMap():将被观察者发送的事件序列进行 拆分 & 单独转换,再合并成一个新的事件序列,最后再进行发送。
ConcatMap():类似FlatMap()操作符,拆分 & 重新合并生成的事件序列 的顺序 = 被观察者旧序列生产的顺序。
Buffer():定期从 被观察者(Obervable)需要发送的事件中 获取一定数量的事件 & 放到缓存区中,最终发送。
/********************************************************/
*组合/合并操作符:
concat() / concatArray():组合多个被观察者一起发送数据,合并后 按发送顺序串行执行。
merge() / mergeArray():组合多个被观察者一起发送数据,合并后 按时间线并行执行。
concatDelayError() / mergeDelayError():冲突解决。
Zip():合并 多个被观察者(Observable)发送的事件,生成一个新的事件序列(即组合过后的事件序列),并最终发送。
combineLatest():当两个Observables中的任何一个发送了数据后,将先发送了数据的Observables 的最新(最后)一个数据 与 另外一个Observable发送的每个数据结合,最终基于该函数的结果发送数据。
combineLatestDelayError():作用类似于concatDelayError() / mergeDelayError() ,即错误处理,此处不作过多描述。
reduce():把被观察者需要发送的事件聚合成1个事件 & 发送
collect():将被观察者Observable发送的数据事件收集到一个数据结构里。
startWith() / startWithArray():在一个被观察者发送事件前,追加发送一些数据 / 一个新的被观察者。
count():统计被观察者发送事件的数量。
/********************************************************/
*功能性操作符
subscribe():订阅,即连接观察者 & 被观察者.
subscribeOn(): 线程调度。
observeOn():线程调度。
delay():使得被观察者延迟一段时间再发送事件。
do():在某个事件的生命周期中调用。
onErrorReturn():遇到错误时,发送1个特殊事件 & 正常终止。
onErrorResumeNext():遇到错误时,发送1个新的Observable。
onExceptionResumeNext():遇到错误时,发送1个新的Observable
retry():重试,即当出现错误时,让被观察者(Observable)重新发射数据。
retryUntil():出现错误后,判断是否需要重新发送数据。
retryWhen():遇到错误时,将发生的错误传递给一个新的被观察者(Observable),并决定是否需要重新订阅原始被观察者(Observable)& 发送事件。
repeat():无条件地、重复发送 被观察者事件。
repeatWhen():有条件地、重复发送 被观察者事件。
/********************************************************/
*过滤操作符
Filter():过滤 特定条件的事件。
ofType():过滤 特定数据类型的数据。
skip() / skipLast():跳过某个事件。
distinct() / distinctUntilChanged():过滤事件序列中重复的事件 / 连续重复的事件。
take():指定观察者最多能接收到的事件数量。
takeLast():指定观察者只能接收到被观察者发送的最后几个事件。
throttleFirst()/ throttleLast():在某段时间内,只发送该段时间内第1次事件 / 最后1次事件。
Sample():在某段时间内,只发送该段时间内最新(最后)1次事件。
throttleWithTimeout () / debounce():发送数据事件时,若2次发送事件的间隔<指定时间,就会丢弃前一次的数据,直到指定时间内都没有新数据发射时才会发送后一次的数据。
firstElement() / lastElement():仅选取第1个元素 / 最后一个元素。
elementAt():指定接收某个元素(通过 索引值 确定)。
elementAtOrError():在elementAt()的基础上,当出现越界情况(即获取的位置索引 > 发送事件序列长度)时,即抛出异常。
/********************************************************/
*过滤操作符
all():判断发送的每项数据是否都满足 设置的函数条件。
takeWhile():判断发送的每项数据是否满足 设置函数条件。
skipWhile():判断发送的每项数据是否满足 设置函数条件。
takeUntil():执行到某个条件时,停止发送事件。
takeUntil():执行到某个条件时,停止发送事件。
skipUntil():等到 skipUntil() 传入的Observable开始发送数据,(原始)第1个Observable的数据才开始发送数据。
SequenceEqual():判定两个Observables需要发送的数据是否相同。
contains():判断发送的数据中是否包含指定数据。
isEmpty():判断发送的数据是否为空。
amb():当需要发送多个 Observable时,只发送 先发送数据的Observable的数据,而其余 Observable则被丢弃。
defaultIfEmpty():在不发送任何有效事件( Next事件)、仅发送了 Complete 事件的前提下,发送一个默认值.
二、RxJava + Retrofit
RxJava 和 Retrofit何以结合使用,Retrofit提供了支持 RxJava 的转换接口。使用就不详解了,直接贴代码,简单粗暴:
/*******
* Retrofit基于OkHttp,通过注解提供网络请求、传参等功能,方便灵活。
* 同时提供网络响应数据转换。
*
* *********/
/**
* Created by Ethan Lee on 19/7/20.
*/
public class RetrofitInstance {
private volatile static Retrofit retrofit;
private static final int READ_TIME_OUT = 20;
private static final int CONNECT_TIME_OUT = 20;
private static final int WRITE_TIME_OUT = 20;
private static final String CACHE_NAME = "cache";
private static boolean DEBUG_MODE = true;
public static Retrofit getRetrofitInstance() {
final String baseUrl = "" + "/";
if (retrofit == null) {
synchronized (NetRequestTool.class) {
if (retrofit == null) {
HttpLoggingInterceptor loggingInterceptor = new HttpLoggingInterceptor();
if (DEBUG_MODE) {
loggingInterceptor.setLevel(HttpLoggingInterceptor.Level.BODY);
} else {
loggingInterceptor.setLevel(HttpLoggingInterceptor.Level.BASIC);
}
File cacheFile = new File(BaseApplication.getAppContext().getExternalCacheDir(), CACHE_NAME);
Cache cache = new Cache(cacheFile,1024 * 1024 * 20);
OkHttpClient.Builder builder = new OkHttpClient.Builder()
.cache(cache)
.addInterceptor(loggingInterceptor)
.addInterceptor(cacheControlInterceptor)
.connectTimeout(CONNECT_TIME_OUT, TimeUnit.SECONDS)
.readTimeout(READ_TIME_OUT, TimeUnit.SECONDS)
.writeTimeout(WRITE_TIME_OUT, TimeUnit.SECONDS)
.retryOnConnectionFailure(true);
retrofit = new Retrofit.Builder()
.addConverterFactory(ScalarsConverterFactory.create())
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJavaCallAdapterFactory.create())
.baseUrl(baseUrl)
.client(builder.build())
.build();
}
}
}
return retrofit;
}
private static final Interceptor cacheControlInterceptor = new Interceptor() { //缓存设置
@Override
public Response intercept(Chain chain) throws IOException {
Request request = chain.request();
if (!InternetStatus.isNetAvailable(BaseApplication.getAppContext())) {
request = request.newBuilder().cacheControl(CacheControl.FORCE_CACHE).build();
}
Response originalResponse = chain.proceed(request);
if (InternetStatus.isNetAvailable(BaseApplication.getAppContext())) {
String cacheControl = request.cacheControl().toString();
return originalResponse.newBuilder()
.header("Cache-Control", cacheControl)
.removeHeader("Pragma")
.build();
} else {
int maxStale = 60 * 60 * 24 * 7;
return originalResponse.newBuilder()
.header("Cache-Control", "public, only-if-cached, max-stale=" + maxStale)
.removeHeader("Pragma")
.build();
}
}
};
}
/**
* Created by Ethan Lee on 19/7/20.
*/
public interface RetrofitRequestInterface {
@GET
Call<ResponseBody> getRequest(@Url String url);
@GET
Call<ResponseBody> getRequestWithToken(@Header("token") String token, @Url String url);
@POST
@Headers("content-type: application/json")
Call<ResponseBody> postRequest(@Url String url, @Body String param);
@POST
@Headers("content-type: application/json")
Call<ResponseBody> postRequestWithToken(@Header("token") String token, @Url String url, @Body String param);
@Multipart
@POST
Call<ResponseBody> uploadMemberIcon(@Header("token") String token, @Url String url, @PartMap Map<String, RequestBody> params, @Part MultipartBody.Part file);
@POST
@Multipart
Call<ResponseBody> uploadECGraph(@Header("token") String token, @Url String url, @Part MultipartBody.Part file);
/************ RXJava与 retrofit结合***********/
@GET
Observable<JSONObject> getRequestWithTokenByRXJava(@Header("token") String token , @Url String url);
@POST
@Headers("content-type: application/json")
Observable<JSONObject> postRequestWithResultByRXJava(@Header("token") String token , @Url String url , @Body String params);
}
Demo在 : RxJava + Retrofit
网友评论