Rx中的线程切换

作者: Hanks | 来源:发表于2015-12-07 01:02 被阅读2965次

    初学者在使用RxJava的过程中,经常搞不清Observable的事件序列和每次操作应该怎样切换线程,切换哪个线程
    首先需要搞懂在RxJava.subscribeOn()observeOn() 之间的区别:

    • .subscribeOn() 用来指定Observable应该操作的调度器(Scheduler)
    • .observeOn() 指定 Observable在一个指定的调度器(Scheduler)上给观察者发送通知
    • 默认情况下, 事件序列操作的线程与调用.subscribe()的线程一致

    没理解?

    英文原文: https://medium.com/@diolor/observe-in-the-correct-thread-1939bb9bb9d2#.nn1m7lrb8</br>
    翻译: hanks
    注: 不是完全翻译,添加了具体例子

    例子

    1.主线程 / .subscribe() 线程

    在 Activity的 onCreate()(主线程) 方法中添加以下代码:

    Observable.just(1,2,3)
      .subscribe();
    

    调用情况如下:

    图片图片

    实验:

    Observable.just(1,2,3)
                .doOnNext(new Action1<Integer>() {
                    @Override public void call(Integer integer) {
                        Log.i("RxThread", "doOnNext:" + integer +", run In :" + Thread.currentThread().getName() );
                    }
                })
                .subscribe(new Action1<Integer>() {
                    @Override public void call(Integer integer) {
                        Log.i("RxThread", "get result:" + integer +", run In :" + Thread.currentThread().getName() );
                    }
                });
    

    输出结果:

    12-06 16:14:39.225 15603-15603/com.hanks.rxsearch I/RxThread: doOnNext:1, run In :main
    12-06 16:14:39.225 15603-15603/com.hanks.rxsearch I/RxThread: get result:1, run In :main
    12-06 16:14:39.225 15603-15603/com.hanks.rxsearch I/RxThread: doOnNext:2, run In :main
    12-06 16:14:39.225 15603-15603/com.hanks.rxsearch I/RxThread: get result:2, run In :main
    12-06 16:14:39.225 15603-15603/com.hanks.rxsearch I/RxThread: doOnNext:3, run In :main
    12-06 16:14:39.225 15603-15603/com.hanks.rxsearch I/RxThread: get result:3, run In :main
    

    2. .subscribeOn()

    即使你在主线程中添加下面的代码,但是整段代码将运行在 .subscribeOn()定义的线程上

    Observable.just(1,2,3)
      .subscribeOn(Schedulers.newThread())
      .subscribe();
    
    图片图片

    实验:

    Observable.just(1,2,3)
               .doOnNext(new Action1<Integer>() {
                   @Override public void call(Integer integer) {
                       Log.i("RxThread", "doOnNext:" + integer +", run In :" + Thread.currentThread().getName() );
                   }
               })
               .subscribeOn(Schedulers.newThread())
               .subscribe(new Action1<Integer>() {
                   @Override public void call(Integer integer) {
                       Log.i("RxThread", "get result:" + integer +", run In :" + Thread.currentThread().getName() );
                   }
               });
    

    输出结果:

    12-06 16:13:17.717 14294-14319/com.hanks.rxsearch I/RxThread: doOnNext:1, run In :RxNewThreadScheduler-1
    12-06 16:13:17.717 14294-14319/com.hanks.rxsearch I/RxThread: get result:1, run In :RxNewThreadScheduler-1
    12-06 16:13:17.717 14294-14319/com.hanks.rxsearch I/RxThread: doOnNext:2, run In :RxNewThreadScheduler-1
    12-06 16:13:17.717 14294-14319/com.hanks.rxsearch I/RxThread: get result:2, run In :RxNewThreadScheduler-1
    12-06 16:13:17.717 14294-14319/com.hanks.rxsearch I/RxThread: doOnNext:3, run In :RxNewThreadScheduler-1
    12-06 16:13:17.717 14294-14319/com.hanks.rxsearch I/RxThread: get result:3, run In :RxNewThreadScheduler-1
    

    3. .observeOn()

    加入在主线程中添加下面的代码,首先 Observable 将在 .subscribe() 的线程上创建,但是 .observeOn()方法被调用之后,代码将运行在指定的线程上:

    Observable.just(1,2,3)
      .observeOn(Schedulers.newThread())
      .subscribe();
    
    图片图片

    实验:

    new Thread() {
               @Override public void run() {
                   Observable.just(1, 2, 3).doOnNext(new Action1<Integer>() {
                       @Override public void call(Integer integer) {
                           Log.i("RxThread", "doOnNext:" + integer + ", run In :" + Thread.currentThread()
                                   .getName());
                       }
                   })
                   .observeOn(AndroidSchedulers.mainThread())
                   .subscribe(new Action1<Integer>() {
                       @Override public void call(Integer integer) {
                           Log.i("RxThread", "get result:" + integer + ", run In :" + Thread.currentThread()
                                   .getName());
                       }
                   });
    
               }
           }.start();
    

    输出结果:

    12-06 16:18:06.493 18584-18606/com.hanks.rxsearch I/RxThread: doOnNext:1, run In :Thread-155
    12-06 16:18:06.493 18584-18606/com.hanks.rxsearch I/RxThread: doOnNext:2, run In :Thread-155
    12-06 16:18:06.493 18584-18606/com.hanks.rxsearch I/RxThread: doOnNext:3, run In :Thread-155
    12-06 16:18:06.521 18584-18584/com.hanks.rxsearch I/RxThread: get result:1, run In :main
    12-06 16:18:06.521 18584-18584/com.hanks.rxsearch I/RxThread: get result:2, run In :main
    12-06 16:18:06.521 18584-18584/com.hanks.rxsearch I/RxThread: get result:3, run In :main
    

    3. Combined logic

    由于操作可以被组合使用,于是有了下面的代码:

    Observable.just(1,2,3)
      .subscribeOn(Schedulers.newThread())
      .observeOn(Schedulers.newThread())
      .subscribe();
    
    图片图片

    实验:

    new Thread() {
         @Override public void run() {
             Observable.just(1, 2, 3).doOnNext(new Action1<Integer>() {
                 @Override public void call(Integer integer) {
                     Log.i("RxThread", "doOnNext:" + integer + ", run In :" + Thread.currentThread()
                             .getName());
                 }
             })
             .subscribeOn(Schedulers.newThread())
             .observeOn(AndroidSchedulers.mainThread())
             .subscribe(new Action1<Integer>() {
                 @Override public void call(Integer integer) {
                     Log.i("RxThread", "get result:" + integer + ", run In :" + Thread.currentThread()
                             .getName());
                 }
             });
         }
     }.start();
    

    输出结果:

    12-06 16:19:53.066 20247-20274/com.hanks.rxsearch I/RxThread: doOnNext:1, run In :RxNewThreadScheduler-1
    12-06 16:19:53.066 20247-20274/com.hanks.rxsearch I/RxThread: doOnNext:2, run In :RxNewThreadScheduler-1
    12-06 16:19:53.066 20247-20274/com.hanks.rxsearch I/RxThread: doOnNext:3, run In :RxNewThreadScheduler-1
    12-06 16:19:53.077 20247-20247/com.hanks.rxsearch I/RxThread: get result:1, run In :main
    12-06 16:19:53.077 20247-20247/com.hanks.rxsearch I/RxThread: get result:2, run In :main
    12-06 16:19:53.077 20247-20247/com.hanks.rxsearch I/RxThread: get result:3, run In :main
    

    Tips / Gotchas:

    1. “UI线程运行异常”

    Observable.just(1,2,3)
      .subscribeOn(Schedulers.newThread())
      .subscribe(/** logic which touches ui **//); //在newThread中调用
    

    obviously.

    2. 逻辑处理放在后台(newThread)

    错误姿势:

    Observable.just(1,2,3)
      .subscribeOn(Schedulers.newThread())
      .observeOn(AndroidSchedulers.mainThread())
      .flatMap(/** logic which doesn't touch ui **//)
      .subscribe();
    

    实验:

    new Thread() {
           @Override public void run() {
               Observable.just("Android-Picasso", "Android-Glide", "Android-Fresco").doOnNext(new Action1<String>() {
                   @Override public void call(String str) {
                       Log.i("RxThread", "doOnNext:" + str + ", run In :" + Thread.currentThread()
                               .getName());
                   }
               })
               .subscribeOn(Schedulers.newThread())
               .observeOn(AndroidSchedulers.mainThread())
               .flatMap(new Func1<String, Observable<String>>() {
                   @Override public Observable<String> call(String str) {
                       Log.i("RxThread", "flatMap:" + str + ", run In :" + Thread.currentThread());
                       return Observable.from(str.split("-") ); // 返回平方
                   }
               })
               .subscribe(new Action1<String>() {
                   @Override public void call(String str) {
                       Log.i("RxThread", "get result:" + str + ", run In :" + Thread.currentThread()
                               .getName());
                   }
               });
           }
       }.start();
    
    

    输出结果:

    12-06 16:43:00.181 8161-8190/com.hanks.rxsearch I/RxThread: doOnNext:Android-Picasso, run In :RxNewThreadScheduler-1
    12-06 16:43:00.181 8161-8190/com.hanks.rxsearch I/RxThread: doOnNext:Android-Glide, run In :RxNewThreadScheduler-1
    12-06 16:43:00.181 8161-8190/com.hanks.rxsearch I/RxThread: doOnNext:Android-Fresco, run In :RxNewThreadScheduler-1
    12-06 16:43:00.243 8161-8161/com.hanks.rxsearch I/RxThread: flatMap:Android-Picasso, run In :Thread[main,5,main]
    12-06 16:43:00.243 8161-8161/com.hanks.rxsearch I/RxThread: get result:Android, run In :main
    12-06 16:43:00.243 8161-8161/com.hanks.rxsearch I/RxThread: get result:Picasso, run In :main
    12-06 16:43:00.243 8161-8161/com.hanks.rxsearch I/RxThread: flatMap:Android-Glide, run In :Thread[main,5,main]
    12-06 16:43:00.243 8161-8161/com.hanks.rxsearch I/RxThread: get result:Android, run In :main
    12-06 16:43:00.243 8161-8161/com.hanks.rxsearch I/RxThread: get result:Glide, run In :main
    12-06 16:43:00.243 8161-8161/com.hanks.rxsearch I/RxThread: flatMap:Android-Fresco, run In :Thread[main,5,main]
    12-06 16:43:00.243 8161-8161/com.hanks.rxsearch I/RxThread: get result:Android, run In :main
    12-06 16:43:00.243 8161-8161/com.hanks.rxsearch I/RxThread: get result:Fresco, run In :main
    

    正确姿势:

    Observable.just(1,2,3)
      .subscribeOn(Schedulers.newThread())
      .flatMap(/** logic which doesn't touch ui **//)
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe();
    

    第二段代码中 flatMap (或者其他逻辑处理)将运行在后台线程, 如果是在Android中,这样做不会阻塞UI,阻塞UI的话有可能导致ANR之类的异常。这跟 AsyncTask中的 doInBackground()类似,在 doInBackground()中做耗时操作

    实验:

    new Thread() {
           @Override public void run() {
               Observable.just("Android-Picasso", "Android-Glide", "Android-Fresco").doOnNext(new Action1<String>() {
                   @Override public void call(String str) {
                       Log.i("RxThread", "doOnNext:" + str + ", run In :" + Thread.currentThread()
                               .getName());
                   }
               })
               .subscribeOn(Schedulers.newThread())
               .flatMap(new Func1<String, Observable<String>>() {
                   @Override public Observable<String> call(String str) {
                       Log.i("RxThread", "flatMap:" + str + ", run In :" + Thread.currentThread());
                       return Observable.from(str.split("-") ); // 返回平方
                   }
               })
               .observeOn(AndroidSchedulers.mainThread())
               .subscribe(new Action1<String>() {
                   @Override public void call(String str) {
                       Log.i("RxThread", "get result:" + str + ", run In :" + Thread.currentThread()
                               .getName());
                   }
               });
           }
       }.start();
    

    输出结果:

    12-06 16:41:27.025 6812-6839/com.hanks.rxsearch I/RxThread: doOnNext:Android-Picasso, run In :RxNewThreadScheduler-1
    12-06 16:41:27.025 6812-6839/com.hanks.rxsearch I/RxThread: flatMap:Android-Picasso, run In :Thread[RxNewThreadScheduler-1,5,main]
    12-06 16:41:27.025 6812-6839/com.hanks.rxsearch I/RxThread: doOnNext:Android-Glide, run In :RxNewThreadScheduler-1
    12-06 16:41:27.025 6812-6839/com.hanks.rxsearch I/RxThread: flatMap:Android-Glide, run In :Thread[RxNewThreadScheduler-1,5,main]
    12-06 16:41:27.025 6812-6839/com.hanks.rxsearch I/RxThread: doOnNext:Android-Fresco, run In :RxNewThreadScheduler-1
    12-06 16:41:27.025 6812-6839/com.hanks.rxsearch I/RxThread: flatMap:Android-Fresco, run In :Thread[RxNewThreadScheduler-1,5,main]
    12-06 16:41:27.043 6812-6812/com.hanks.rxsearch I/RxThread: get result:Android, run In :main
    12-06 16:41:27.043 6812-6812/com.hanks.rxsearch I/RxThread: get result:Picasso, run In :main
    12-06 16:41:27.043 6812-6812/com.hanks.rxsearch I/RxThread: get result:Android, run In :main
    12-06 16:41:27.043 6812-6812/com.hanks.rxsearch I/RxThread: get result:Glide, run In :main
    12-06 16:41:27.043 6812-6812/com.hanks.rxsearch I/RxThread: get result:Android, run In :main
    12-06 16:41:27.043 6812-6812/com.hanks.rxsearch I/RxThread: get result:Fresco, run In :main
    

    3. 最早的 .subscribeOn() 生效

    看下面的代码:

    Observable.just(1,2,3)
      .subscribeOn(thread1)
      .subscribeOn(thread2)
      .subscribe();
    

    Observable 的创建和 .subscribeOn() 的调用都将在 thread1 上面执行,所以没有必要多次调用 .subscribeOn(),因为只有第一次的是有用的。

    实验:

    new Thread() {
          @Override public void run() {
              Observable.just("Android-Picasso", "Android-Glide", "Android-Fresco").doOnNext(new Action1<String>() {
                  @Override public void call(String str) {
                      Log.i("RxThread", "doOnNext:" + str + ", run In :" + Thread.currentThread()
                              .getName());
                  }
              })
              .subscribeOn(Schedulers.newThread())
              .subscribeOn(Schedulers.io())
              .subscribeOn(Schedulers.computation())
              .subscribe(new Action1<String>() {
                  @Override public void call(String str) {
                      Log.i("RxThread", "get result:" + str + ", run In :" + Thread.currentThread()
                              .getName());
                  }
              });
          }
      }.start();
    

    输出结果

    12-06 16:51:17.581 15622-15652/com.hanks.rxsearch I/RxThread: doOnNext:Android-Picasso, run In :RxNewThreadScheduler-1
    12-06 16:51:17.581 15622-15652/com.hanks.rxsearch I/RxThread: get result:Android-Picasso, run In :RxNewThreadScheduler-1
    12-06 16:51:17.581 15622-15652/com.hanks.rxsearch I/RxThread: doOnNext:Android-Glide, run In :RxNewThreadScheduler-1
    12-06 16:51:17.581 15622-15652/com.hanks.rxsearch I/RxThread: get result:Android-Glide, run In :RxNewThreadScheduler-1
    12-06 16:51:17.581 15622-15652/com.hanks.rxsearch I/RxThread: doOnNext:Android-Fresco, run In :RxNewThreadScheduler-1
    12-06 16:51:17.581 15622-15652/com.hanks.rxsearch I/RxThread: get result:Android-Fresco, run In :RxNewThreadScheduler-1
    
    

    实验

    new Thread() {
           @Override public void run() {
               Observable.just("Android-Picasso", "Android-Glide", "Android-Fresco").doOnNext(new Action1<String>() {
                   @Override public void call(String str) {
                       Log.i("RxThread", "doOnNext:" + str + ", run In :" + Thread.currentThread()
                               .getName());
                   }
               })
               .subscribeOn(Schedulers.io())
               .subscribeOn(Schedulers.newThread())
               .subscribeOn(Schedulers.computation())
               .subscribe(new Action1<String>() {
                   @Override public void call(String str) {
                       Log.i("RxThread", "get result:" + str + ", run In :" + Thread.currentThread()
                               .getName());
                   }
               });
           }
       }.start();
    
    

    输出结果

    12-06 16:52:13.378 16424-16454/com.hanks.rxsearch I/RxThread: doOnNext:Android-Picasso, run In :RxCachedThreadScheduler-2
    12-06 16:52:13.379 16424-16454/com.hanks.rxsearch I/RxThread: get result:Android-Picasso, run In :RxCachedThreadScheduler-2
    12-06 16:52:13.379 16424-16454/com.hanks.rxsearch I/RxThread: doOnNext:Android-Glide, run In :RxCachedThreadScheduler-2
    12-06 16:52:13.379 16424-16454/com.hanks.rxsearch I/RxThread: get result:Android-Glide, run In :RxCachedThreadScheduler-2
    12-06 16:52:13.379 16424-16454/com.hanks.rxsearch I/RxThread: doOnNext:Android-Fresco, run In :RxCachedThreadScheduler-2
    12-06 16:52:13.379 16424-16454/com.hanks.rxsearch I/RxThread: get result:Android-Fresco, run In :RxCachedThreadScheduler-2
    

    Android Rxjava Rxandroid

    文章出处 (http://hanks.xyz)

    相关文章

      网友评论

      • mister_eric:截这么多图到底想说明什么
      • buhanzhe:写的是真的好
      • 止念观息:94年的小兄弟好厉害
      • Sunning:我有一个问题不解,麻烦指点一二。
        在Android中可以声明observerOn在Android主线程。
        但是我现在跑的是java项目要用哪个线程去接收呢?
        我现在用的interval和timer,subscribe都接收不到。。
        Hanks:@Sunning 原因是 还没开始发送数据你就给 unsubscribe 了


        try {
        System.out.println(Thread.currentThread());
        Subscriber subscriber = new Subscriber<Long>() {
        @Override
        public void onCompleted() {
        System.out.println("onCompleted");
        }

        @Override
        public void onError(Throwable throwable) {
        System.out.println("throwable");
        }

        @Override
        public void onNext(Long o) {
        System.out.println(Thread.currentThread() + "======3");
        System.out.println("Interval" + o);

        }
        };

        Observable.interval(1000, TimeUnit.MILLISECONDS).subscribe(subscriber);

        Thread.sleep(1000 * 10);
        subscriber.unsubscribe();
        } catch (InterruptedException e) {
        e.printStackTrace();
        }
        Sunning:@Hanks 试了一下,还是不行。
        我在main方法里sleep了10秒

        public static void main(String[] args) {
        try {
        Interval integer = new Interval();
        System.out.println(Thread.currentThread());
        Subscriber subscriber = new Subscriber<Long>() {
        @Override
        public void onCompleted() {
        System.out.println("onCompleted");
        }

        @Override
        public void onError(Throwable throwable) {
        System.out.println("throwable");
        }

        @Override
        public void onNext(Long o) {
        System.out.println(Thread.currentThread() +"======3");
        System.out.println("Interval" + o);

        }
        };
        integer.create().subscribe(subscriber);
        subscriber.unsubscribe();
        Thread.sleep(1000 * 10);
        } catch (InterruptedException e) {
        e.printStackTrace();
        }
        }

        private Observable<Long> create() {
        System.out.println(Thread.currentThread() +"======2");
        return Observable.interval(3, TimeUnit.SECONDS).subscribeOn(Schedulers.newThread());
        }
        Hanks:@Sunning interval 和 timer 都是延时发送一个消息,java 中 main 函数以及执行完毕了,所以 subscribe 是收不到消息了,你可以在main 函数让主线程多 sleep 一段时间
      • dev4mobiles:awesome

      本文标题:Rx中的线程切换

      本文链接:https://www.haomeiwen.com/subject/bghehttx.html