美文网首页
RxJava操作符分类汇总+Rxjava与Retrofit搭配使

RxJava操作符分类汇总+Rxjava与Retrofit搭配使

作者: 碧云天EthanLee | 来源:发表于2021-07-14 15:43 被阅读0次
概述

Rxjava有海量的操作符,很多操作符还有多个重载方法。所以这篇笔记就基于RxJava2尽可能地分类汇总一下,方便以后需要使用的时候查看一下。另外,写了一个RxJava与Retrofit搭配使用的例子,也一并码住。操作符按功能分主要六大类:创建操作符、变换操作符、组合/合并操作符、功能性操作符、过滤操作符、条件/布尔操作符

一、分类汇总

/********************************************************/
*创建操作符:
create():完整创建1个被观察者对象(Observable)
just():快速创建1个被观察者对象(Observable)、直接发送,传入的事件。
fromArray():快速创建1个被观察者对象(Observable),直接发送,传入****的数组数据。
fromIterable():快速创建1个被观察者对象(Observable),直接发送,传入的集合List数据。
defer():直到有观察者(Observer )订阅时,才动态创建被观察者对象(Observable) & 发送事件。
timer():快速创建1个被观察者对象(Observable),延迟指定时间后,发送1个数值0(Long类型)。
interval():快速创建1个被观察者对象(Observable),每隔指定时间 就发送 事件。
intervalRange():快速创建1个被观察者对象(Observable),发送事件的特点:每隔指定时间 就发送 事件,可指定发送的数据的数量。
range():快速创建1个被观察者对象(Observable),连续发送 1个事件序列,可指定范围。
rangeLong():类似于range(),区别在于该方法支持数据类型 = Long

/********************************************************/
*变换操作符:
Map():对 被观察者发送的每1个事件都通过 指定的函数 处理,从而变换成另外一种事件。
FlatMap():将被观察者发送的事件序列进行 拆分 & 单独转换,再合并成一个新的事件序列,最后再进行发送。
ConcatMap():类似FlatMap()操作符,拆分 & 重新合并生成的事件序列 的顺序 = 被观察者旧序列生产的顺序。
Buffer():定期从 被观察者(Obervable)需要发送的事件中 获取一定数量的事件 & 放到缓存区中,最终发送。

/********************************************************/
*组合/合并操作符:
concat() / concatArray():组合多个被观察者一起发送数据,合并后 按发送顺序串行执行。
merge() / mergeArray():组合多个被观察者一起发送数据,合并后 按时间线并行执行。
concatDelayError() / mergeDelayError():冲突解决。
Zip():合并 多个被观察者(Observable)发送的事件,生成一个新的事件序列(即组合过后的事件序列),并最终发送。
combineLatest():当两个Observables中的任何一个发送了数据后,将先发送了数据的Observables 的最新(最后)一个数据 与 另外一个Observable发送的每个数据结合,最终基于该函数的结果发送数据。
combineLatestDelayError():作用类似于concatDelayError() / mergeDelayError() ,即错误处理,此处不作过多描述。
reduce():把被观察者需要发送的事件聚合成1个事件 & 发送
collect():将被观察者Observable发送的数据事件收集到一个数据结构里。
startWith() / startWithArray():在一个被观察者发送事件前,追加发送一些数据 / 一个新的被观察者。
count():统计被观察者发送事件的数量。

/********************************************************/
*功能性操作符
subscribe():订阅,即连接观察者 & 被观察者.
subscribeOn(): 线程调度。
observeOn():线程调度。
delay():使得被观察者延迟一段时间再发送事件。
do():在某个事件的生命周期中调用。
onErrorReturn():遇到错误时,发送1个特殊事件 & 正常终止。
onErrorResumeNext():遇到错误时,发送1个新的Observable。
onExceptionResumeNext():遇到错误时,发送1个新的Observable
retry():重试,即当出现错误时,让被观察者(Observable)重新发射数据。
retryUntil():出现错误后,判断是否需要重新发送数据。
retryWhen():遇到错误时,将发生的错误传递给一个新的被观察者(Observable),并决定是否需要重新订阅原始被观察者(Observable)& 发送事件。
repeat():无条件地、重复发送 被观察者事件。
repeatWhen():有条件地、重复发送 被观察者事件。

/********************************************************/
*过滤操作符
Filter():过滤 特定条件的事件。
ofType():过滤 特定数据类型的数据。
skip() / skipLast():跳过某个事件。
distinct() / distinctUntilChanged():过滤事件序列中重复的事件 / 连续重复的事件。
take():指定观察者最多能接收到的事件数量。
takeLast():指定观察者只能接收到被观察者发送的最后几个事件。
throttleFirst()/ throttleLast():在某段时间内,只发送该段时间内第1次事件 / 最后1次事件。
Sample():在某段时间内,只发送该段时间内最新(最后)1次事件。
throttleWithTimeout () / debounce():发送数据事件时,若2次发送事件的间隔<指定时间,就会丢弃前一次的数据,直到指定时间内都没有新数据发射时才会发送后一次的数据。
firstElement() / lastElement():仅选取第1个元素 / 最后一个元素。
elementAt():指定接收某个元素(通过 索引值 确定)。
elementAtOrError():在elementAt()的基础上,当出现越界情况(即获取的位置索引 > 发送事件序列长度)时,即抛出异常。

/********************************************************/
*过滤操作符
all():判断发送的每项数据是否都满足 设置的函数条件。
takeWhile():判断发送的每项数据是否满足 设置函数条件。
skipWhile():判断发送的每项数据是否满足 设置函数条件。
takeUntil():执行到某个条件时,停止发送事件。
takeUntil():执行到某个条件时,停止发送事件。
skipUntil():等到 skipUntil() 传入的Observable开始发送数据,(原始)第1个Observable的数据才开始发送数据。
SequenceEqual():判定两个Observables需要发送的数据是否相同。
contains():判断发送的数据中是否包含指定数据。
isEmpty():判断发送的数据是否为空。
amb():当需要发送多个 Observable时,只发送 先发送数据的Observable的数据,而其余 Observable则被丢弃。
defaultIfEmpty():在不发送任何有效事件( Next事件)、仅发送了 Complete 事件的前提下,发送一个默认值.

二、RxJava + Retrofit

RxJava 和 Retrofit何以结合使用,Retrofit提供了支持 RxJava 的转换接口。使用就不详解了,直接贴代码,简单粗暴:

/*******
 * Retrofit基于OkHttp,通过注解提供网络请求、传参等功能,方便灵活。
 * 同时提供网络响应数据转换。
 *
 * *********/
/**
 * Created by Ethan Lee on 19/7/20.
 */
public class RetrofitInstance {
    private volatile static  Retrofit retrofit;
    private static final int READ_TIME_OUT = 20;
    private static final int CONNECT_TIME_OUT = 20;
    private static final int WRITE_TIME_OUT = 20;
    private static final String CACHE_NAME = "cache";
    private static boolean DEBUG_MODE = true;

    public static Retrofit getRetrofitInstance() {
        final String  baseUrl = "" + "/";
        if (retrofit == null) {
            synchronized (NetRequestTool.class) {
                if (retrofit == null) {
                    HttpLoggingInterceptor loggingInterceptor = new HttpLoggingInterceptor();
                    if (DEBUG_MODE) {
                        loggingInterceptor.setLevel(HttpLoggingInterceptor.Level.BODY);
                    } else {
                        loggingInterceptor.setLevel(HttpLoggingInterceptor.Level.BASIC);
                    }
                    File cacheFile = new File(BaseApplication.getAppContext().getExternalCacheDir(), CACHE_NAME);
                    Cache cache = new Cache(cacheFile,1024 * 1024 * 20);
                    OkHttpClient.Builder builder = new OkHttpClient.Builder()
                            .cache(cache)
                            .addInterceptor(loggingInterceptor)
                            .addInterceptor(cacheControlInterceptor)
                            .connectTimeout(CONNECT_TIME_OUT, TimeUnit.SECONDS)
                            .readTimeout(READ_TIME_OUT, TimeUnit.SECONDS)
                            .writeTimeout(WRITE_TIME_OUT, TimeUnit.SECONDS)
                            .retryOnConnectionFailure(true);
                    retrofit = new Retrofit.Builder()
                            .addConverterFactory(ScalarsConverterFactory.create())
                            .addConverterFactory(GsonConverterFactory.create())
                            .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
                            .baseUrl(baseUrl)
                            .client(builder.build())
                            .build();
                }
            }
        }
        return retrofit;
    }

    private static final Interceptor cacheControlInterceptor = new Interceptor() {   //缓存设置
        @Override
        public Response intercept(Chain chain) throws IOException {
            Request request = chain.request();
            if (!InternetStatus.isNetAvailable(BaseApplication.getAppContext())) {
                request = request.newBuilder().cacheControl(CacheControl.FORCE_CACHE).build();
            }
            Response originalResponse = chain.proceed(request);
            if (InternetStatus.isNetAvailable(BaseApplication.getAppContext())) {
                String cacheControl = request.cacheControl().toString();
                return originalResponse.newBuilder()
                        .header("Cache-Control", cacheControl)
                        .removeHeader("Pragma")
                        .build();
            } else {
                int maxStale = 60 * 60 * 24 * 7;
                return originalResponse.newBuilder()
                        .header("Cache-Control", "public, only-if-cached, max-stale=" + maxStale)
                        .removeHeader("Pragma")
                        .build();
            }
        }
    };
}
/**
 * Created by Ethan Lee on 19/7/20.
 */
public interface RetrofitRequestInterface {

    @GET
    Call<ResponseBody> getRequest(@Url String url);

    @GET
    Call<ResponseBody> getRequestWithToken(@Header("token") String token, @Url String url);

    @POST
    @Headers("content-type: application/json")
    Call<ResponseBody> postRequest(@Url String url, @Body String param);

    @POST
    @Headers("content-type: application/json")
    Call<ResponseBody> postRequestWithToken(@Header("token") String token, @Url String url, @Body String param);

    @Multipart
    @POST
    Call<ResponseBody> uploadMemberIcon(@Header("token") String token, @Url String url, @PartMap Map<String, RequestBody> params, @Part MultipartBody.Part file);

    @POST
    @Multipart
    Call<ResponseBody> uploadECGraph(@Header("token") String token, @Url String url, @Part MultipartBody.Part file);

    /************ RXJava与 retrofit结合***********/
    @GET
    Observable<JSONObject> getRequestWithTokenByRXJava(@Header("token") String token , @Url String url);

    @POST
    @Headers("content-type: application/json")
    Observable<JSONObject> postRequestWithResultByRXJava(@Header("token") String token , @Url String url , @Body String params);
}

Demo在 : RxJava + Retrofit

相关文章

网友评论

      本文标题:RxJava操作符分类汇总+Rxjava与Retrofit搭配使

      本文链接:https://www.haomeiwen.com/subject/styypltx.html