RxJava2.x-线程调度

作者: 河马过河 | 来源:发表于2018-08-22 17:43 被阅读56次

    一、线程调度

     public Observable<Integer> getRxJavaCreateExampleData() {
            return Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    LogUtils.debug(TAG, "getRxJavaCreateExampleData---:" + Thread.currentThread().getName() + "--:" + 1);
                    emitter.onNext(1);
    
                    LogUtils.debug(TAG, "getRxJavaCreateExampleData---:" + Thread.currentThread().getName() + "--:" + 2);
                    emitter.onNext(2);
    //                Thread.sleep(5000);
                    LogUtils.debug(TAG, "getRxJavaCreateExampleData---:" + Thread.currentThread().getName() + "--:" + 3);
                    emitter.onNext(3);
                    emitter.onComplete();
                    LogUtils.debug(TAG, "getRxJavaCreateExampleData---:" + Thread.currentThread().getName() + "--:" + 4);
                    emitter.onNext(4);
    
                }
            });
        }
    
      public void rxJavaSchedulersExample() {
            Disposable disposable = model.getRxJavaCreateExampleData()
                    .subscribeOn(Schedulers.io())
                    .subscribeOn(Schedulers.newThread())
                    .observeOn(AndroidSchedulers.mainThread())
                    .doOnNext(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) throws Exception {
                            LogUtils.error(TAG, "rxJavaSchedulersExample--:" + Thread.currentThread().getName() + "-doOnNext-:" + integer);
                        }
                    })
                    .observeOn(Schedulers.newThread())
                    .filter(new Predicate<Integer>() {
                        @Override
                        public boolean test(Integer integer) throws Exception {
                            LogUtils.error(TAG, "rxJavaSchedulersExample--:" + Thread.currentThread().getName() + "-filter-:" + integer);
                            return integer > 2;
                        }
                    }).observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) throws Exception {
                            LogUtils.error(TAG, "rxJavaSchedulersExample--:" + Thread.currentThread().getName() + "-Consumer-:" + integer);
                        }
                    });
            compositeDisposable.add(disposable);
        }
    

    日志

    08-22 11:31:40.252 26676-26852/com.example.zhang D/MainModel: getRxJavaCreateExampleData---:RxCachedThreadScheduler-1--:1
    08-22 11:31:40.253 26676-26852/com.example.zhang D/MainModel: getRxJavaCreateExampleData---:RxCachedThreadScheduler-1--:2
    08-22 11:31:40.253 26676-26676/com.example.zhang E/MainPresenter: rxJavaSchedulersExample--:main-doOnNext-:1
    08-22 11:31:40.254 26676-26852/com.example.zhang D/MainModel: getRxJavaCreateExampleData---:RxCachedThreadScheduler-1--:3
        getRxJavaCreateExampleData---:RxCachedThreadScheduler-1--:4
    08-22 11:31:40.254 26676-26676/com.example.zhang E/MainPresenter: rxJavaSchedulersExample--:main-doOnNext-:2
        rxJavaSchedulersExample--:main-doOnNext-:3
    08-22 11:31:40.255 26676-26853/com.example.zhang E/MainPresenter: rxJavaSchedulersExample--:RxNewThreadScheduler-2-filter-:1
        rxJavaSchedulersExample--:RxNewThreadScheduler-2-filter-:2
        rxJavaSchedulersExample--:RxNewThreadScheduler-2-filter-:3
    08-22 11:31:40.255 26676-26676/com.example.zhang E/MainPresenter: rxJavaSchedulersExample--:main-Consumer-:3
    

    总结

    1、Schedules线程:① io②newThread③computation ④single⑤trampoline
    2、AndroidSchedules线程:AndroidSchedulers.mainThread()
    3、subscribeOn 控制的是上游被观察者 ,调用多次,只有第一次起作用;observeOn控制的是下游观察者,可以多次调用

    备注

    Schedulers.io() 代表io操作的线程, 通常用于网络,读写文件等io密集型的操作
    Schedulers.computation() 代表CPU计算密集型的操作, 例如需要大量计算的操作
    Schedulers.newThread() 代表一个常规的新线程
    AndroidSchedulers.mainThread() 代表Android的主线程

    河马过河微信公众号.jpg

    相关文章

      网友评论

        本文标题:RxJava2.x-线程调度

        本文链接:https://www.haomeiwen.com/subject/kqveiftx.html