美文网首页
Rxjava2基本的操作符just,fromArray,map,

Rxjava2基本的操作符just,fromArray,map,

作者: 全球顶尖伪极客 | 来源:发表于2018-08-09 17:50 被阅读0次

这可能是最好的RxJava 2.x 教程(完结版)
Android RxJava 2 的用法 just 、from、map、subscribe、flatmap、Flowable、Function、Consumer ...
RxJava2 使用解析——常见的使用场景
大佬们,一波RxJava 3.0来袭,请做好准备~

RxJava2.x依赖的几个接口:

  • Publisher:被观察者,发出一系列的事件
  • Subscriber:观察者, 负责处理这些事件.
  • Subscription
  • Processor
    image.png
package org.reactivestreams;

public interface Publisher<T> {

public interface Subscriber<T> {

public interface Subscription {

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {

  • Observable(被观察者)/Observer(观察者)
  • Flowable(被观察者)/Subscriber(观察者)
  • Observeable用于订阅Observer,是不支持背压的.
  • Flowable用于订阅Subscriber,是支持背压(Backpressure)的。
public abstract class Observable<T> implements ObservableSource<T> {}
public abstract class Flowable<T> implements Publisher<T> {...}

public interface Observer<T> {
    void onSubscribe(@NonNull Disposable var1);

    void onNext(@NonNull T var1);

    void onError(@NonNull Throwable var1);

    void onComplete();
}

public interface Subscriber<T> {
   
    public void onSubscribe(Subscription s);

    public void onNext(T t);

    public void onError(Throwable t);

    public void onComplete();
}

借用的图片:https://www.jianshu.com/p/0cd258eecf60

image.png

实用操作符

1、ObserveOn

指定观察者的线程,例如在Android访问网络后,数据需要主线程消费,那么将观察者的线程切换到主线就需要ObserveOn操作符。每次指定一次都会生效。

2、subscribeOn

指定被观察者的线程,即数据源发生的线程。例如在Android访问网络时,需要将线程切换到子线程。多次指定只有第一次有效。

3、doOnEach

数据源(Observable)每发送一次数据,就调用一次。

4、doOnNext

数据源每次调用onNext() 之前都会先回调该方法。

5、doOnError

数据源每次调用onError() 之前会回调该方法。

6、doOnComplete

数据源每次调用onComplete() 之前会回调该方法

7、doOnSubscribe

数据源每次调用onSubscribe() 之后会回调该方法

8、doOnDispose

数据源每次调用dispose() 之后会回调该方法

背压:当被观察者快速发送大量数据时,下游不会做其他处理,即使数据大量堆积,调用链也不会报MissingBackpressureException消耗内存过大只会OOM

创建 Obeservable的时候,传入ObservableOnSubscribe对象作为参数。当 Observable被订阅的时候,ObservableOnSubscribesubscribe()方法会自动被调用,事件序列就会依照设定依次触发,比如观察者 Observer将会被调用一次onNext().

采用 OkHttp3 配合 map, doOnNext , 线程切换做简单的网络请求
  • 1、通过 Observable.create() 方法,调用 OkHttp 网络请求;
  • 2、通过 map 操作符结合 Gson/JSONObject , 将 Response 转换为 bean/entity 类;
  • 3、通过 doOnNext() 方法,解析bean中的数据,并进行数据库存储等操作;
  • 4、调度线程,在子线程进行耗时操作任务,在主线程更新UI;
  • 5、通过 subscribe()根据请求成功或者失败来更新 UI

Concat

使用场景:

Concat 先读取缓存数据并展示UI再获取网络数据刷新UI。

  • 1、concat 可以做到不交错的发射两个甚至多个Observable的发射物;
  • 2、并且只有前一个 Observable 终止(onComplete)才会订阅下一个 Observable,也就是在操作符 concat 中,只有调用 onComplete 之后才会执行下一个Observable,如果缓存数据不为空,则直接读取缓存数据,而不读取网络数据。
  • 3、两个Observable的泛型应当保持一致

concatMap

concatMap作用和flatMap几乎一模一样,唯一的区别是它能保证事件的顺序。

interval( rx2在用)、timer(过时)方式

间隔执行操作,默认在新线程,当需要更新UI则记得切换到UI线程。在Rxjavatimer操作符既可以延迟执行一段逻辑,也可以间隔执行一段逻辑

  • [注意] 但在RxJava 2.x已经过时了,现在用interval操作符来间隔执行.
  • timerinterval都默认执行在一个新线程上。
Observable observable =Observable.interval(1, TimeUnit.SECONDS);
Disposable  mDisposable = Flowable.interval(1, TimeUnit.SECONDS).doOnNext(new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Exception {
                Log.i(TAG, "accept: doOnNext:" + aLong);
            }
        }).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Exception {
                mTextView.setText("心跳间隔任务" + aLong);
            }
        });

创建一个按固定时间间隔发射整数序列的Observable,可用作定时器。即按照固定2秒一次调用onNext()方法。即时通讯等需要轮训的任务在如今的 APP 中已是很常见,而RxJava 2.xinterval 操作符即可实现。

map方法

map操作符可以将一个 Observable 对象通过某种关系转换为另一个Observable 对象,传入的参数为Function类型。作用是对上游发送的每一个事件应用一个函数,使得每一个事件都按照指定的函数去变化。

  • 1.map回调中,能直接拿到传进来得参数,并且返回一个值。处理完传进来的参数后发射给下一级。
  • 2.传进来得参数可以随意操作转换,参数返回值的类型由Function的泛型控制。
  • 3.Function泛型第一个是传进来的参数,第二个是返回值的类型

flatMap方法

作用等同于map方法。将传递进来的参数进行转换后返回。
区别在于返回值类型为Publisher或其实现类Flowableflowablepublisher的实现类,少写一个for循环,用了forArray()方法。 FlatMap将一个发送事件的上游Observable变换成多个发送事件的Observables,然后将它们发射的事件合并后放进一个单独的Observable里。需要注意flatMap并不保证事件的顺序。
使用场景:
多个网络请求依次依赖,比如:

  • 1、注册用户前先通过接口A获取当前用户是否已注册,再通过接口B注册;

  • 2、注册后自动登录,先通过注册接口注册用户信息,注册成功后马上调用登录接口进行自动登录。

  • zip

分别从两个上游事件中各取出一个组合,一个事件只能被使用一次,顺序严格按照事件发送的顺序最终下游事件收到的是和上游事件最少的数目相同(必须两两配对,多余的舍弃)
使用场景:结合多个接口的数据再更新 UI。
需求:九宫格菜单+角标,两个接口返回,但是要一一对应上
实现方案GridView+zip。先返回菜单的list集合,再将这个集合跟角标集合结合,如果id相等,则给菜单集合的角标属性赋值并返回终极的数据源list。

Observable observable1 = Observable.create(new ObservableOnSubscribe<List<MenuEntity>>() {

            @Override
            public void subscribe(ObservableEmitter<List<MenuEntity>> observableEmitter) throws Exception {
                observableEmitter.onNext(parseMenuListFromLocal(FileTools.getFileStremFromRaw(ZipActivity.this, R.raw.menu_grid_json)));

            }

        });
        Observable observable2 = Observable.create(new ObservableOnSubscribe<List<MenuEntity>>() {

            @Override
            public void subscribe(ObservableEmitter<List<MenuEntity>> observableEmitter) throws Exception {
                observableEmitter.onNext(parseMenuNumListFromLocal(FileTools.getFileStremFromRaw(ZipActivity.this, R.raw.menu_grid_num_json)));

            }
        });
        Observable.zip(observable1, observable2, new BiFunction<List<MenuEntity>, List<MenuEntity>, List<MenuEntity>>() {

            @Override
            public List<MenuEntity> apply(List<MenuEntity> menuEntities, List<MenuEntity> menuEntities2) throws Exception {
                for (int i = 0; i < menuEntities.size(); i++) {
                    MenuEntity menuEntity = menuEntities.get(i);
                    for (int j = 0; j < menuEntities2.size(); j++) {
                        if (menuEntity.getId().equals(menuEntities2.get(j).getId())) {
                            menuEntity.setNum(menuEntities2.get(j).getNum());
                        }
                    }
                }
                return menuEntities;
            }
        }).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<List<MenuEntity>>() {
                    @Override
                    public void accept(List<MenuEntity> menuEntities) throws Exception {
                        menuGridList.clear();
                        menuGridList.addAll(menuEntities);
                        menuGridviewAdapter.notifyDataSetChanged();
                    }

                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {

                    }
                });
 Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {
                Log.i(TAG, TAG+"-subscribe: start emit data");

                observableEmitter.onNext("cc");
                observableEmitter.onNext("qq");
                observableEmitter.onComplete();
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable disposable) {
                Log.i(TAG, TAG+"-onSubscribe: start subscribe ");
            }

            @Override
            public void onNext(String s) {
                Log.i(TAG, TAG+"-onNext: " + s);
            }

            @Override
            public void onError(Throwable throwable) {
                Log.i(TAG, TAG+"-onError: ");
            }

            @Override
            public void onComplete() {
                Log.i(TAG, TAG+"-onComplete: ");
            }
        });
输出结果:
   RxJavaDemoActivity-onSubscribe: start subscribe //先执行订阅
   RxJavaDemoActivity-subscribe: start emit data
   RxJavaDemoActivity-onNext: cc
   RxJavaDemoActivity-onNext: qq
   RxJavaDemoActivity-onComplete: 

debounce

debounce操作符可以过滤掉发射频率过快的数据项,减少频繁的网络请求。
使用情景:
搜索时过滤掉输入过快不停的做网络请求,结合switchMap使用,避免后面的请求比前面的请求快先返回数据更新后,被之前的请求后返回来覆盖掉。

  • 1、输入框数据变化时就要进行网络请求,这样会产生大量的网络请求。这时候可以通过debounce进行处理
  • 2、点击一次按钮就进行一次网络请求.
RxView.clicks(mRxOperatorsBtn)
                .debounce(1,TimeUnit.SECONDS) // 过滤掉发射频率小于1秒的发射事件
                .subscribe(new Consumer<Object>() {
                    @Override
                    public void accept(@NonNull Object o) throws Exception {
                        clickBtn();
                    }
                });

just方法、fromArray方法

传入一个数组参数

Observable observable = Observable.just("hi");

传入若干个相同参数,和fromArray作用一样,使用just( ),将为你创建一个Observable订阅后并自动为你调用onNext( )发射数据。通过just( )方式 直接触发onNext()just中传递的参数将直接在ObserveronNext()方法中接收到。

fromIterable()方式

fromIterable(),遍历集合,发送每个item。相当于多次回调onNext()方法,每次传入一个item.

    @CheckReturnValue
    @SchedulerSupport("none")
    public static <T> Observable<T> fromIterable(Iterable<? extends T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableFromIterable(source));
    }
...
  Observable.fromIterable(new ArrayList<String>()).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.e(TAG,s);
            }
   });

subscribe方法

传入一个Consumer对象,也就是被观察者,一串异步操作,最后在accept方法中显示UI.

 public final <R> Flowable<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper) {
        return flatMap(mapper, false, bufferSize(), bufferSize());
    }

subscribeOn方法

subscribeOn改变调用它之前代码的线程,传入的是一个Schedule对象,指一系列操作在哪个线程,用于指定Observeable被观察者在哪个线程中运行。一般为Schedule.io.

observeOn方法

observeOn改变调用它之后代码的线程,传入Schedule对象,回调在哪个线程中执行,一般在主线程执行,Observe观察者。指定的subscribe在哪个线程中执行。

retryWhen

我们在app里发起网络请求时,可能会因为各种问题导致失败。如何利用RxJava来实现出现错误后重试若干次,并且可以设定重试的时间间隔。
.retry()接收到.onError()事件后触发重订阅。

 /**
     *
     * @param client okhttpclient
     * @param method get post
     * @param urlArg url路径
     * @param params 参数
     * @param headers http头信息
     * @param maxRetryCount 网络异常重试次数
     * @param retryDelayTime 重试的延迟时间
     * @return
     */
    public static Observable<String> execute(final OkHttpClient client, final String method, final String urlArg, final Map<String,String> params, final Map<String,String> headers, final int maxRetryCount, final int retryDelayTime){

        Observable<String> httpObservable = Observable.create(new ObservableOnSubscribe<Response>() {
            @Override
            public void subscribe(ObservableEmitter<Response> emitter) throws Exception {
                Log.d(TAG,"httpObservable》》》开始执行");

                String url = urlArg;
                Request.Builder builder = new Request.Builder();
                //builder.cacheControl(new CacheControl.Builder().noCache().build());

                if("GET".equalsIgnoreCase(method.toString())){
                    if(params!=null){
                        for (Map.Entry<String, String> entry : params.entrySet()) {
                            String name = entry.getKey();
                            String value = entry.getValue();
                            if(!TextUtils.isEmpty(name)){
                                name = name.trim();
                                if(url.contains("?")){
                                    url=url+"&"+name+"="+(TextUtils.isEmpty(value)?"":value.trim());
                                }else{
                                    url=url+"?"+name+"="+(TextUtils.isEmpty(value)?"":value.trim());
                                }
                            }
                        }
                    }
                }else{
                    FormBody.Builder formBuidler = new FormBody.Builder();
                    if(params!=null){
                        for (Map.Entry<String, String> entry : params.entrySet()) {
                            String name = entry.getKey();
                            String value = entry.getValue();
                            if(!TextUtils.isEmpty(name)){
                                name = name.trim();
                                //formBuidler.addEncoded(name,(TextUtils.isEmpty(value)?"":value.trim()));
                                formBuidler.add(name,(TextUtils.isEmpty(value)?"":value.trim()));
                            }
                        }
                    }
                    FormBody formBody = formBuidler.build();
                    builder.post(formBody);
                }

                builder.url(url);

                //添加自定义header
                if(headers!=null){
                    for (Map.Entry<String, String> entry : headers.entrySet()) {
                        String name = entry.getKey();
                        String value = entry.getValue();
                        if(!TextUtils.isEmpty(name) && !TextUtils.isEmpty(value)){
                            name = name.trim();
                            value = value.trim();
                            builder.header(name,value);
                        }
                    }
                }

                Request request = builder.build();

                Response response = client.newCall(request).execute();

                emitter.onNext(response);
                emitter.onComplete();
            }

        }).retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
            private int retryNum = 0;
            @Override
            public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
                return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
                    @Override
                    public ObservableSource<?> apply(Throwable throwable) throws Exception {
                        if (++retryNum <= maxRetryCount && (throwable instanceof UnknownHostException || throwable instanceof IOException)) {
                            Log.d(TAG,"retry("+retryNum+")》》》"+throwable.getMessage());
                            return Observable.timer(retryDelayTime, TimeUnit.MILLISECONDS);
                        } else{
                            return Observable.error(throwable);
                        }
                    }
                });
            }
        }).flatMap(new Function<Response, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(Response response) throws Exception {
                Log.d(TAG,"httpObservable》》》处理Response,读取body数据");
                if(response.isSuccessful()){
                    return Observable.just(response.body().string());
                }else{
                    Log.d(TAG,"请求失败,状态码:"+response.code());
                    return Observable.error(new RuntimeException("请求失败,状态码:"+response.code()));
                }
            }
        });

        return httpObservable;

    }

http://www.jcodecraeer.com/a/anzhuokaifa/androidkaifa/2016/0206/3953.html

相关文章

网友评论

      本文标题:Rxjava2基本的操作符just,fromArray,map,

      本文链接:https://www.haomeiwen.com/subject/yvccbftx.html