原文地址:https://five.agency/debugging-rxjava/
首先,从一个谜语开始讲起。假设你遇到如下需求:您正在开发一个新闻应用,它包括一个热门新闻列表和一个刷新按钮,用户可以使用该按钮从网络中请求最新的新闻。
这里有一小断代码,看起来很简单,应该可以实现,但它不能实现。看一下,猜猜当用户按下刷新按钮时会发生什么。
class MainActivity : AppCompatActivity() {
private val refreshProcessor = PublishProcessor.create()
override fun onCreate(savedInstanceState: Bundle?) {
refreshButton.setOnClickListener {
refreshProcessor.onNext(Unit)
}
refreshProcessor
.flatMapSingle(networkRequest::fetchData)
.subscribeOn(backgroundScheduler)
.observeOn(mainScheduler)
.subscribe(view::render)
}
}
如果您猜会抛出NetworkOnMainthreadException,那么恭喜您答对了。
尽管我们添加了.subscribeon(backgroundthread),但这还不够。subscribeon操作符只将订阅过程(稍后我们将讨论)切换到所需的线程,但这并不意味着该条目将在该线程上发出。
您的Rx调用链将在调用onNext的线程上执行,而且在我们的例子中,这是主线程,因此onClickListeners是在主线程上得到通知。
最糟糕的是,在大多数情况下,您的应用程序不会像我们的Demo中那样崩溃,但是主逻辑将在主线程上执行,这可能会导致UI上的丢帧和糟糕的用户体验。
如何解决这个问题?
这个问题没有通用的解决办法。开发人员需要评估这些问题中的每一个,并找到针对特定问题的最佳解决方案。我们需要意识到这一点,但又不得不调试应用程序中的每一种可能的Rx调用链来找出有问题的那个。
我们希望我们的应用程序在发生类似的事情时“抱怨”,当我说“抱怨”时,我的意思是它能打断DEBUG构建并将日志记录到RELEASE构建中。
在简单思考如何实现后,可以得出结论:如果我们想切换到主线程,这意味着我们已经不想再进入主线程了。
这很容易用.compose()操作符实现,在切换到主线程之前,我们先用它检查当前线程是否是主线程。即使这样做可行,我们也不希望有任何额外的操作符开销。
此外,我们还需要手动切换代码库中当前所有的.observeOn(主调度程序)操作符,并记住将来在哪使用它。这是一个冗长的过程,我们不想未来一直考虑它。我们需要一个对当前项目改动最小的解决方案。
如何更好地解决?
因为我们使用依赖注入来注入调度程序,所以我们考虑这样创建 mainScheduler:在切换到主线程之前,调度线程将检查当前线程不是主线程。
需要做的太多了,所以我们把现有的包了起来 :)
class OnRescheduleNotifyMainScheduler : Scheduler() {
private val mainScheduler = AndroidSchedulers.mainThread()
override fun createWorker() = object : Worker() {
private val worker = mainScheduler.createWorker()
override fun schedule(run: Runnable, delay: Long, unit: TimeUnit): Disposable {
if (Looper.myLooper() == Looper.getMainLooper()) {
logOrError()
}
return worker.schedule(run, delay, unit)
}
override fun dispose() = worker.dispose()
override fun isDisposed() = worker.isDisposed
}
}
我们为OnRescheduleNotifyMainscheduler提供了依赖项注入,而不是原来的Mainscheduler,在大多数情况下,它都按预期的方式工作。出于某种奇怪的原因,它报告说:我们在主线程上,即使我们不是在主线程上。
这是没有意义的,它看起来像是RxJava的一个实现细节在困扰着我们。
在RxJava内部,订阅过程到底做了什么?
我们开始调试RXJava,通过一个简单的Flowable流,试着理解RxJava内部的工作方式。下面是一个例子:
publishProcessor
.filter(this::isValid)
.map(this::toViewModel)
.doOnNext(this::log)
.observeOn(mainScheduler)
.subscribe(view::render)
以下就是我们学到的:当调用.subscribe()时,订阅过程的启动,将包括三个步骤:
- 订阅上游流
- 通知下游onsubscribed()
- 向上游索取item
看起来就像这样:
image.png我们注意到,当.observeOn()操作符请求数据时,它在提供的调度程序(即Demo中的onRescheduleNotifyMainscheduler)上调度一个任务,但从当前线程调度一个任务,该线程也是主线程,
因为Rx链中没有任何.subscribeOn(backgroundscheduler)。
如果将.subscribeon()添加到流中会发生什么?
当我们添加它时,它会正常工作。这样说也有道理,但为什么一些Rx链工作得很好,而其他一些则报出问题,即使它们也有.subscribeon(backgroundscheduler)¥?有什么区别?
We added the .subscribeOn() operator to the middle of our chain and started debugging again.
publishProcessor
.filter(this::isValid)
.map(this::toViewModel)
.subscribeOn(backgroundScheduler)
.doOnNext(this::log)
.observeOn(mainScheduler)
.subscribe(view::render)
有一件事需要注意:.subscribeon()的位置决定了我们的自定义调度程序是否工作。如果.subscribeon()在.observeon()之前,那么调度程序将无法正常工作,如果它在下面,就一切正常。
通过深入挖掘,我们得出了这个结论。subscribeon 运算符只将订阅进程切换到所需的线程,但这并不意味着项目将在该线程上发出。subscribeOn 运算符只将订阅进程切换到所需的线程,但这并不意味着项目将在该线程上发出。我们已经说过订阅过程由三个步骤组成,.subscribeOn()操作符将这三个步骤切换到指定的线程。它的作用就像是链中的最后一环,当有人订阅它时,它会立即在下游调用 onSubscribed()。当下游向它请求数据时,它会订阅上游,并且会在提供的线程上调用subscribe()方法。
image.png我们学到了什么呢?
实际上,将.subscribeOn()尽可能靠近.subscribe()是有意义的。最好就在它上面,因为整个订阅过程是在提供线程上完成的,而不是在主线程上完成的。另外,如果我们在主线程上做了一些我们不想做的额外工作,那么我们的 onRescheduleNotifyMainscheduler 虽正常工作但会 “抱怨”。
网友评论