Rx中的线程切换

作者: Hanks | 来源:发表于2015-12-07 01:02 被阅读2965次

初学者在使用RxJava的过程中,经常搞不清Observable的事件序列和每次操作应该怎样切换线程,切换哪个线程
首先需要搞懂在RxJava.subscribeOn()observeOn() 之间的区别:

  • .subscribeOn() 用来指定Observable应该操作的调度器(Scheduler)
  • .observeOn() 指定 Observable在一个指定的调度器(Scheduler)上给观察者发送通知
  • 默认情况下, 事件序列操作的线程与调用.subscribe()的线程一致

没理解?

英文原文: https://medium.com/@diolor/observe-in-the-correct-thread-1939bb9bb9d2#.nn1m7lrb8</br>
翻译: hanks
注: 不是完全翻译,添加了具体例子

例子

1.主线程 / .subscribe() 线程

在 Activity的 onCreate()(主线程) 方法中添加以下代码:

Observable.just(1,2,3)
  .subscribe();

调用情况如下:

图片图片

实验:

Observable.just(1,2,3)
            .doOnNext(new Action1<Integer>() {
                @Override public void call(Integer integer) {
                    Log.i("RxThread", "doOnNext:" + integer +", run In :" + Thread.currentThread().getName() );
                }
            })
            .subscribe(new Action1<Integer>() {
                @Override public void call(Integer integer) {
                    Log.i("RxThread", "get result:" + integer +", run In :" + Thread.currentThread().getName() );
                }
            });

输出结果:

12-06 16:14:39.225 15603-15603/com.hanks.rxsearch I/RxThread: doOnNext:1, run In :main
12-06 16:14:39.225 15603-15603/com.hanks.rxsearch I/RxThread: get result:1, run In :main
12-06 16:14:39.225 15603-15603/com.hanks.rxsearch I/RxThread: doOnNext:2, run In :main
12-06 16:14:39.225 15603-15603/com.hanks.rxsearch I/RxThread: get result:2, run In :main
12-06 16:14:39.225 15603-15603/com.hanks.rxsearch I/RxThread: doOnNext:3, run In :main
12-06 16:14:39.225 15603-15603/com.hanks.rxsearch I/RxThread: get result:3, run In :main

2. .subscribeOn()

即使你在主线程中添加下面的代码,但是整段代码将运行在 .subscribeOn()定义的线程上

Observable.just(1,2,3)
  .subscribeOn(Schedulers.newThread())
  .subscribe();
图片图片

实验:

Observable.just(1,2,3)
           .doOnNext(new Action1<Integer>() {
               @Override public void call(Integer integer) {
                   Log.i("RxThread", "doOnNext:" + integer +", run In :" + Thread.currentThread().getName() );
               }
           })
           .subscribeOn(Schedulers.newThread())
           .subscribe(new Action1<Integer>() {
               @Override public void call(Integer integer) {
                   Log.i("RxThread", "get result:" + integer +", run In :" + Thread.currentThread().getName() );
               }
           });

输出结果:

12-06 16:13:17.717 14294-14319/com.hanks.rxsearch I/RxThread: doOnNext:1, run In :RxNewThreadScheduler-1
12-06 16:13:17.717 14294-14319/com.hanks.rxsearch I/RxThread: get result:1, run In :RxNewThreadScheduler-1
12-06 16:13:17.717 14294-14319/com.hanks.rxsearch I/RxThread: doOnNext:2, run In :RxNewThreadScheduler-1
12-06 16:13:17.717 14294-14319/com.hanks.rxsearch I/RxThread: get result:2, run In :RxNewThreadScheduler-1
12-06 16:13:17.717 14294-14319/com.hanks.rxsearch I/RxThread: doOnNext:3, run In :RxNewThreadScheduler-1
12-06 16:13:17.717 14294-14319/com.hanks.rxsearch I/RxThread: get result:3, run In :RxNewThreadScheduler-1

3. .observeOn()

加入在主线程中添加下面的代码,首先 Observable 将在 .subscribe() 的线程上创建,但是 .observeOn()方法被调用之后,代码将运行在指定的线程上:

Observable.just(1,2,3)
  .observeOn(Schedulers.newThread())
  .subscribe();
图片图片

实验:

new Thread() {
           @Override public void run() {
               Observable.just(1, 2, 3).doOnNext(new Action1<Integer>() {
                   @Override public void call(Integer integer) {
                       Log.i("RxThread", "doOnNext:" + integer + ", run In :" + Thread.currentThread()
                               .getName());
                   }
               })
               .observeOn(AndroidSchedulers.mainThread())
               .subscribe(new Action1<Integer>() {
                   @Override public void call(Integer integer) {
                       Log.i("RxThread", "get result:" + integer + ", run In :" + Thread.currentThread()
                               .getName());
                   }
               });

           }
       }.start();

输出结果:

12-06 16:18:06.493 18584-18606/com.hanks.rxsearch I/RxThread: doOnNext:1, run In :Thread-155
12-06 16:18:06.493 18584-18606/com.hanks.rxsearch I/RxThread: doOnNext:2, run In :Thread-155
12-06 16:18:06.493 18584-18606/com.hanks.rxsearch I/RxThread: doOnNext:3, run In :Thread-155
12-06 16:18:06.521 18584-18584/com.hanks.rxsearch I/RxThread: get result:1, run In :main
12-06 16:18:06.521 18584-18584/com.hanks.rxsearch I/RxThread: get result:2, run In :main
12-06 16:18:06.521 18584-18584/com.hanks.rxsearch I/RxThread: get result:3, run In :main

3. Combined logic

由于操作可以被组合使用,于是有了下面的代码:

Observable.just(1,2,3)
  .subscribeOn(Schedulers.newThread())
  .observeOn(Schedulers.newThread())
  .subscribe();
图片图片

实验:

new Thread() {
     @Override public void run() {
         Observable.just(1, 2, 3).doOnNext(new Action1<Integer>() {
             @Override public void call(Integer integer) {
                 Log.i("RxThread", "doOnNext:" + integer + ", run In :" + Thread.currentThread()
                         .getName());
             }
         })
         .subscribeOn(Schedulers.newThread())
         .observeOn(AndroidSchedulers.mainThread())
         .subscribe(new Action1<Integer>() {
             @Override public void call(Integer integer) {
                 Log.i("RxThread", "get result:" + integer + ", run In :" + Thread.currentThread()
                         .getName());
             }
         });
     }
 }.start();

输出结果:

12-06 16:19:53.066 20247-20274/com.hanks.rxsearch I/RxThread: doOnNext:1, run In :RxNewThreadScheduler-1
12-06 16:19:53.066 20247-20274/com.hanks.rxsearch I/RxThread: doOnNext:2, run In :RxNewThreadScheduler-1
12-06 16:19:53.066 20247-20274/com.hanks.rxsearch I/RxThread: doOnNext:3, run In :RxNewThreadScheduler-1
12-06 16:19:53.077 20247-20247/com.hanks.rxsearch I/RxThread: get result:1, run In :main
12-06 16:19:53.077 20247-20247/com.hanks.rxsearch I/RxThread: get result:2, run In :main
12-06 16:19:53.077 20247-20247/com.hanks.rxsearch I/RxThread: get result:3, run In :main

Tips / Gotchas:

1. “UI线程运行异常”

Observable.just(1,2,3)
  .subscribeOn(Schedulers.newThread())
  .subscribe(/** logic which touches ui **//); //在newThread中调用

obviously.

2. 逻辑处理放在后台(newThread)

错误姿势:

Observable.just(1,2,3)
  .subscribeOn(Schedulers.newThread())
  .observeOn(AndroidSchedulers.mainThread())
  .flatMap(/** logic which doesn't touch ui **//)
  .subscribe();

实验:

new Thread() {
       @Override public void run() {
           Observable.just("Android-Picasso", "Android-Glide", "Android-Fresco").doOnNext(new Action1<String>() {
               @Override public void call(String str) {
                   Log.i("RxThread", "doOnNext:" + str + ", run In :" + Thread.currentThread()
                           .getName());
               }
           })
           .subscribeOn(Schedulers.newThread())
           .observeOn(AndroidSchedulers.mainThread())
           .flatMap(new Func1<String, Observable<String>>() {
               @Override public Observable<String> call(String str) {
                   Log.i("RxThread", "flatMap:" + str + ", run In :" + Thread.currentThread());
                   return Observable.from(str.split("-") ); // 返回平方
               }
           })
           .subscribe(new Action1<String>() {
               @Override public void call(String str) {
                   Log.i("RxThread", "get result:" + str + ", run In :" + Thread.currentThread()
                           .getName());
               }
           });
       }
   }.start();

输出结果:

12-06 16:43:00.181 8161-8190/com.hanks.rxsearch I/RxThread: doOnNext:Android-Picasso, run In :RxNewThreadScheduler-1
12-06 16:43:00.181 8161-8190/com.hanks.rxsearch I/RxThread: doOnNext:Android-Glide, run In :RxNewThreadScheduler-1
12-06 16:43:00.181 8161-8190/com.hanks.rxsearch I/RxThread: doOnNext:Android-Fresco, run In :RxNewThreadScheduler-1
12-06 16:43:00.243 8161-8161/com.hanks.rxsearch I/RxThread: flatMap:Android-Picasso, run In :Thread[main,5,main]
12-06 16:43:00.243 8161-8161/com.hanks.rxsearch I/RxThread: get result:Android, run In :main
12-06 16:43:00.243 8161-8161/com.hanks.rxsearch I/RxThread: get result:Picasso, run In :main
12-06 16:43:00.243 8161-8161/com.hanks.rxsearch I/RxThread: flatMap:Android-Glide, run In :Thread[main,5,main]
12-06 16:43:00.243 8161-8161/com.hanks.rxsearch I/RxThread: get result:Android, run In :main
12-06 16:43:00.243 8161-8161/com.hanks.rxsearch I/RxThread: get result:Glide, run In :main
12-06 16:43:00.243 8161-8161/com.hanks.rxsearch I/RxThread: flatMap:Android-Fresco, run In :Thread[main,5,main]
12-06 16:43:00.243 8161-8161/com.hanks.rxsearch I/RxThread: get result:Android, run In :main
12-06 16:43:00.243 8161-8161/com.hanks.rxsearch I/RxThread: get result:Fresco, run In :main

正确姿势:

Observable.just(1,2,3)
  .subscribeOn(Schedulers.newThread())
  .flatMap(/** logic which doesn't touch ui **//)
  .observeOn(AndroidSchedulers.mainThread())
  .subscribe();

第二段代码中 flatMap (或者其他逻辑处理)将运行在后台线程, 如果是在Android中,这样做不会阻塞UI,阻塞UI的话有可能导致ANR之类的异常。这跟 AsyncTask中的 doInBackground()类似,在 doInBackground()中做耗时操作

实验:

new Thread() {
       @Override public void run() {
           Observable.just("Android-Picasso", "Android-Glide", "Android-Fresco").doOnNext(new Action1<String>() {
               @Override public void call(String str) {
                   Log.i("RxThread", "doOnNext:" + str + ", run In :" + Thread.currentThread()
                           .getName());
               }
           })
           .subscribeOn(Schedulers.newThread())
           .flatMap(new Func1<String, Observable<String>>() {
               @Override public Observable<String> call(String str) {
                   Log.i("RxThread", "flatMap:" + str + ", run In :" + Thread.currentThread());
                   return Observable.from(str.split("-") ); // 返回平方
               }
           })
           .observeOn(AndroidSchedulers.mainThread())
           .subscribe(new Action1<String>() {
               @Override public void call(String str) {
                   Log.i("RxThread", "get result:" + str + ", run In :" + Thread.currentThread()
                           .getName());
               }
           });
       }
   }.start();

输出结果:

12-06 16:41:27.025 6812-6839/com.hanks.rxsearch I/RxThread: doOnNext:Android-Picasso, run In :RxNewThreadScheduler-1
12-06 16:41:27.025 6812-6839/com.hanks.rxsearch I/RxThread: flatMap:Android-Picasso, run In :Thread[RxNewThreadScheduler-1,5,main]
12-06 16:41:27.025 6812-6839/com.hanks.rxsearch I/RxThread: doOnNext:Android-Glide, run In :RxNewThreadScheduler-1
12-06 16:41:27.025 6812-6839/com.hanks.rxsearch I/RxThread: flatMap:Android-Glide, run In :Thread[RxNewThreadScheduler-1,5,main]
12-06 16:41:27.025 6812-6839/com.hanks.rxsearch I/RxThread: doOnNext:Android-Fresco, run In :RxNewThreadScheduler-1
12-06 16:41:27.025 6812-6839/com.hanks.rxsearch I/RxThread: flatMap:Android-Fresco, run In :Thread[RxNewThreadScheduler-1,5,main]
12-06 16:41:27.043 6812-6812/com.hanks.rxsearch I/RxThread: get result:Android, run In :main
12-06 16:41:27.043 6812-6812/com.hanks.rxsearch I/RxThread: get result:Picasso, run In :main
12-06 16:41:27.043 6812-6812/com.hanks.rxsearch I/RxThread: get result:Android, run In :main
12-06 16:41:27.043 6812-6812/com.hanks.rxsearch I/RxThread: get result:Glide, run In :main
12-06 16:41:27.043 6812-6812/com.hanks.rxsearch I/RxThread: get result:Android, run In :main
12-06 16:41:27.043 6812-6812/com.hanks.rxsearch I/RxThread: get result:Fresco, run In :main

3. 最早的 .subscribeOn() 生效

看下面的代码:

Observable.just(1,2,3)
  .subscribeOn(thread1)
  .subscribeOn(thread2)
  .subscribe();

Observable 的创建和 .subscribeOn() 的调用都将在 thread1 上面执行,所以没有必要多次调用 .subscribeOn(),因为只有第一次的是有用的。

实验:

new Thread() {
      @Override public void run() {
          Observable.just("Android-Picasso", "Android-Glide", "Android-Fresco").doOnNext(new Action1<String>() {
              @Override public void call(String str) {
                  Log.i("RxThread", "doOnNext:" + str + ", run In :" + Thread.currentThread()
                          .getName());
              }
          })
          .subscribeOn(Schedulers.newThread())
          .subscribeOn(Schedulers.io())
          .subscribeOn(Schedulers.computation())
          .subscribe(new Action1<String>() {
              @Override public void call(String str) {
                  Log.i("RxThread", "get result:" + str + ", run In :" + Thread.currentThread()
                          .getName());
              }
          });
      }
  }.start();

输出结果

12-06 16:51:17.581 15622-15652/com.hanks.rxsearch I/RxThread: doOnNext:Android-Picasso, run In :RxNewThreadScheduler-1
12-06 16:51:17.581 15622-15652/com.hanks.rxsearch I/RxThread: get result:Android-Picasso, run In :RxNewThreadScheduler-1
12-06 16:51:17.581 15622-15652/com.hanks.rxsearch I/RxThread: doOnNext:Android-Glide, run In :RxNewThreadScheduler-1
12-06 16:51:17.581 15622-15652/com.hanks.rxsearch I/RxThread: get result:Android-Glide, run In :RxNewThreadScheduler-1
12-06 16:51:17.581 15622-15652/com.hanks.rxsearch I/RxThread: doOnNext:Android-Fresco, run In :RxNewThreadScheduler-1
12-06 16:51:17.581 15622-15652/com.hanks.rxsearch I/RxThread: get result:Android-Fresco, run In :RxNewThreadScheduler-1

实验

new Thread() {
       @Override public void run() {
           Observable.just("Android-Picasso", "Android-Glide", "Android-Fresco").doOnNext(new Action1<String>() {
               @Override public void call(String str) {
                   Log.i("RxThread", "doOnNext:" + str + ", run In :" + Thread.currentThread()
                           .getName());
               }
           })
           .subscribeOn(Schedulers.io())
           .subscribeOn(Schedulers.newThread())
           .subscribeOn(Schedulers.computation())
           .subscribe(new Action1<String>() {
               @Override public void call(String str) {
                   Log.i("RxThread", "get result:" + str + ", run In :" + Thread.currentThread()
                           .getName());
               }
           });
       }
   }.start();

输出结果

12-06 16:52:13.378 16424-16454/com.hanks.rxsearch I/RxThread: doOnNext:Android-Picasso, run In :RxCachedThreadScheduler-2
12-06 16:52:13.379 16424-16454/com.hanks.rxsearch I/RxThread: get result:Android-Picasso, run In :RxCachedThreadScheduler-2
12-06 16:52:13.379 16424-16454/com.hanks.rxsearch I/RxThread: doOnNext:Android-Glide, run In :RxCachedThreadScheduler-2
12-06 16:52:13.379 16424-16454/com.hanks.rxsearch I/RxThread: get result:Android-Glide, run In :RxCachedThreadScheduler-2
12-06 16:52:13.379 16424-16454/com.hanks.rxsearch I/RxThread: doOnNext:Android-Fresco, run In :RxCachedThreadScheduler-2
12-06 16:52:13.379 16424-16454/com.hanks.rxsearch I/RxThread: get result:Android-Fresco, run In :RxCachedThreadScheduler-2

Android Rxjava Rxandroid

文章出处 (http://hanks.xyz)

相关文章

  • Rx中的线程切换

    初学者在使用RxJava的过程中,经常搞不清Observable的事件序列和每次操作应该怎样切换线程,切换哪个线程...

  • Rx线程切换

    线程切换 Rx数据发射器和观察者在同一个线程,未发生线程切换,串行工作,Rx是一个异步框架,主要功能是提供异步操作...

  • Android-Retrofit2+Rxjava2之网络通用请求

    一直也是用MVP模式,也就结合Rx做网络请求,Rx子线程和UI线程的切换是相当的方便(小白后面准备看下相关的切换的...

  • RxJava——线程控制切换/调度

    本篇代码见:RxJava_Demo_Translater 这里开始学习RxJava线程控制(切换/调度)。一、Rx...

  • 实现简单的 RxKotlin (中)

    线程切换的操作在 Rx 里面非常常用,主要有 subscribeOn observeOn 他们都需要一个 Sche...

  • RxJava的常见使用场景

    demo地址 rx的优势: 线程切换,不需要像handler那样 请求与结果在不同地方 链式编程, 复杂的逻辑形成...

  • 12.RxSwift 调度者(上)

    创建子线程 - self.actionBtn.rx.tap .subscribe 猜想,推测:在子线程中进行 线程...

  • 迷无踪,山涧偶遇小和尚

    RxJava 日常,今天填这个坑。 线程控制 对上下游的线程进行控制 平时写的代码都运行在主线程中,当我们使用Rx...

  • RxJava中的指定线程和执行流程

    Scheduler 的 API (一) 在RxJava 中,Scheduler ——调度器,相当于线程控制器,Rx...

  • 进程切换与线程切换的区别

    注意这个题目问的是进程切换与线程切换的区别,不是进程与线程的区别。当然这里的线程指的是同一个进程中的线程。 这个问...

网友评论

  • mister_eric:截这么多图到底想说明什么
  • buhanzhe:写的是真的好
  • 止念观息:94年的小兄弟好厉害
  • Sunning:我有一个问题不解,麻烦指点一二。
    在Android中可以声明observerOn在Android主线程。
    但是我现在跑的是java项目要用哪个线程去接收呢?
    我现在用的interval和timer,subscribe都接收不到。。
    Hanks:@Sunning 原因是 还没开始发送数据你就给 unsubscribe 了


    try {
    System.out.println(Thread.currentThread());
    Subscriber subscriber = new Subscriber<Long>() {
    @Override
    public void onCompleted() {
    System.out.println("onCompleted");
    }

    @Override
    public void onError(Throwable throwable) {
    System.out.println("throwable");
    }

    @Override
    public void onNext(Long o) {
    System.out.println(Thread.currentThread() + "======3");
    System.out.println("Interval" + o);

    }
    };

    Observable.interval(1000, TimeUnit.MILLISECONDS).subscribe(subscriber);

    Thread.sleep(1000 * 10);
    subscriber.unsubscribe();
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    Sunning:@Hanks 试了一下,还是不行。
    我在main方法里sleep了10秒

    public static void main(String[] args) {
    try {
    Interval integer = new Interval();
    System.out.println(Thread.currentThread());
    Subscriber subscriber = new Subscriber<Long>() {
    @Override
    public void onCompleted() {
    System.out.println("onCompleted");
    }

    @Override
    public void onError(Throwable throwable) {
    System.out.println("throwable");
    }

    @Override
    public void onNext(Long o) {
    System.out.println(Thread.currentThread() +"======3");
    System.out.println("Interval" + o);

    }
    };
    integer.create().subscribe(subscriber);
    subscriber.unsubscribe();
    Thread.sleep(1000 * 10);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }

    private Observable<Long> create() {
    System.out.println(Thread.currentThread() +"======2");
    return Observable.interval(3, TimeUnit.SECONDS).subscribeOn(Schedulers.newThread());
    }
    Hanks:@Sunning interval 和 timer 都是延时发送一个消息,java 中 main 函数以及执行完毕了,所以 subscribe 是收不到消息了,你可以在main 函数让主线程多 sleep 一段时间
  • dev4mobiles:awesome

本文标题:Rx中的线程切换

本文链接:https://www.haomeiwen.com/subject/bghehttx.html