RxJava2.x-线程调度

作者: 河马过河 | 来源:发表于2018-08-22 17:43 被阅读56次

一、线程调度

 public Observable<Integer> getRxJavaCreateExampleData() {
        return Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                LogUtils.debug(TAG, "getRxJavaCreateExampleData---:" + Thread.currentThread().getName() + "--:" + 1);
                emitter.onNext(1);

                LogUtils.debug(TAG, "getRxJavaCreateExampleData---:" + Thread.currentThread().getName() + "--:" + 2);
                emitter.onNext(2);
//                Thread.sleep(5000);
                LogUtils.debug(TAG, "getRxJavaCreateExampleData---:" + Thread.currentThread().getName() + "--:" + 3);
                emitter.onNext(3);
                emitter.onComplete();
                LogUtils.debug(TAG, "getRxJavaCreateExampleData---:" + Thread.currentThread().getName() + "--:" + 4);
                emitter.onNext(4);

            }
        });
    }

  public void rxJavaSchedulersExample() {
        Disposable disposable = model.getRxJavaCreateExampleData()
                .subscribeOn(Schedulers.io())
                .subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .doOnNext(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        LogUtils.error(TAG, "rxJavaSchedulersExample--:" + Thread.currentThread().getName() + "-doOnNext-:" + integer);
                    }
                })
                .observeOn(Schedulers.newThread())
                .filter(new Predicate<Integer>() {
                    @Override
                    public boolean test(Integer integer) throws Exception {
                        LogUtils.error(TAG, "rxJavaSchedulersExample--:" + Thread.currentThread().getName() + "-filter-:" + integer);
                        return integer > 2;
                    }
                }).observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        LogUtils.error(TAG, "rxJavaSchedulersExample--:" + Thread.currentThread().getName() + "-Consumer-:" + integer);
                    }
                });
        compositeDisposable.add(disposable);
    }

日志

08-22 11:31:40.252 26676-26852/com.example.zhang D/MainModel: getRxJavaCreateExampleData---:RxCachedThreadScheduler-1--:1
08-22 11:31:40.253 26676-26852/com.example.zhang D/MainModel: getRxJavaCreateExampleData---:RxCachedThreadScheduler-1--:2
08-22 11:31:40.253 26676-26676/com.example.zhang E/MainPresenter: rxJavaSchedulersExample--:main-doOnNext-:1
08-22 11:31:40.254 26676-26852/com.example.zhang D/MainModel: getRxJavaCreateExampleData---:RxCachedThreadScheduler-1--:3
    getRxJavaCreateExampleData---:RxCachedThreadScheduler-1--:4
08-22 11:31:40.254 26676-26676/com.example.zhang E/MainPresenter: rxJavaSchedulersExample--:main-doOnNext-:2
    rxJavaSchedulersExample--:main-doOnNext-:3
08-22 11:31:40.255 26676-26853/com.example.zhang E/MainPresenter: rxJavaSchedulersExample--:RxNewThreadScheduler-2-filter-:1
    rxJavaSchedulersExample--:RxNewThreadScheduler-2-filter-:2
    rxJavaSchedulersExample--:RxNewThreadScheduler-2-filter-:3
08-22 11:31:40.255 26676-26676/com.example.zhang E/MainPresenter: rxJavaSchedulersExample--:main-Consumer-:3

总结

1、Schedules线程:① io②newThread③computation ④single⑤trampoline
2、AndroidSchedules线程:AndroidSchedulers.mainThread()
3、subscribeOn 控制的是上游被观察者 ,调用多次,只有第一次起作用;observeOn控制的是下游观察者,可以多次调用

备注

Schedulers.io() 代表io操作的线程, 通常用于网络,读写文件等io密集型的操作
Schedulers.computation() 代表CPU计算密集型的操作, 例如需要大量计算的操作
Schedulers.newThread() 代表一个常规的新线程
AndroidSchedulers.mainThread() 代表Android的主线程

河马过河微信公众号.jpg

相关文章

  • RxJava2.x-线程调度

    一、线程调度 日志 总结 1、Schedules线程:① io②newThread③computation ④s...

  • java虚拟机读书笔记之线程调度

    java线程调度 线程调度主要有两种方式,协同式线程调度和抢占式线程调度。1、协同式: 线程的执行时间由线程本身...

  • [Java]线程和锁

    0x00 线程调度 线程调度指的是系统为线程分配CPU使用权。分为两种: 协同式线程调度线程想用CPU多久就用多久...

  • CPU调度

    CPU调度 基本概念 CPU调度在讨论普通调度概念时使用进程调度,特别指定为线程概念时使用线程调度 CPU-I/O...

  • 2018-04-03 线程基础

    线程调度 是指系统分配CPU使用权限的方式,分为协同式线程调度和抢占式线程调度 进程、线程概念 进程是应用程序的一...

  • 线程优先级和守护线程

    线程优先级: Java提供一个线程调度器来监控程序中启动后进入就绪状态的所有线程,线程调度器按照优先级决定调度哪个...

  • 并发--线程和锁

    线程调度 协同式调度 1.一个线程执行完毕之后再通知其他线程执行 抢占式调度(JAVA使用的是这种方式) 1.os...

  • 2.1 Java线程调度

    线程调度是指系统为线程分配处理器使用权的过程,主要调度方式有两种,分别是协同式线程调度(CooperativeTh...

  • 网络编程(三)

    Volley用法完全解析 从上图可以看到Volley分为三个线程,分别是主线程、缓存调度线程、和网络调度线程,首先...

  • EffectiveJava第十章第七节

    不要依赖于线程调度器 当有多个线程可以运行时,由线程调度器(thread scheduler)决定哪些线程将会运行...

网友评论

    本文标题:RxJava2.x-线程调度

    本文链接:https://www.haomeiwen.com/subject/kqveiftx.html