例子
Flowable.create<Int>({ emitter ->
println("观察者可接受事件数量 = ${emitter.requested()}")
var flag = false
for (i in 0..499) {
flag = false
// 若 requested() == 0 则不发送
while (emitter.requested() == 0L) {
if (!flag) {
println("不再发送")
flag = true
}
}
println("发送了事件 $i,观察者可接受事件数量 = ${emitter.requested()}")
emitter.onNext(i)
}
emitter.onComplete()
}, BackpressureStrategy.ERROR)
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.subscribe(object : Subscriber<Int> {
override fun onComplete() {
println("onComplete")
}
override fun onSubscribe(s: Subscription) {
println("onSubscribe")
subscribption = s
}
override fun onNext(t: Int) {
println("接收到了事件 $t")
}
override fun onError(t: Throwable) {
println("onError $t")
}
})
Thread.sleep(200)
println("第一次按钮 48")
subscribption?.request(48)
Thread.sleep(200)
println("第二次按钮 48")
subscribption?.request(48)
Thread.sleep(200)
println("第三次按钮 48")
subscribption?.request(48)
上面例子中做的事情很简单:
数据源一共有 500 个数据要发射,当 emitter.requested() == 0
时就不再发射,背压策略是 BackpressureStrategy.ERROR
,发射线程是 Schedulers.io()
,接受线程切换到 Schedulers.computation()
。外部通过 subscription.request(48)
一共 3 次请求 48 个数据,每次请求都有间隔 200ms,以保证每次请求后,上游数据都能发射完成。
运行结果:
观察者可接受事件数量 = 128
发送了事件 0,观察者可接受事件数量 = 128
发送了事件 1,观察者可接受事件数量 = 127
发送了事件 2,观察者可接受事件数量 = 126
...
发送了事件 125,观察者可接受事件数量 = 3
发送了事件 126,观察者可接受事件数量 = 2
发送了事件 127,观察者可接受事件数量 = 1
不再发送
// 200ms 后
第一次按钮 48
接收到了事件 0
接收到了事件 1
...
接收到了事件 45
接收到了事件 46
接收到了事件 47
// 200ms 后
第二次按钮 48
接收到了事件 48
接收到了事件 49
接收到了事件 50
...
接收到了事件 93
接收到了事件 94
接收到了事件 95
发送了事件 128,观察者可接受事件数量 = 96
发送了事件 129,观察者可接受事件数量 = 95
发送了事件 130,观察者可接受事件数量 = 94
...
发送了事件 221,观察者可接受事件数量 = 3
发送了事件 222,观察者可接受事件数量 = 2
发送了事件 223,观察者可接受事件数量 = 1
// 200ms 后
不再发送
第三次按钮 48
接收到了事件 96
接收到了事件 97
接收到了事件 98
接收到了事件 99
...
接收到了事件 141
接收到了事件 142
接收到了事件 143
END
从上面结果我们可以看到现象:
- 初始时,数据源只知道下游观察者接收数据的能力为 128 个数据,于是一次性发送 128 个数据,直到下游观察者不再能够接收数据。
- 观察者第一次请求 48 个数据,并依次收到了上游发送的 48 个数据。这个时候按道理缓冲区至少空出了 48 个位置,但是上游并没有再发送 48 个数据去补充。
- 观察者第二次请求 48 个数据,并依次收到了上游发送的 48 个数据。理论上这个时候缓冲区中此时已发射 48+48=96 个数据(96 也是缓冲区大小 128 的 2/3),达到缓冲区大小的 2/3。因此上游马上又发送了 96 个数据以填补缓冲区。再经过几次试验,比如第二次请求超过 48 个数据,当缓冲区中空出位置大于缓冲区大小的 2/3 时,上游会马上发射 2/3 数据来填补缓冲区。这个原理我们后面再说。
- 观察者第三次请求 48 个数据,并依次收到了上游发送的 48 个数据。
下面我们来看一下如何通过源码解释上面的现象。了解 RxJava 普通操作符源码的同学都知道,Flowable.create()
实际上创建了一个 FlowableCreate
实例。observeOn(Schedulers.computation())
实际上创建了一个 FlowableObserveOn
实例,并在其中发射消息时切换了线程。
FlowableCreate
有了之前源码分析的经验,我们只用关注 subscribeActual()
方法:
@Override
public void subscribeActual(Subscriber<? super T> t) {
BaseEmitter<T> emitter;
switch (backpressure) {
case MISSING: {
emitter = new MissingEmitter<T>(t);
break;
}
case ERROR: {
emitter = new ErrorAsyncEmitter<T>(t);
break;
}
case DROP: {
emitter = new DropAsyncEmitter<T>(t);
break;
}
case LATEST: {
emitter = new LatestAsyncEmitter<T>(t);
break;
}
default: {
emitter = new BufferAsyncEmitter<T>(t, bufferSize());
break;
}
}
t.onSubscribe(emitter);
try {
source.subscribe(emitter);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
emitter.onError(ex);
}
}
根据创建 Flowable
时所传入的不同策略,创建了不同的 Emitter
。
MissingEmitter
@Override
public void onNext(T t) {
if (isCancelled()) {
return;
}
if (t != null) {
// 将数据发给下游
downstream.onNext(t);
} else {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
// 检查自己的计数器
for (;;) {
long r = get();
if (r == 0L || compareAndSet(r, r - 1)) {
return;
}
}
}
如果有数据往下发(开发者主动调用了 onNext
来发射数据),则直接发给下游。并且给自己的计数器 -1,如果计数器本身就为 0 了则不再 -1.Emitter
自己就是一个计数器,它是如何被赋值的呢,我们看看基类 BaseEmitter
:
abstract static class BaseEmitter<T>
extends AtomicLong
implements FlowableEmitter<T>, Subscription {
final Subscriber<? super T> downstream;
BaseEmitter(Subscriber<? super T> downstream) {
this.downstream = downstream;
}
//... 省略了不是背压处理相关的函数:error, complete 等,跟普通的 Observable 差不多
// 设置计数器的值
@Override
public final void request(long n) {
if (SubscriptionHelper.validate(n)) {
BackpressureHelper.add(this, n);
onRequested();
}
}
void onRequested() {
// default is no-op
}
// 获取计数器的值
@Override
public final long requested() {
return get();
}
}
可以看到 Emitter
自己的计数器的值是靠 request(n)
方法来设置的。
我们之后可以看到,这个
request(n)
方法是其下游xxxSubscriber.onSubscribe()
方法里来更新上游的。
ErrorAsyncEmitter
static final class ErrorAsyncEmitter<T> extends NoOverflowBaseAsyncEmitter<T> {
ErrorAsyncEmitter(Subscriber<? super T> downstream) {
super(downstream);
}
@Override
void onOverflow() {
onError(new MissingBackpressureException("create: could not emit value due to lack of requests"));
}
}
abstract static class NoOverflowBaseAsyncEmitter<T> extends BaseEmitter<T> {
NoOverflowBaseAsyncEmitter(Subscriber<? super T> downstream) {
super(downstream);
}
@Override
public final void onNext(T t) {
if (isCancelled()) {
return;
}
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
// 计数器 != 0 则往下发射数据,并且计数器减 1
if (get() != 0) {
downstream.onNext(t);
BackpressureHelper.produced(this, 1);
} else {
// 如果计数器 == 0,上游还要往下发,则调用 onOverflow() 来处理溢出
onOverflow();
}
}
abstract void onOverflow();
}
ErrorAsyncEmitter
类继承自 NoOverflowBaseAsyncEmitter
。不同于上面的 MissingEmitter
是收到数据后直接往下发,然后计数器 -1,ErrorAsyncEmitter
是计数器不为 0 才会往下发射数据,如果计数器为 0 了,则做溢出处理:往下游通知 onError
。
DropAsyncEmitter
static final class DropAsyncEmitter<T> extends NoOverflowBaseAsyncEmitter<T> {
private static final long serialVersionUID = 8360058422307496563L;
DropAsyncEmitter(Subscriber<? super T> downstream) {
super(downstream);
}
@Override
void onOverflow() {
// nothing to do
}
}
跟 ErrorAsyncEmitter
差不多,只不过溢出时 onOverflow()
不做任何处理。
LatestAsyncEmitter
这个的实现复杂一点,代码就不贴了。这个策略主要是当缓冲区满了的时候,如果上游再往下发射数据,LatestAsyncEmitter
会缓存最近一次发射的数据。当下游再 request
数据时,会将缓存的数据也一起发射下去。
缓冲区
上面的所有 Emitter
我们都只看到了计数器(Emitter
自身就是一个计数器),计数器帮助 Emitter
执行背压策略来发射数据。那么发射的数据是在哪里被缓存的呢?思考一下,数据缓存只有在异步的时候才需要,而在什么时候我们能够明确地知道是异步操作呢?那就是在调用切线程的操作符时。因此我们看看 FlowableObserveOn
:
public final class FlowableObserveOn<T> extends AbstractFlowableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int prefetch;
public FlowableObserveOn(
Flowable<T> source,
Scheduler scheduler,
boolean delayError,
int prefetch) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
// 缓冲区大小
this.prefetch = prefetch;
}
@Override
public void subscribeActual(Subscriber<? super T> s) {
Worker worker = scheduler.createWorker();
if (s instanceof ConditionalSubscriber) {
source.subscribe(new ObserveOnConditionalSubscriber<T>(
(ConditionalSubscriber<? super T>) s, worker, delayError, prefetch));
} else {
// 走这里,构造 ObserveOnSubscriber 对象
source.subscribe(new ObserveOnSubscriber<T>(s, worker, delayError, prefetch));
}
}
}
构造缓冲区
上面主要需要关注的就是 prefetch
值。该值是由开发者调用 observeOn
时传入的,默认是 128。然后将这个缓冲区大小传入 ObserveOnSubscriber
的构造函数。我们先看下 ObserveOnSubscriber.onSubscribe()
方法:
@Override
public void onSubscribe(Subscription s) {
if (SubscriptionHelper.validate(this.upstream, s)) {
this.upstream = s;
// ... 省去一些代码
queue = new SpscArrayQueue<T>(prefetch);
downstream.onSubscribe(this);
s.request(prefetch);
}
}
可以看到构造了一个 prefetch
大小的 SpscArrayQueue
,并且依次向下游调用 onSubscribe()
,向上游 request(prefetch)
。还记得上一节中,各个 Emitter
的计数器是怎么来的吗?就是下游主动 request
的。因此我们开头的例子中 ErrorAsyncEmitter
的计数器大小是由这个 ObserveOnSubscriber
主动 request(128)
的,,也就是 128 大小,这也符合我们的 log 中的现象。
设置 limit 为 2/3 缓冲区大小
我们再看构造 FlowableObserveOn
时调用的父类 BaseObserveOnSubscriber
:
BaseObserveOnSubscriber(
Worker worker,
boolean delayError,
int prefetch) {
this.worker = worker;
this.delayError = delayError;
this.prefetch = prefetch;
this.requested = new AtomicLong();
// 这里 limit 设置为缓冲区大小的 2/3
this.limit = prefetch - (prefetch >> 2);
}
可以看到上面设置了一个 limit
为缓冲区 prefetch
的 2/3 大小。参照我们例子中的 log:
- 第一次观察者消费 48 个数据,此时没有触发上游发射新的数据来填补缓冲区。
- 第二次观察者消费 48 个数据,此时观察者消费到序号 95(也就是第 96 个数据)的数据后,上游马上开始发射数据。而 96 就是 128 的 2/3,因此可以知道,这个
limit
是用于判断缓冲区被消费 2/3(或者说只剩余少于 1/3)时,下游主动向上游请求数据的。因此这个limit
应该是用于onNext()
方法中的。
缓冲区的消费行为
直接看 onNext()
方法:
@Override
public final void onNext(T t) {
if (done) {
return;
}
if (!queue.offer(t)) {
upstream.cancel();
// 这就是采用 MISSING 策略时,缓冲区满的时候的报错
error = new MissingBackpressureException("Queue is full?!");
done = true;
}
trySchedule();
}
在这里我们发现了 MISSING
策略的报错信息。根据我们上面的分析,MISSING
策略就是无脑往下游发射数据,到 BaseObserveOnSubscriber.onNext()
这里时,queue.offer(t)
返回 false
,表示缓冲区已满,于是向下游发送 error
。因此我们知道了,MISSING
的意思其实是 missing 背压策略,而不是 missing 发射的数据。
接着调用 trySchedule()
,我们来看看:
final void trySchedule() {
if (getAndIncrement() != 0) {
return;
}
// 这里切线程执行任务
worker.schedule(this);
}
上面通过 worker
来切换线程,最终执行的是 ObserveOnSubscriber.runAsync()
:
@Override
void runAsync() {
int missed = 1;
final Subscriber<? super T> a = downstream;
final SimpleQueue<T> q = queue;
long e = produced;
for (;;) {
// 下游观察者通过调用 request 设置的数目。在我们的例子中,就是 subscription 每次 request 的 48 个数据
long r = requested.get();
// 死循环,直到发射下游请求的数目个数据
while (e != r) {
boolean d = done;
T v;
try {
// 从缓存中取出数据
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
cancelled = true;
upstream.cancel();
q.clear();
a.onError(ex);
worker.dispose();
return;
}
boolean empty = v == null;
if (checkTerminated(d, empty, a)) {
return;
}
if (empty) {
break;
}
a.onNext(v);
e++;
// 如果已发射数据 == limit(缓冲区的 2/3),则向上游请求 limit 个数据,并且一次性更新 request 数据
if (e == limit) {
if (r != Long.MAX_VALUE) {
r = requested.addAndGet(-e);
}
upstream.request(e);
e = 0L;
}
}
if (e == r && checkTerminated(done, q.isEmpty(), a)) {
return;
}
int w = get();
if (missed == w) {
produced = e;
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
} else {
missed = w;
}
}
}
上面的关键逻辑其实很简单,而且也能解释我们例子中 log 的现象:从缓冲区中一直取数据往下发射,直到发射了下游 request
个数据。如果还没满足下游请求,发射数据就已经达到了缓冲区 2/3 个,那么就主动向上游请求 2/3 个数据,以填满缓冲区。
这就解释了为什么例子中下游观察者只有消费了 96 个以上数据,上游才会继续往下发射 96 个数据(注意是发射缓冲区 2/3 个,如果缓冲区空余大于 2/3,那么上游也只会发射 2/3 个)
多个背压策略
如果我们创建多个背压 Flowable
后,再使用 onBackpressureLatest()
, onBackpressureDrop
等背压操作符,那么最后的背压策略是以哪个为准呢?通过看源码我们可以知道,每一个背压操作符在 onSubscribe()
方法中,都是固定向上 request(Long.MAX_VALUE)
,这样就会使上游的背压策略中的计数器变为无限大,而使上游的背压策略失效。因此,背压策略的操作符总是以最后一个为准。
中间的普通操作符有背压吗?
我们看 FlowableMap
,它构造的是 MapConditionalSubscriber
,它没有像背压操作符一样继承自 AtomicLong
,从而变为一个计数器(计数器被用于判断是否执行背压策略),只是简单地将数据做变换后发给下游。因此背压策略的操作在这些普通操作符上是不生效的。
网友评论