美文网首页
RxJava使用笔记

RxJava使用笔记

作者: Leo_o | 来源:发表于2018-04-15 13:23 被阅读0次

    一、RxJava操作符

    1、创建操作符

    • create、just、from
    • interval:创建一个按固定时间间隔发射整数序列的Flowable,相当于定时器
    • range:创建发射指定范围的整数序列的Flowable,可替代for循环
    • repeat:创建一个N次重复发射特定数据的Flowable

    2、变换操作符

    • map:通过指定一个Function对象,转换成一个新的对象并发射
    • flaMap:将Flowable发射的数据集合变换为Flowable集合,然后将这些Flowable发射的数据平坦化的放进一个单独的Flowable
    • cast:强制将Flowable发射的所有数据转换为指定类型
    • concatMap:与flatMap一致,解决了flatMap的交叉问题
    • flatMapIterable:可将数据包装成Iterable
    • buffer:将源Flowable变换为一个新的Flowable,这个新的Flowable每次发射一组列表值
    • groupBy:分组元素

    3、过滤操作符

    • filter:对源Flowable产生的结果自定义规则进行过滤,只有满足条件的结果才会提交给订阅者
    • elementAt:返回指定位置的数据 elementAtOrDefault(int,T)可以允许默认值
    • distinct():去重 distinctUntilChanged去掉连续重复的数据
    • skip:过滤前N项 take:取前N项
    • ignoreElements:忽略所有源Flowable产生的结果,只把onCompleted和onError事件通知给订阅者
    • throttleFirst:定期发射这个时间段里源Flowable发射的第一个数据,throttleFirst默认在computation调度器上执行
    • throttleWithTimeOut:通过时间来限流,发射时间间隔小于指定时间的数据会被过滤掉

    4、组合操作符

    • startWith:在源Flowable发射的数据前面插上一些数据
    • merge:将多个Flowable合并到一个Flowable中进行发射(合并数据可能交错)
    • concat:合并发射(有序发射)
    • zip:合并数据,根据指定的函数变换它们,并发射一个新值
    • combineLastest:如果其中的一个Flowable还有数据没有发射,combineLastest将两个Flowable最新发射的数据组合在一起
    Flowable<Integer> flowable1 = Flowable.just(1, 2, 3);
                    Flowable<String> flowable2 = Flowable.just("a", "b", "c");
                    Flowable.combineLatest(flowable1, flowable2, new BiFunction<Integer, String, String>() {
                        @Override
                        public String apply(Integer integer, String s) throws Exception {
                            return integer + s;
                        }
                    }).subscribe(new Consumer<String>() {
                        @Override
                        public void accept(String s) throws Exception {
                            Log.d("Tag", "combineLatest:" + s);
                        }
                    });
    输出:3a 3b 3c
    

    5、辅助操作符

    • delay:让原始Flowable在发射每项数据之前都暂停一段指定的时间段
    • Do: 新文档 2018-04-15_1.jpg
    • subscribeOn:指定自身在哪个线程上运行
    • observeOn:指定发射出的数据在哪个线程上运行(一般情况在主线程)
    • timeout:如果超时以一个onError终止这个Flowable,或者执行一个备用的Flowable
    Flowable.create(new FlowableOnSubscribe<Integer>() {
                        @Override
                        public void subscribe(FlowableEmitter<Integer> sub) throws Exception {
                            for (int i = 0; i < 4; i++) {
                                try {
                                    Thread.sleep(i * 100);
                                } catch (InterruptedException e) {
                                    e.printStackTrace();
                                }
                                sub.onNext(i);
                            }
                            sub.onComplete();
                        }
                    }, BackpressureStrategy.BUFFER).timeout(200, TimeUnit.MILLISECONDS, Flowable.just(10, 11))
                            .subscribe(new Consumer<Integer>() {
                                @Override
                                public void accept(Integer integer) throws Exception {
                                    Log.d("tag", "timeout:" + integer);
                                }
                            });
    输出:0 1 2 10 11
    

    6、错误处理操作符

    • catch:拦截原始Flowable的onError onErrorReturn onErrorResumeNext onExceptionResumeNext
    Flowable.create(new FlowableOnSubscribe<Integer>() {
                        @Override
                        public void subscribe(FlowableEmitter<Integer> e) throws Exception {
                            for (int i = 0; i < 5; i++) {
                                if (i > 2) {
                                    e.onError(new Throwable("Throwable"));
                                }
                                e.onNext(i);
                            }
                            e.onComplete();
                        }
                    }, BackpressureStrategy.BUFFER).onErrorReturn(new Function<Throwable, Integer>() {
                        @Override
                        public Integer apply(Throwable throwable) throws Exception {
                            return 6;
                        }
                    }).subscribe(new Subscriber<Integer>() {
                        @Override
                        public void onSubscribe(Subscription s) {
                            s.request(Integer.MAX_VALUE);
                        }
    
                        @Override
                        public void onNext(Integer integer) {
                            Log.d("tag", "onNext:" + integer);
                        }
    
                        @Override
                        public void onError(Throwable t) {
                            Log.d("tag", "onError:" + t.getMessage());
                        }
    
                        @Override
                        public void onComplete() {
                            Log.d("tag", "onComplete");
                        }
                    });
    输出:onNext:0 1 2 6 onComplete
    
    • retry:重试 传递最新的onError给观察者
    Flowable.create(new FlowableOnSubscribe<Integer>() {
                        @Override
                        public void subscribe(FlowableEmitter<Integer> e) throws Exception {
                            for (int i = 0; i < 5; i++) {
                                if (i == 1) {
                                    e.onError(new Throwable("Throwable"));
                                } else {
                                    e.onNext(i);
                                }
                            }
                            e.onComplete();
                        }
                    }, BackpressureStrategy.BUFFER).retry(2).subscribe(new Subscriber<Integer>() {
                        @Override
                        public void onSubscribe(Subscription s) {
                            s.request(Integer.MAX_VALUE);
                        }
    
                        @Override
                        public void onNext(Integer integer) {
                            Log.d("tag", "onNext:" + integer);
                        }
    
                        @Override
                        public void onError(Throwable t) {
                            Log.d("tag", "onError:" + t.getMessage());
                        }
    
                        @Override
                        public void onComplete() {
                            Log.d("tag", "onComplete");
                        }
                    });
    上面代码重新订阅次数为2,i=0调用注释1,重试2次同样调用1,这样一共调用3次onNext方法最后才会调用onError方法
    输出:0 0 0 onError:Throwable 
    

    7、条件操作符和布尔操作符

    1、布尔操作符
    • all:根据一个函数对源发射的所有数据进行判断,最终返回的结果就是这个判断的结果
    • contains:判断源发射的数据是否包含某一个数据 包含返回true
    • isEmpty:判断源是否发射过数据 没发射返回true
    2、条件操作符
    • amb:对于给定2个或多个Flowable,它只发射首先发射数据或通知的那个Flowable的所有数据
    Flowable.ambArray(Flowable.just(1, 2, 3).delay(2, TimeUnit.SECONDS), Flowable.just(4, 5, 6))
                            .subscribe(new Consumer<Integer>() {
                                @Override
                                public void accept(Integer integer) throws Exception {
                                    Log.d("tag", "amb:" + integer);
                                }
                            });
    输出:4 5 6
    
    • defaultIfEmpty:发射来自原始Flowable的数据,如果源没有发射数据,就发射一个默认数据

    8、转换操作符

    • toList:将发射的数据转换成list
    Flowable.just(1, 2, 3).toList().subscribe(new Consumer<List<Integer>>() {
                        @Override
                        public void accept(List<Integer> integers) throws Exception {
                            for (int i : integers) {
                                Log.d("tag", "toList:" + i);
                            }
                        }
                    });
    
    • toSortedList:对转换后的list排序,默认升序
    Flowable.just(3, 1, 2).toSortedList().subscribe(new Consumer<List<Integer>>() {
                        @Override
                        public void accept(List<Integer> integers) throws Exception {
                            for (int i : integers) {
                                Log.d("tag", "toSortedList:" + i);
                            }
                        }
                    });
    输出:1 2 3
    
    • toMap:转换成Map(默认HashMap)

    二、RxJava使用场景,结合Okhttp、Retrofit

    1、配置build.gradle

    dependencies {
       ...
        implementation 'io.reactivex.rxjava2:rxjava:2.1.5'
        implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'
        implementation 'com.squareup.retrofit2:retrofit:2.3.0'
        implementation 'com.squareup.retrofit2:converter-gson:2.3.0'
        implementation 'com.squareup.retrofit2:adapter-rxjava2:2.3.0'
    }
    

    2、网络请求接口

    public interface LastFmApiService {
    
        String BASE_PARAMETERS_ARTIST = "?method=artist.getinfo&api_key=fdb3a51437d4281d4d64964d333531d4&format=json";
    
        @GET(BASE_PARAMETERS_ARTIST)
        Flowable<ArtistInfo> getArtistInfo(@Query("artist") String artist);
    }
    

    3、网络请求

    public class RetrofitUtils {
    
        private static final String BASE_KU_GOU_URL = "http://lyrics.kugou.com/";
        private static final String BASE_LASTFM_URL = "http://ws.audioscrobbler.com/2.0/";
        private Retrofit retrofit;
    
        private RetrofitUtils(boolean is) {
            OkHttpClient builder = new OkHttpClient.Builder()
                    .addInterceptor(new LoggingInterceptor())
                    .connectTimeout(15, TimeUnit.SECONDS)
                    .readTimeout(15, TimeUnit.SECONDS)
                    // 失败重试
                    .retryOnConnectionFailure(true)
                    //.sslSocketFactory(SSLSocketClient.setCertificates())
                    //.hostnameVerifier(SSLSocketClient.getHostnameVerifier())
                    .build();
            /*
             * StringConverterFactory和GsonConverterFactory不能同时使用
             * 谁在前返回谁的类型(坑)
             * MapConverterFactory和StringConverterFactory可同时使用
             *
             */
    
            retrofit = new Retrofit.Builder()
                    .client(builder)
                    .baseUrl(is ? BASE_KU_GOU_URL : BASE_LASTFM_URL)
                    //.addConverterFactory(MapConverterFactory.create())
                    .addConverterFactory(GsonConverterFactory.create())
                    .addConverterFactory(StringConverterFactory.create())
                    .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
                    .build();
        }
    }
    

    相关文章

      网友评论

          本文标题:RxJava使用笔记

          本文链接:https://www.haomeiwen.com/subject/nurdkftx.html