美文网首页
11. Rxjava2 : 线程切换

11. Rxjava2 : 线程切换

作者: 0青衣小褂0 | 来源:发表于2019-02-12 15:32 被阅读58次

    1. RxJava2 : 什么是观察者模式
    2. RxJava2 : 创建操作符(无关时间)
    3. Rxjava2 : 创建操作符(有关时间)
    4. Rxjava2 : 变换操作符
    5. Rxjava2 : 判断操作符
    6. Rxjava2 : 筛选操作符
    7. Rxjava2 : 合并操作符
    8. Rxjava2 : do操作符
    9. Rxjava2 : error处理
    10. Rxjava2 : 重试
    11. Rxjava2 : 线程切换

    未指定线程

    • 如果未进行线程指定,则 Observable 和 Observer 的线程均为当前线程
    new Thread(new Runnable() {
                @Override
                public void run() {
                    Observable.create(new ObservableOnSubscribe<Integer>() {
                        @Override
                        public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                            Thread thread = Thread.currentThread();
                            String name = thread.getName();
                            Log.d(TAG,"Observable的线程:" + name);
                            e.onNext(1);
                        }
                    })
                            .subscribe(new Observer<Integer>() {
                                @Override
                                public void onSubscribe(Disposable d) {
    
                                }
    
                                @Override
                                public void onNext(Integer integer) {
                                    Thread thread = Thread.currentThread();
                                    String name = thread.getName();
                                    Log.d(TAG,"subscribe的线程:" + name);
                                }
    
                                @Override
                                public void onError(Throwable e) {
    
                                }
    
                                @Override
                                public void onComplete() {
    
                                }
                            });
                }
            }).start();
    

    log

    02-12 10:34:06.553 17495-17545/... D/SplashActivity: Observable的线程:Thread-339
    02-12 10:34:06.553 17495-17545/... D/SplashActivity: subscribe的线程:Thread-339
    

    通过 subscribeOn 和 observeOn 指定线程

    subscribeOn:

    1. 只有第一次指定的时候起作用
    2. 用于指定上游的线程(即Observable的线程)
    3. 作用的区间为: [Observable,直至有observeOn切换线程]

    observeOn:

    1. 可以多次指定来切换线程
    2. 作用的区间为: [当前 observeOn 之后,下一次 observeOn 指定之前]

    demo0

    • 只指定了一次 subscribeOn,并且没有通过 observeOn 重新指定线程,因此,均运行在 subscribeOn 所指定的线程当中
     Observable.create((ObservableOnSubscribe<Integer>) e -> {
                Thread thread = Thread.currentThread();
                String name = thread.getName();
                Log.d(TAG, "Observable的线程:" + name);
                e.onNext(1);
            })
                    .subscribeOn(Schedulers.io())
                    .map(integer -> {
                        Thread thread = Thread.currentThread();
                        String name = thread.getName();
                        Log.d(TAG, "期间的所在的线程" + name);
                        return integer + 1;
                    })
                    .subscribe(new Observer<Integer>() {
                        @Override
                        public void onSubscribe(Disposable d) {
    
                        }
    
                        @Override
                        public void onNext(Integer integer) {
                            Thread thread = Thread.currentThread();
                            String name = thread.getName();
                            Log.d(TAG, "subscribe的线程:" + name);
                        }
    
                        @Override
                        public void onError(Throwable e) {
    
                        }
    
                        @Override
                        public void onComplete() {
    
                        }
                    });
    

    log

    02-12 10:45:46.033 19142-19191/... D/SplashActivity: Observable的线程:RxCachedThreadScheduler-2
    02-12 10:45:46.033 19142-19191/... D/SplashActivity: 期间的所在的线程RxCachedThreadScheduler-2
    02-12 10:45:46.033 19142-19191/... D/SplashActivity: subscribe的线程:RxCachedThreadScheduler-2
    

    demo1

    • subscribeOn多次指定并没有效果
    Observable.create((ObservableOnSubscribe<Integer>) e -> {
                Thread thread = Thread.currentThread();
                String name = thread.getName();
                Log.d(TAG, "Observable的线程:" + name);
                e.onNext(1);
            })
                    .subscribeOn(Schedulers.io())
                    .map(integer -> {
                        Thread thread = Thread.currentThread();
                        String name = thread.getName();
                        Log.d(TAG, "期间的所在的线程" + name);
                        return integer + 1;
                    })
                    .subscribeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Observer<Integer>() {
                        @Override
                        public void onSubscribe(Disposable d) {
    
                        }
    
                        @Override
                        public void onNext(Integer integer) {
                            Thread thread = Thread.currentThread();
                            String name = thread.getName();
                            Log.d(TAG, "subscribe的线程:" + name);
                        }
    
                        @Override
                        public void onError(Throwable e) {
    
                        }
    
                        @Override
                        public void onComplete() {
    
                        }
                    });
    

    log

    02-12 10:55:24.323 19936-19988/... D/SplashActivity: Observable的线程:RxCachedThreadScheduler-2
    02-12 10:55:24.333 19936-19988/... D/SplashActivity: 期间的所在的线程RxCachedThreadScheduler-2
    02-12 10:55:24.333 19936-19988/... D/SplashActivity: subscribe的线程:RxCachedThreadScheduler-2
    

    demo2

    • subscribeOn 指定了 io 线程,observeOn 指定了主线程,observeOn 的作用区间在使用他之后
    Observable.create((ObservableOnSubscribe<Integer>) e -> {
                Thread thread = Thread.currentThread();
                String name = thread.getName();
                Log.d(TAG, "Observable的线程:" + name);
                e.onNext(1);
            })
                    .subscribeOn(Schedulers.io())
                    .map(integer -> {
                        Thread thread = Thread.currentThread();
                        String name = thread.getName();
                        Log.d(TAG, "期间的所在的线程" + name);
                        return integer + 1;
                    })
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Observer<Integer>() {
                        @Override
                        public void onSubscribe(Disposable d) {
    
                        }
    
                        @Override
                        public void onNext(Integer integer) {
                            Thread thread = Thread.currentThread();
                            String name = thread.getName();
                            Log.d(TAG, "subscribe的线程:" + name);
                        }
    
                        @Override
                        public void onError(Throwable e) {
    
                        }
    
                        @Override
                        public void onComplete() {
    
                        }
                    });
    
    

    log

    02-12 10:52:05.973 19615-19665/... D/SplashActivity: Observable的线程:RxCachedThreadScheduler-2
    02-12 10:52:05.983 19615-19665/... D/SplashActivity: 期间的所在的线程RxCachedThreadScheduler-2
    02-12 10:52:06.003 19615-19615/... D/SplashActivity: subscribe的线程:main
    

    demo3

    • 多次采用 observeOn 来切换线程
    Observable.create((ObservableOnSubscribe<Integer>) e -> {
                Thread thread = Thread.currentThread();
                String name = thread.getName();
                Log.d(TAG, "Observable的线程:" + name);
                e.onNext(1);
            })
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .map(integer -> {
                        Thread thread = Thread.currentThread();
                        String name = thread.getName();
                        Log.d(TAG, "期间的所在的线程" + name);
                        return integer + 1;
                    })
                    .observeOn(Schedulers.newThread())
                    .subscribe(new Observer<Integer>() {
                        @Override
                        public void onSubscribe(Disposable d) {
    
                        }
    
                        @Override
                        public void onNext(Integer integer) {
                            Thread thread = Thread.currentThread();
                            String name = thread.getName();
                            Log.d(TAG, "subscribe的线程:" + name);
                        }
    
                        @Override
                        public void onError(Throwable e) {
    
                        }
    
                        @Override
                        public void onComplete() {
    
                        }
                    });
    

    log

    02-12 10:58:20.293 20257-20306/... D/SplashActivity: Observable的线程:RxCachedThreadScheduler-2
    02-12 10:58:20.313 20257-20257/... D/SplashActivity: 期间的所在的线程main
    02-12 10:58:20.313 20257-20308/... D/SplashActivity: subscribe的线程:RxNewThreadScheduler-1
    

    相关文章

      网友评论

          本文标题:11. Rxjava2 : 线程切换

          本文链接:https://www.haomeiwen.com/subject/gzrkeqtx.html