美文网首页
RxJava错误处理与线程并发

RxJava错误处理与线程并发

作者: SMSM | 来源:发表于2017-12-20 11:00 被阅读162次
错误处理、线程并发

错误处理

  • Rxjava为链式调用,事件从源头流到尽头。
    一个源头链条中发生异常或者Observable.error(),都将中断当前流,直接跨过中间所有环节直到找到当前流中onErrorResumeNext()\或者接受者的onError()继续执行。
    just、from发送多个event本质还是在一条流中发射的。所以有异常后,同一条流上未发射的事件将永远不会被发射。

  • 流有父流、子流之分。如何创建多个条流呢?用如下结构just().flatmap(),FlatMap能够新开辟流,所以就存在了嵌套流,但是异常处理还是遵循上述原则。综上所述,异常发生在哪条流,就中断那条流。

RxJava线程并发

https://www.jianshu.com/p/3dcf462dca08
借助just().flatmap(),flatmap能够开辟新流的特性,对每个子流做线程切换,全局搜索如下,看完整代码

Observable.just(Integer.valueOf(s)).observeOn(Schedulers.io()).map(new Function<Object, Integer>() {}

12-20 10:24:06.712 4958-4988/com.pitaya.findviewbyiddemo E/MainActivity: RxCachedThreadScheduler-2flatMap apply
12-20 10:24:06.717 4958-4988/com.pitaya.findviewbyiddemo E/MainActivity: RxCachedThreadScheduler-2flatMap apply
12-20 10:24:06.723 4958-4989/com.pitaya.findviewbyiddemo E/MainActivity: RxCachedThreadScheduler-4
12-20 10:24:06.750 4958-4988/com.pitaya.findviewbyiddemo E/MainActivity: RxCachedThreadScheduler-2flatMap apply
12-20 10:24:06.751 4958-4990/com.pitaya.findviewbyiddemo E/MainActivity: RxCachedThreadScheduler-3
12-20 10:24:06.751 4958-4988/com.pitaya.findviewbyiddemo E/MainActivity: RxCachedThreadScheduler-2flatMap apply
12-20 10:24:06.751 4958-4987/com.pitaya.findviewbyiddemo E/MainActivity: RxCachedThreadScheduler-1
12-20 10:24:06.759 4958-4991/com.pitaya.findviewbyiddemo E/MainActivity: RxCachedThreadScheduler-5
12-20 10:24:06.834 4958-4958/com.pitaya.findviewbyiddemo E/MainActivity: 我牛逼1  1
12-20 10:24:06.834 4958-4958/com.pitaya.findviewbyiddemo E/MainActivity: 1000292348执行成功了1001
12-20 10:24:06.835 4958-4958/com.pitaya.findviewbyiddemo E/MainActivity: 1000292348执行成功了922
12-20 10:24:06.835 4958-4958/com.pitaya.findviewbyiddemo E/MainActivity: 1000292348执行成功了1003
12-20 10:24:06.835 4958-4958/com.pitaya.findviewbyiddemo E/MainActivity: 1000292348执行成功了1002
12-20 10:24:06.836 4958-4958/com.pitaya.findviewbyiddemo E/MainActivity: 1000292348执行成功了33
12-20 10:24:06.836 4958-4958/com.pitaya.findviewbyiddemo E/MainActivity: 在 Consumer中抛出的异常  3
12-20 10:24:06.836 4958-4958/com.pitaya.findviewbyiddemo E/MainActivity: 1000292348执行成功了44
12-20 10:24:06.836 4958-4958/com.pitaya.findviewbyiddemo E/MainActivity: 1000292348执行成功了444
12-20 10:24:06.836 4958-4958/com.pitaya.findviewbyiddemo E/MainActivity: 1000292348执行成功了4444
12-20 10:24:06.844 4958-4958/com.pitaya.findviewbyiddemo E/MainActivity: 我牛逼5 5
12-20 10:24:11.761 4958-4958/com.pitaya.findviewbyiddemo D/EventBus: No subscribers registered for event class com.pitaya.findviewbyiddemo.MainActivity$MessageEvent
12-20 10:24:11.763 4958-4958/com.pitaya.findviewbyiddemo D/EventBus: No subscribers registered for event class com.pitaya.findviewbyiddemo.MainActivity$MessageEvent


 //TODO 错误的处理
       Observable.just("1", "2", "3").map(new Function<String, Integer>() {
           @Override
           public Integer apply(@NonNull String s) throws Exception {
               throw new NullPointerException("我牛逼1 ");
           }
       }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(integer -> {
           Log.e(TAG, MainActivity.this.hashCode() + "执行成功了" + integer);
           mNameActivityUtil.mEventBtn.setText("被修改了");
//            throw new NullPointerException("我牛逼 ");  .onErrorReturnItem(55)
       }, throwable -> {
           Log.e(TAG, throwable.getMessage() + " 1");
       });

       Observable.just("1", "2","22", "3").flatMap(new Function<String, ObservableSource<?>>() {
           @Override
           public ObservableSource<Integer> apply(@NonNull String s) throws Exception {
               Log.e(TAG, Thread.currentThread().getName() + "flatMap apply");
               return Observable.just(Integer.valueOf(s)).observeOn(Schedulers.io()).map(new Function<Object, Integer>() {
                   @Override
                   public Integer apply(@NonNull Object o) throws Exception {
                       Log.e(TAG, Thread.currentThread().getName());

                       if (22 == (Integer) o) {
                           throw new NullPointerException("我牛逼2 ");
                       }

                       return 1000 + (Integer) o;
                   }
               }).onErrorReturnItem(922);
           }
       }).onErrorResumeNext(new Function<Throwable, ObservableSource<? extends Integer>>() {
           @Override
           public ObservableSource<? extends Integer> apply(@NonNull Throwable throwable) throws Exception {
               return Observable.just(999);
           }
       }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(integer -> {
           Log.e(TAG, MainActivity.this.hashCode() + "执行成功了" + integer);
           mNameActivityUtil.mEventBtn.setText("被修改了");
       }, throwable -> {
           Log.e(TAG, throwable.getMessage() + " 2");
       });

       Observable.just("1", "2", "3").map(new Function<String, Integer>() {
           @Override
           public Integer apply(@NonNull String s) throws Exception {
               throw new NullPointerException("我牛逼3 ");
           }
       }).map(new Function<Integer, Integer>() {

           @Override
           public Integer apply(@NonNull Integer integer) throws Exception {
               return 3;
           }
       }).onErrorReturnItem(33).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(integer -> {
           Log.e(TAG, MainActivity.this.hashCode() + "执行成功了" + integer);
           mNameActivityUtil.mEventBtn.setText("被修改了");
           throw new NullPointerException("在 Consumer中抛出的异常 ");
       }, throwable -> {
           Log.e(TAG, throwable.getMessage() + " 3");
       });

       Observable.just("1").map(new Function<String, Integer>() {
           @Override
           public Integer apply(@NonNull String s) throws Exception {
               throw new NullPointerException("我牛逼4 ");
           }
       }).map(new Function<Integer, Integer>() {

           @Override
           public Integer apply(@NonNull Integer integer) throws Exception {
               return 666;
           }
       }).onErrorResumeNext(new Function<Throwable, ObservableSource<? extends Integer>>() {
           @Override
           public ObservableSource<? extends Integer> apply(@NonNull Throwable throwable) throws Exception {
               return Observable.just(44, 444, 4444);
           }
       }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(integer -> {
           Log.e(TAG, MainActivity.this.hashCode() + "执行成功了" + integer);
           mNameActivityUtil.mEventBtn.setText("被修改了");
       }, throwable -> {
           Log.e(TAG, throwable.getMessage() + " 4");
       });

       Observable.just("1", "2", "3").flatMap(new Function<String, ObservableSource<Integer>>() {
           @Override
           public ObservableSource<Integer> apply(@NonNull String s) throws Exception {
               return Observable.error(new NullPointerException("我牛逼5"));
           }
       }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(integer -> {
           Log.e(TAG, MainActivity.this.hashCode() + "执行成功了" + integer);
           mNameActivityUtil.mEventBtn.setText("被修改了");
       }, throwable -> {
           Log.e(TAG, throwable.getMessage() + " 5");
       });




相关文章

  • RxJava错误处理与线程并发

    错误处理 Rxjava为链式调用,事件从源头流到尽头。一个源头链条中发生异常或者Observable.error(...

  • RxJava并发parallel的使用

    概述 本文不描述RxJava是什么,以及如何使用的,重点讨论如何使用RxJava实现并发。即: 区分线程切换和并发...

  • react native - 收藏集 - 掘金

    浅谈 RxJava 中的线程管理 - Android - 掘金上一篇文章研究了一下RxJava中的多线程并发问题,...

  • 技术总结

    JDK内存模型 NIO模型 异步Servet RxJava 并发,同步,线程池,ThreadLocal,Calla...

  • Android 高级面试-3:语言相关

    主要内容:Kotlin, Java, RxJava, 多线程/并发, 集合 1、Java 相关 1.1 缓存相关 ...

  • 探索 Android 多线程 - 1 AsyncTask

    探索 Android 多线程 - 1 AsyncTask 前言 并发(1) -- 线程与线程池并发(2) -- s...

  • iOS 多线程技术总结

    概览 进程与线程的概念 多线程的由来 并行与并发 多线程的实现 串行与并行 线程的几种状态 串行队列与并发队列区别...

  • 第一章

    Java并发编程与高并发解决方案知识点:线程安全;线程封闭;线程调度;同步容器;并发容器;AQS;J.UC 高并发...

  • 并发整理(一)— Java并发底层原理

    现已全部整理完,其他两篇并发整理(二)— Java线程与锁并发整理(三)— 并发集合类与线程池 本篇主要是底层的东...

  • 多线程(二)-线程同步

    一、概念 并行与并发:1个核对1个线程是并行执行,1个核对多个线程是并发执行。 线程安全:并发带来竞争,竞争的结果...

网友评论

      本文标题:RxJava错误处理与线程并发

      本文链接:https://www.haomeiwen.com/subject/xhawwxtx.html