美文网首页
RxJava操作符分类汇总+Rxjava与Retrofit搭配使

RxJava操作符分类汇总+Rxjava与Retrofit搭配使

作者: 碧云天EthanLee | 来源:发表于2021-07-14 15:43 被阅读0次
    概述

    Rxjava有海量的操作符,很多操作符还有多个重载方法。所以这篇笔记就基于RxJava2尽可能地分类汇总一下,方便以后需要使用的时候查看一下。另外,写了一个RxJava与Retrofit搭配使用的例子,也一并码住。操作符按功能分主要六大类:创建操作符、变换操作符、组合/合并操作符、功能性操作符、过滤操作符、条件/布尔操作符

    一、分类汇总

    /********************************************************/
    *创建操作符:
    create():完整创建1个被观察者对象(Observable)
    just():快速创建1个被观察者对象(Observable)、直接发送,传入的事件。
    fromArray():快速创建1个被观察者对象(Observable),直接发送,传入****的数组数据。
    fromIterable():快速创建1个被观察者对象(Observable),直接发送,传入的集合List数据。
    defer():直到有观察者(Observer )订阅时,才动态创建被观察者对象(Observable) & 发送事件。
    timer():快速创建1个被观察者对象(Observable),延迟指定时间后,发送1个数值0(Long类型)。
    interval():快速创建1个被观察者对象(Observable),每隔指定时间 就发送 事件。
    intervalRange():快速创建1个被观察者对象(Observable),发送事件的特点:每隔指定时间 就发送 事件,可指定发送的数据的数量。
    range():快速创建1个被观察者对象(Observable),连续发送 1个事件序列,可指定范围。
    rangeLong():类似于range(),区别在于该方法支持数据类型 = Long

    /********************************************************/
    *变换操作符:
    Map():对 被观察者发送的每1个事件都通过 指定的函数 处理,从而变换成另外一种事件。
    FlatMap():将被观察者发送的事件序列进行 拆分 & 单独转换,再合并成一个新的事件序列,最后再进行发送。
    ConcatMap():类似FlatMap()操作符,拆分 & 重新合并生成的事件序列 的顺序 = 被观察者旧序列生产的顺序。
    Buffer():定期从 被观察者(Obervable)需要发送的事件中 获取一定数量的事件 & 放到缓存区中,最终发送。

    /********************************************************/
    *组合/合并操作符:
    concat() / concatArray():组合多个被观察者一起发送数据,合并后 按发送顺序串行执行。
    merge() / mergeArray():组合多个被观察者一起发送数据,合并后 按时间线并行执行。
    concatDelayError() / mergeDelayError():冲突解决。
    Zip():合并 多个被观察者(Observable)发送的事件,生成一个新的事件序列(即组合过后的事件序列),并最终发送。
    combineLatest():当两个Observables中的任何一个发送了数据后,将先发送了数据的Observables 的最新(最后)一个数据 与 另外一个Observable发送的每个数据结合,最终基于该函数的结果发送数据。
    combineLatestDelayError():作用类似于concatDelayError() / mergeDelayError() ,即错误处理,此处不作过多描述。
    reduce():把被观察者需要发送的事件聚合成1个事件 & 发送
    collect():将被观察者Observable发送的数据事件收集到一个数据结构里。
    startWith() / startWithArray():在一个被观察者发送事件前,追加发送一些数据 / 一个新的被观察者。
    count():统计被观察者发送事件的数量。

    /********************************************************/
    *功能性操作符
    subscribe():订阅,即连接观察者 & 被观察者.
    subscribeOn(): 线程调度。
    observeOn():线程调度。
    delay():使得被观察者延迟一段时间再发送事件。
    do():在某个事件的生命周期中调用。
    onErrorReturn():遇到错误时,发送1个特殊事件 & 正常终止。
    onErrorResumeNext():遇到错误时,发送1个新的Observable。
    onExceptionResumeNext():遇到错误时,发送1个新的Observable
    retry():重试,即当出现错误时,让被观察者(Observable)重新发射数据。
    retryUntil():出现错误后,判断是否需要重新发送数据。
    retryWhen():遇到错误时,将发生的错误传递给一个新的被观察者(Observable),并决定是否需要重新订阅原始被观察者(Observable)& 发送事件。
    repeat():无条件地、重复发送 被观察者事件。
    repeatWhen():有条件地、重复发送 被观察者事件。

    /********************************************************/
    *过滤操作符
    Filter():过滤 特定条件的事件。
    ofType():过滤 特定数据类型的数据。
    skip() / skipLast():跳过某个事件。
    distinct() / distinctUntilChanged():过滤事件序列中重复的事件 / 连续重复的事件。
    take():指定观察者最多能接收到的事件数量。
    takeLast():指定观察者只能接收到被观察者发送的最后几个事件。
    throttleFirst()/ throttleLast():在某段时间内,只发送该段时间内第1次事件 / 最后1次事件。
    Sample():在某段时间内,只发送该段时间内最新(最后)1次事件。
    throttleWithTimeout () / debounce():发送数据事件时,若2次发送事件的间隔<指定时间,就会丢弃前一次的数据,直到指定时间内都没有新数据发射时才会发送后一次的数据。
    firstElement() / lastElement():仅选取第1个元素 / 最后一个元素。
    elementAt():指定接收某个元素(通过 索引值 确定)。
    elementAtOrError():在elementAt()的基础上,当出现越界情况(即获取的位置索引 > 发送事件序列长度)时,即抛出异常。

    /********************************************************/
    *过滤操作符
    all():判断发送的每项数据是否都满足 设置的函数条件。
    takeWhile():判断发送的每项数据是否满足 设置函数条件。
    skipWhile():判断发送的每项数据是否满足 设置函数条件。
    takeUntil():执行到某个条件时,停止发送事件。
    takeUntil():执行到某个条件时,停止发送事件。
    skipUntil():等到 skipUntil() 传入的Observable开始发送数据,(原始)第1个Observable的数据才开始发送数据。
    SequenceEqual():判定两个Observables需要发送的数据是否相同。
    contains():判断发送的数据中是否包含指定数据。
    isEmpty():判断发送的数据是否为空。
    amb():当需要发送多个 Observable时,只发送 先发送数据的Observable的数据,而其余 Observable则被丢弃。
    defaultIfEmpty():在不发送任何有效事件( Next事件)、仅发送了 Complete 事件的前提下,发送一个默认值.

    二、RxJava + Retrofit

    RxJava 和 Retrofit何以结合使用,Retrofit提供了支持 RxJava 的转换接口。使用就不详解了,直接贴代码,简单粗暴:

    /*******
     * Retrofit基于OkHttp,通过注解提供网络请求、传参等功能,方便灵活。
     * 同时提供网络响应数据转换。
     *
     * *********/
    /**
     * Created by Ethan Lee on 19/7/20.
     */
    public class RetrofitInstance {
        private volatile static  Retrofit retrofit;
        private static final int READ_TIME_OUT = 20;
        private static final int CONNECT_TIME_OUT = 20;
        private static final int WRITE_TIME_OUT = 20;
        private static final String CACHE_NAME = "cache";
        private static boolean DEBUG_MODE = true;
    
        public static Retrofit getRetrofitInstance() {
            final String  baseUrl = "" + "/";
            if (retrofit == null) {
                synchronized (NetRequestTool.class) {
                    if (retrofit == null) {
                        HttpLoggingInterceptor loggingInterceptor = new HttpLoggingInterceptor();
                        if (DEBUG_MODE) {
                            loggingInterceptor.setLevel(HttpLoggingInterceptor.Level.BODY);
                        } else {
                            loggingInterceptor.setLevel(HttpLoggingInterceptor.Level.BASIC);
                        }
                        File cacheFile = new File(BaseApplication.getAppContext().getExternalCacheDir(), CACHE_NAME);
                        Cache cache = new Cache(cacheFile,1024 * 1024 * 20);
                        OkHttpClient.Builder builder = new OkHttpClient.Builder()
                                .cache(cache)
                                .addInterceptor(loggingInterceptor)
                                .addInterceptor(cacheControlInterceptor)
                                .connectTimeout(CONNECT_TIME_OUT, TimeUnit.SECONDS)
                                .readTimeout(READ_TIME_OUT, TimeUnit.SECONDS)
                                .writeTimeout(WRITE_TIME_OUT, TimeUnit.SECONDS)
                                .retryOnConnectionFailure(true);
                        retrofit = new Retrofit.Builder()
                                .addConverterFactory(ScalarsConverterFactory.create())
                                .addConverterFactory(GsonConverterFactory.create())
                                .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
                                .baseUrl(baseUrl)
                                .client(builder.build())
                                .build();
                    }
                }
            }
            return retrofit;
        }
    
        private static final Interceptor cacheControlInterceptor = new Interceptor() {   //缓存设置
            @Override
            public Response intercept(Chain chain) throws IOException {
                Request request = chain.request();
                if (!InternetStatus.isNetAvailable(BaseApplication.getAppContext())) {
                    request = request.newBuilder().cacheControl(CacheControl.FORCE_CACHE).build();
                }
                Response originalResponse = chain.proceed(request);
                if (InternetStatus.isNetAvailable(BaseApplication.getAppContext())) {
                    String cacheControl = request.cacheControl().toString();
                    return originalResponse.newBuilder()
                            .header("Cache-Control", cacheControl)
                            .removeHeader("Pragma")
                            .build();
                } else {
                    int maxStale = 60 * 60 * 24 * 7;
                    return originalResponse.newBuilder()
                            .header("Cache-Control", "public, only-if-cached, max-stale=" + maxStale)
                            .removeHeader("Pragma")
                            .build();
                }
            }
        };
    }
    
    /**
     * Created by Ethan Lee on 19/7/20.
     */
    public interface RetrofitRequestInterface {
    
        @GET
        Call<ResponseBody> getRequest(@Url String url);
    
        @GET
        Call<ResponseBody> getRequestWithToken(@Header("token") String token, @Url String url);
    
        @POST
        @Headers("content-type: application/json")
        Call<ResponseBody> postRequest(@Url String url, @Body String param);
    
        @POST
        @Headers("content-type: application/json")
        Call<ResponseBody> postRequestWithToken(@Header("token") String token, @Url String url, @Body String param);
    
        @Multipart
        @POST
        Call<ResponseBody> uploadMemberIcon(@Header("token") String token, @Url String url, @PartMap Map<String, RequestBody> params, @Part MultipartBody.Part file);
    
        @POST
        @Multipart
        Call<ResponseBody> uploadECGraph(@Header("token") String token, @Url String url, @Part MultipartBody.Part file);
    
        /************ RXJava与 retrofit结合***********/
        @GET
        Observable<JSONObject> getRequestWithTokenByRXJava(@Header("token") String token , @Url String url);
    
        @POST
        @Headers("content-type: application/json")
        Observable<JSONObject> postRequestWithResultByRXJava(@Header("token") String token , @Url String url , @Body String params);
    }
    

    Demo在 : RxJava + Retrofit

    相关文章

      网友评论

          本文标题:RxJava操作符分类汇总+Rxjava与Retrofit搭配使

          本文链接:https://www.haomeiwen.com/subject/styypltx.html