美文网首页androidRxAndroid RxJava
RxJava2 实战知识梳理(1) - 后台执行耗时操作,实时通

RxJava2 实战知识梳理(1) - 后台执行耗时操作,实时通

作者: 泽毛 | 来源:发表于2017-08-27 16:37 被阅读7638次

RxJava2 实战系列文章

RxJava2 实战知识梳理(1) - 后台执行耗时操作,实时通知 UI 更新
RxJava2 实战知识梳理(2) - 计算一段时间内数据的平均值
RxJava2 实战知识梳理(3) - 优化搜索联想功能
RxJava2 实战知识梳理(4) - 结合 Retrofit 请求新闻资讯
RxJava2 实战知识梳理(5) - 简单及进阶的轮询操作
RxJava2 实战知识梳理(6) - 基于错误类型的重试请求
RxJava2 实战知识梳理(7) - 基于 combineLatest 实现的输入表单验证
RxJava2 实战知识梳理(8) - 使用 publish + merge 优化先加载缓存,再读取网络数据的请求过程
RxJava2 实战知识梳理(9) - 使用 timer/interval/delay 实现任务调度
RxJava2 实战知识梳理(10) - 屏幕旋转导致 Activity 重建时恢复任务
RxJava2 实战知识梳理(11) - 检测网络状态并自动重试请求
RxJava2 实战知识梳理(12) - 实战讲解 publish & replay & share & refCount & autoConnect
RxJava2 实战知识梳理(13) - 如何使得错误发生时不自动停止订阅关系
RxJava2 实战知识梳理(14) - 在 token 过期时,刷新过期 token 并重新发起请求
RxJava2 实战知识梳理(15) - 实现一个简单的 MVP + RxJava + Retrofit 应用


一、前言

接触RxJava2已经很久了,也看了网上的很多文章,发现基本都是在对RxJava的基本思想介绍之后,再去对各个操作符进行分析,但是看了之后感觉过了不久就忘了。

偶然的机会看到了开源项目 RxJava-Android-Samples,这里一共介绍了十六种RxJava2的使用场景,它从实际的应用场景出发介绍RxJava2的使用,特别适合对于RxJava2已经有初步了解的开发者进一步地去学习如何将其应用到实际开发当中。

因此,我打算跟着这个项目的思路编写一系列实战的介绍并完成示例代码编写,并对该实例中用到的知识进行介绍,做到学以致用。下面,就开始第一个例子的学习,源码的仓库为:RxSample

二、示例

2.1 应用场景

当我们需要进行一些耗时操作,例如下载、访问数据库等,为了不阻塞主线程,往往会将其放在后台进行处理,同时在处理的过程中、处理完成后通知主线程更新UI,这里就涉及到了后台线程和主线程之间的切换。首先回忆一下,在以前我们一般会用以下两种方式来实现这一效果:

  • 创建一个新的子线程,在其run()方法中执行耗时的操作,并通过一个和主线程Looper关联的Handler发送消息给主线程更新进度显示、处理结果。
  • 使用AsyncTask,在其doInBackground方法中执行耗时的操作,调用publishProgress方法通知主线程,然后在onProgressUpdate中更新进度显示,在onPostExecute中显示最终结果。

那么,让我们看一些在RxJava中如何完成这一需求。

2.2 示例代码

我们的界面上有一个按钮mTvDownload,点击之后会发起一个耗时的任务,这里我们用Thread.sleep来模拟耗时的操作,每隔500ms我们会将当前的进度通知主线程,在mTvDownloadResult中显示当前处理的进度。

public class BackgroundActivity extends AppCompatActivity {

    private TextView mTvDownload;
    private TextView mTvDownloadResult;
    private CompositeDisposable mCompositeDisposable = new CompositeDisposable();

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_background);
        mTvDownload = (TextView) findViewById(R.id.tv_download);
        mTvDownloadResult = (TextView) findViewById(R.id.tv_download_result);
        mTvDownload.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View v) {
                startDownload();
            }
        });
    }

    private void startDownload() {
        final Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {

            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                for (int i = 0; i < 100; i++) {
                    if (i % 20 == 0) {
                        try {
                            Thread.sleep(500); //模拟下载的操作。
                        } catch (InterruptedException exception) {
                            if (!e.isDisposed()) {
                                e.onError(exception);
                            }
                        }
                        e.onNext(i);
                    }
                }
                e.onComplete();
            }

        });
        DisposableObserver<Integer> disposableObserver = new DisposableObserver<Integer>() {

            @Override
            public void onNext(Integer value) {
                Log.d("BackgroundActivity", "onNext=" + value);
                mTvDownloadResult.setText("Current Progress=" + value);
            }

            @Override
            public void onError(Throwable e) {
                Log.d("BackgroundActivity", "onError=" + e);
                mTvDownloadResult.setText("Download Error");
            }

            @Override
            public void onComplete() {
                Log.d("BackgroundActivity", "onComplete");
                mTvDownloadResult.setText("Download onComplete");
            }
        };
        observable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(disposableObserver);
        mCompositeDisposable.add(disposableObserver);
    }

    @Override
    protected void onDestroy() {
        super.onDestroy();
        mCompositeDisposable.clear();
    }
}

实际的运行结果如下:


三、示例解析

3.1 线程切换

在上面的例子中,涉及到了两种类型的操作:

  • 需要在后台执行的耗时操作,对应于subscribe(ObservableEmitter<Integer> e)中的代码。
  • 需要在主线程进行UI更新的操作,对应于DisposableObserver的所有回调,具体的是在onNext中进行进度的更新;在onCompleteonError中展示最终的处理结果。

那么,这两种类型操作所运行的线程是在哪里指定的呢,关键是下面这句:

observable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(disposableObserver);
  • subscribeOn(Schedulers.io()):指定observablesubscribe方法运行在后台线程。
  • observeOn(AndroidSchedulers.mainThread()):指定observer的回调方法运行在主线程。

这两个函数刚开始的时候很有可能弄混,我是这么记的,subscribeOns开头,可以理解为“上游”开头的谐音,也就是上游执行的线程。

关于这两个函数,还有一点说明:多次调用subscribeOn,会以第一次的为准;而多次调用observeOn则会以最后一次的为准,不过一般我们都不会这么干,就不举例子了。

3.2 线程的类型

subscribeOn/observeOn都要求传入一个Schedulers的子类,它就代表了运行线程类型,下面我们来看一下都有哪些选择:

  • Schedulers.computation():用于计算任务,默认线程数等于处理器的数量。
  • Schedulers.from(Executor executor):使用Executor作为调度器,关于Executor框架可以参考这篇文章:多线程知识梳理(5) - 线程池四部曲之 Executor 框架
  • Schedulers.io( ):用于IO密集型任务,例如访问网络、数据库操作等,也是我们最常使用的。
  • Schedulers.newThread( ):为每一个任务创建一个新的线程。
  • Schedulers.trampoline( ):当其它排队的任务完成后,在当前线程排队开始执行。
  • Schedulers.single():所有任务共用一个后台线程。

以上是在io.reactivex.schedulers包中,提供的Schedulers,而如果我们导入了下面的依赖,那么在io.reactivex.android.schedulers下,还有额外的两个Schedulers可选:

compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
  • AndroidSchedulers.mainThread():运行在应用程序的主线程。
  • AndroidSchedulers.from(Looper looper):运行在该looper对应的线程当中。

3.3 使用 CompositeDisposable 对下游进行管理

如果Activity要被销毁时,我们的后台任务没有执行完,那么就会导致Activity不能正常回收,而对于每一个Observer,都会有一个Disposable对象用于管理,而RxJava提供了一个CompositeDisposable类用于管理这些Disposable,我们只需要将其将入到该集合当中,在ActivityonDestroy方法中,调用它的clear方法,就能避免内存泄漏的发生。

四、小结

这个系列的第一篇文章,我们介绍了如何使用subscribeOn/observeOn来实现后台执行耗时任务,并通知主线程更新进度。


更多文章,欢迎访问我的 Android 知识梳理系列:

相关文章

网友评论

  • FireStyle:请问下,我想直接在java下进行测试而不是在android环境下,想实现子线程等10秒后发送,然后在主线程接收,但是一直不会打印出来,麻烦帮看一下,谢谢。
    private static void testThread() {
    System.out.println(Thread.currentThread().getName());
    Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
    public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
    Thread.sleep(10000);
    observableEmitter.onNext(1);
    System.out.println("haha," + Thread.currentThread().getName());
    }
    });
    observable.subscribeOn(Schedulers.newThread()).observeOn(Schedulers.trampoline()).subscribe(new Consumer<Integer>() {
    public void accept(Integer integer) throws Exception {
    System.out.println(integer);
    System.out.println(Thread.currentThread().getName());
    }
    });
    }
  • Silence潇湘夜雨:重复点击会出错,是因为没有解绑吗?
  • Mike张小多:主席写的很好 赞一个。
  • Active_Loser:这样就充分理解了
  • 池塘细雨:非常好。就是楼主的名字是啥意思
    泽毛:@池塘细雨 谢谢老板打赏,就是一个外号而已啦。
  • MigrationUK:rxjava2 废弃了Schedulers.immediate( )线程类型
    泽毛:多谢提醒:smiley:
  • Silence潇湘夜雨:逻辑清晰,学以致用,很有用。
    泽毛:@Silence潇湘夜雨 谢谢支持 🤗
  • 夜幕流星雨:很实用,想问下DisposableObserver 和普通的Observer 有什么区别吗。
    泽毛:@夜幕流星雨 使用上没什么区别,因为它同时实现了 Observer 接口和 Disposable 接口,因此如果想要取消订阅的话可以直接调用它的 dispose 方法,Observer 就没有这个功能。
  • 贾亦真亦贾:谢谢 写的非常好 理论和实践的结合才是真正的学以致用。
    泽毛: @贾亦真亦贾 嗯嗯,谢谢你的支持 😁

本文标题:RxJava2 实战知识梳理(1) - 后台执行耗时操作,实时通

本文链接:https://www.haomeiwen.com/subject/utobdxtx.html