美文网首页Android架构师
RxJava的并发实现

RxJava的并发实现

作者: 我爱田Hebe | 来源:发表于2021-11-03 22:11 被阅读0次

    RxJava的并发实现
    我们在开发App过程中,常常遇见这种需求,例如首页,仅一个界面就要请求3个甚至更多的接口,更变态的是这些接口必须按顺序请求,来以此展示返回结果,那么这样我们就无法用普通的并发去同时请求接口了,因为我们无法预知各个接口的请求完成时间,普通的也是最简单的办法就是依次请求接口了,A接口请求完成->B接口请求完成->C接口...简单粗暴有木有?并且在加载效率上(接口请求时间)会差很多,那么有没有更优雅的办法去解决这种需求呢?那必须有,利用RxJava的Observable.zip方法即可实现并发请求!

    假如ApiService中有两个接口:

    @GET("test1")
    Observable<HttpResult<TestModel1>> test1(@QueryMap HashMap<String, String> options);
    
    @GET("test2")
    Observable<HttpResult<TestModel2>> test2(@QueryMap HashMap<String, String> options);
    

    HttpResult为自定义数据结构:

    public class HttpResult<T> {
    
        public int status;
    
        public String msg;
    
        public T data;
    
    }
    

    TestModel1和TestModel2则分别为两个返回的数据结构!

    接口封装后的请求方法: test1:

        Observable o1 = Observable.create((ObservableOnSubscribe<TestModel1>) emitter ->
                //接口请求
                ApiUtil.getInstance()
                        .getApiService()
                        .test1()
                        .subscribeOn(Schedulers.io())
                        .observeOn(AndroidSchedulers.mainThread())
                        .subscribe(new Observer<HttpResult<TestModel1>>() {
    
                            @Override
                            public void onSubscribe(Disposable d) {
    
                            }
    
                            @Override
                            public void onNext(HttpResult<TestModel1> httpResult) {
                                emitter.onNext(httpResult.data);
                                emitter.onComplete();
                            }
    
                            @Override
                            public void onError(Throwable e) {
                                emitter.onNext(null);
                                emitter.onComplete();
                            }
    
                            @Override
                            public void onComplete() {
    
                            }
                        }));
    

    注意: ObservableOnSubscribe的参数是o1 中emitter要传递的参数类型,也就是你接口得到的数据类型:TestModel1!

    test2:

     Observable o2 = Observable.create((ObservableOnSubscribe<TestModel2>) emitter ->
                //接口请求
                ApiUtil.getInstance()
                        .getApiService()
                        .test2()
                        .subscribeOn(Schedulers.io())
                        .observeOn(AndroidSchedulers.mainThread())
                        .subscribe(new Observer<HttpResult<TestModel2>>() {
    
                            @Override
                            public void onSubscribe(Disposable d) {
    
                            }
    
                            @Override
                            public void onNext(HttpResult<TestModel2> httpResult) {
                                emitter.onNext(httpResult.data);
                                emitter.onComplete();
                            }
    
                            @Override
                            public void onError(Throwable e) {
                                emitter.onNext(null);
                                emitter.onComplete();
                            }
    
                            @Override
                            public void onComplete() {
    
                            }
                        }));
    

    两个接口请求,得到两个Observable:o1和o2!

    合并:

       Observable.zip(o1, o2, new BiFunction<Object, Object, Object>() {
            @Override
            public Object apply(Object o, Object o2) throws Exception {
                TestModel1 t1 = (TestModel1) o;//o1得到的结果
                TestModel2 t2 = (TestModel2) o2;//o2得到的结果
                FinalData f=new FinalData();//最终结果合并
                f.t1=t1;
                f.t2=t2;
                return f;
            }
        }).subscribeOn(Schedulers.io()).subscribe(o -> {
                FinalData f=(FinalData)o;//获取最终结果
                //处理数据...
        });
    

    注意: BiFunction中的3个Obj参数,前两个对应接口返回数据类型,最后一个对应apply方法返回的数据类型(最终结果)!

    如果是3个或以上接口,那么合并时可以根据接口数量使用Function3,Function4...

       Observable.zip(o1, o2,o3, new Function3<Object, Object, Object,Object>() {
            @Override
            public Object apply(Object o, Object o2,Object o3) throws Exception {
    
            }
        }).subscribeOn(Schedulers.io()).subscribe(o -> {
    
        });
    

    除了zip操作符,rxjava还提供了concat,merge,join等其它合并操作符,但它们又各有不同,有兴趣的可以去多了解一下!

    相关文章

      网友评论

        本文标题:RxJava的并发实现

        本文链接:https://www.haomeiwen.com/subject/aazczltx.html