美文网首页安卓
Rxjava2流的多线程并发

Rxjava2流的多线程并发

作者: SMSM | 来源:发表于2017-12-25 16:03 被阅读110次

    模拟多张图片并行上传业务,所有的都成功后,才允许点击保存

    --------单线程-----
    12-25 15:59:22.737 24988 25035 D SpringItemViewActivity: testRxjavaToList request RxCachedThreadScheduler-2 tokenfile://a
    12-25 15:59:23.164   932   950 I ActivityManager: Displayed com.pitaya.findviewbyiddemo/.SpringItemViewActivity: +1s985ms
    12-25 15:59:24.738 24988 25035 D SpringItemViewActivity: testRxjavaToList request RxCachedThreadScheduler-2 tokenfile://b
    12-25 15:59:24.738 24988 24988 D SpringItemViewActivity: testRxjavaToList subscribe main request done:tokenfile://a
    12-25 15:59:26.738 24988 25035 D SpringItemViewActivity: testRxjavaToList request RxCachedThreadScheduler-2 tokenfile://c
    12-25 15:59:26.738 24988 24988 D SpringItemViewActivity: testRxjavaToList subscribe main request done:tokenfile://b
    12-25 15:59:28.739 24988 24988 D SpringItemViewActivity: testRxjavaToList subscribe main request done:tokenfile://c
    
            // Sequential variant: all files flow through a single inner stream that is moved to
            // Schedulers.io() once, so the simulated requests are handled one at a time.
            Observable.just("token")
                    .flatMap(token -> Observable.fromIterable(stringList)
                            // Prefix every file path with the token.
                            .map(path -> token + path)
                            .observeOn(Schedulers.io())
                            .map(request -> {
                                Log.d(TAG, "testRxjavaToList request " + getCurrentName() + " " + request);
                                Thread.sleep(2000); // simulated upload latency
                                return "request done:" + request;
                            }))
                    .subscribeOn(Schedulers.io())
                    // Deliver each result on the Android main thread.
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(result ->
                            Log.d(TAG, "testRxjavaToList subscribe " + getCurrentName() + " " + result));
    
    
    -----支持流并行执行-----
    
    12-25 16:02:05.474 25972 26018 D SpringItemViewActivity: testRxjavaToList request RxCachedThreadScheduler-2 tokenfile://a
    12-25 16:02:05.477 25972 26019 D SpringItemViewActivity: testRxjavaToList request RxCachedThreadScheduler-3 tokenfile://b
    12-25 16:02:05.485 25972 26020 D SpringItemViewActivity: testRxjavaToList request RxCachedThreadScheduler-4 tokenfile://c
    12-25 16:02:05.893   932   950 I ActivityManager: Displayed com.pitaya.findviewbyiddemo/.SpringItemViewActivity: +1s972ms
    12-25 16:02:07.475 25972 25972 D SpringItemViewActivity: testRxjavaToList subscribe main request done:tokenfile://a
    12-25 16:02:07.478 25972 25972 D SpringItemViewActivity: testRxjavaToList subscribe main request done:tokenfile://b
    12-25 16:02:07.490 25972 25972 D SpringItemViewActivity: testRxjavaToList subscribe main request done:tokenfile://c
    
    
            //支持多线程并行,并行的最小单元是一条流。识别是否存在多条流的唯一方式是看是否存在flatMap():flatMap上游发射多少个元素,就能产生多少条新流,每条流
            //通过设置observeOn(Schedulers.io())即可并发执行
            // Parallel variant: the inner flatMap wraps every file in its own Observable, and each
            // of those is moved to Schedulers.io() separately, so the simulated requests can overlap.
            Observable.just("token")
                    .flatMap(token -> Observable.fromIterable(stringList)
                            // Prefix every file path with the token.
                            .map(path -> token + path)
                            // One inner stream per file.
                            .flatMap(request -> Observable.just(request)
                                    .observeOn(Schedulers.io())
                                    .map(s -> {
                                        Log.d(TAG, "testRxjavaToList request " + getCurrentName() + " " + s);
                                        Thread.sleep(2000); // simulated upload latency
                                        return "request done:" + s;
                                    })))
                    .subscribeOn(Schedulers.io())
                    // Deliver each result on the Android main thread.
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(result ->
                            Log.d(TAG, "testRxjavaToList subscribe " + getCurrentName() + " " + result));
    
    
    
    -----------代码 支持线程并发、支持每个流更新UI、支持最后合并toList给UI一个最终结果、支持流过程异常降级处理-------
    
    12-25 15:56:55.965   932  1661 I ActivityManager: Start proc 23904:com.pitaya.findviewbyiddemo/u0a71 for activity com.pitaya.findviewbyiddemo/.SpringItemViewActivity
    12-25 15:56:57.519 23904 23949 D SpringItemViewActivity: testRxjavaToList merge token RxCachedThreadScheduler-1
    12-25 15:56:57.521 23904 23949 D SpringItemViewActivity: testRxjavaToList merge token RxCachedThreadScheduler-1
    12-25 15:56:57.522 23904 23951 D SpringItemViewActivity: testRxjavaToList request RxCachedThreadScheduler-2 tokenfile://a
    12-25 15:56:57.523 23904 23949 D SpringItemViewActivity: testRxjavaToList merge token RxCachedThreadScheduler-1
    12-25 15:56:57.526 23904 23952 D SpringItemViewActivity: testRxjavaToList request RxCachedThreadScheduler-3 tokenfile://b
    12-25 15:56:57.526 23904 23953 D SpringItemViewActivity: testRxjavaToList request RxCachedThreadScheduler-4 tokenfile://c
    12-25 15:56:57.956   932   950 I ActivityManager: Displayed com.pitaya.findviewbyiddemo/.SpringItemViewActivity: +2s27ms
    12-25 15:56:59.523 23904 23904 D SpringItemViewActivity: testRxjavaToList apply map main request done:tokenfile://a
    12-25 15:56:59.530 23904 23904 D SpringItemViewActivity: testRxjavaToList apply map main request done:tokenfile://c
    12-25 15:56:59.536 23904 23904 D SpringItemViewActivity: testRxjavaToList apply map main  null 
    12-25 15:56:59.548 23904 23904 D SpringItemViewActivity: testRxjavaToList subscribe main [request done:tokenfile://a, request done:tokenfile://c,  null ]
    
    
            //线程切换、异常处理onErrorReturnItem 的最小执行单位都是一条流,而不是流内部含有for循环的操作符(比如just、fromIterable等)所发射的单个元素。
    
            // Sample input: three fake file paths to "upload".
            ArrayList<String> stringList = new ArrayList<>();
            stringList.add("file://a");
            stringList.add("file://b");
            stringList.add("file://c");

            // Parallel upload with a per-file fallback, a per-result UI update and one final
            // aggregated list delivered to the subscriber.
            Observable.just("token")
                    .flatMap(token -> Observable.fromIterable(stringList)
                            .map(path -> {
                                Log.d(TAG, "testRxjavaToList merge token " + getCurrentName());
                                return token + path;
                            })
                            // One inner stream per file, each moved to Schedulers.io() on its own.
                            .flatMap(request -> Observable.just(request)
                                    .observeOn(Schedulers.io())
                                    .map(s -> {
                                        Log.d(TAG, "testRxjavaToList request " + getCurrentName() + " " + s);
                                        Thread.sleep(2000); // simulated upload latency
                                        if (s.contains("file://b")) {
                                            // Simulated server failure for one of the files.
                                            throw new NullPointerException("服务器异常");
                                        }
                                        return "request done:" + s;
                                    })
                                    // Fallback: a failing inner stream emits " null " instead of an error.
                                    .onErrorReturnItem(" null "))
                            // Hop to the main thread to show each individual result.
                            .observeOn(AndroidSchedulers.mainThread())
                            .map(status -> {
                                Log.d(TAG, "testRxjavaToList apply map " + getCurrentName() + " " + status);
                                Toast.makeText(getApplicationContext(), status, LENGTH_SHORT).show(); // update icon state
                                return status;
                            })
                            .observeOn(Schedulers.io()))
                    // Collect every per-file result into a single list.
                    .toList()
                    .toObservable()
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(results ->
                            Log.d(TAG, "testRxjavaToList subscribe " + getCurrentName() + " " + results));
    
    
    

    相关文章

      网友评论

        本文标题:Rxjava2流的多线程并发

        本文链接:https://www.haomeiwen.com/subject/ekblgxtx.html