例子
1)引入 Maven 依赖
<!-- https://mvnrepository.com/artifact/io.reactivex.rxjava3/rxjava -->
<dependency>
<groupId>io.reactivex.rxjava3</groupId>
<artifactId>rxjava</artifactId>
<version>3.0.13</version>
</dependency>
2)实验代码:
import io.reactivex.rxjava3.core.Observable;
import java.util.concurrent.TimeUnit;
public class TestTimeout {
public static void main(String[] args) {
Observable<String> source = Observable.create(emitter -> {
emitter.onNext("A");
Thread.sleep(800);
emitter.onNext("B");
Thread.sleep(400);
emitter.onNext("C");
Thread.sleep(1200);
emitter.onNext("D");
emitter.onComplete();
});
source.timeout(1, TimeUnit.SECONDS)
.subscribe(
item -> System.out.println("onNext: " + item),
error -> System.out.println("onError: " + error),
() -> System.out.println("onComplete will not be printed!"));
}
}
3)控制台输出:
onNext: A
onNext: B
onNext: C
onError: java.util.concurrent.TimeoutException: The source did not signal an event for 1 seconds and has been terminated.
源码解析
ObservableTimeoutTimed
1)创建 ObservableTimeoutTimed
package io.reactivex.rxjava3.core;
public abstract class Observable<@NonNull T> implements ObservableSource<T> {
// 返回一个当前 Observable 的包装 Observable,但对每个发出的项目应用超时策略。
// 如果从其前任开始的指定超时时间内未发出下一项,则生成的 Observable 将终止并通知观察者 TimeoutException。
public final Observable<T> timeout(long timeout, @NonNull TimeUnit unit) {
return timeout0(timeout, unit, null, Schedulers.computation());
}
private Observable<T> timeout0(long timeout, @NonNull TimeUnit unit,
@Nullable ObservableSource<? extends T> fallback,
@NonNull Scheduler scheduler) {
Objects.requireNonNull(unit, "unit is null");
Objects.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableTimeoutTimed<>(this, timeout, unit, scheduler, fallback));
}
}
2)超时处理机制
package io.reactivex.rxjava3.internal.operators.observable;
public final class ObservableTimeoutTimed<T> extends AbstractObservableWithUpstream<T, T> {
@Override
protected void subscribeActual(Observer<? super T> observer) {
TimeoutObserver<T> parent = new TimeoutObserver<>(observer, timeout, unit, scheduler.createWorker());
observer.onSubscribe(parent);
// 1. 初始化延迟任务,idx=0
parent.startTimeout(0L);
source.subscribe(parent);
}
static final class TimeoutObserver<T> extends AtomicLong
implements Observer<T>, Disposable, TimeoutSupport {
@Override
public void onNext(T t) {
long idx = get();
if (idx == Long.MAX_VALUE || !compareAndSet(idx, idx + 1)) {
return;
}
task.get().dispose();
downstream.onNext(t);
// 3. 重新触发延迟任务处理,idx = idx + 1
startTimeout(idx + 1);
}
void startTimeout(long nextIndex) {
// 2. 执行延迟任务
// 实验代码中配置延迟时间就是 (1, TimeUnit.SECONDS)
task.replace(worker.schedule(new TimeoutTask(nextIndex, this), timeout, unit));
}
@Override
public void onTimeout(long idx) {
// 5. 判断初始化 TimeoutTask 实例时的 idx 是否和当前的一致,
// 如果一致,说明在这个区间没有执行新的 onNext() 方法,因此:调用下游的 onError() 方法,并传入 TimeoutException 异常
// 如果不一致,说明在这个时间区间执行了新的 onNext() 方法,因此忽略。
if (compareAndSet(idx, Long.MAX_VALUE)) {
DisposableHelper.dispose(upstream);
downstream.onError(new TimeoutException(timeoutMessage(timeout, unit)));
worker.dispose();
}
}
}
static final class TimeoutTask implements Runnable {
final TimeoutSupport parent;
final long idx;
TimeoutTask(long idx, TimeoutSupport parent) {
this.idx = idx;
this.parent = parent;
}
@Override
public void run() {
// 4. TimeoutObserver 中的 startTimeout() 传入的超时时间到了后执行当前方法
// 调用 TimeoutObserver 中的 onTimeout 方法
// 并传入初始化 TimeoutTask 实例时的 idx 参数
parent.onTimeout(idx);
}
}
interface TimeoutSupport {
void onTimeout(long idx);
}
}
LambdaObserver
1)创建 LambdaObserver
package io.reactivex.rxjava3.core;
public abstract class Observable<@NonNull T> implements ObservableSource<T> {
// 订阅当前的 Observable 并提供回调来处理它发出的项目以及它发出的任何错误或完成通知。
public final Disposable subscribe(@NonNull Consumer<? super T> onNext, @NonNull Consumer<? super Throwable> onError,
@NonNull Action onComplete) {
Objects.requireNonNull(onNext, "onNext is null");
Objects.requireNonNull(onError, "onError is null");
Objects.requireNonNull(onComplete, "onComplete is null");
LambdaObserver<T> ls = new LambdaObserver<>(onNext, onError, onComplete, Functions.emptyConsumer());
subscribe(ls);
return ls;
}
}
2)订阅方法执行逻辑
package io.reactivex.rxjava3.internal.observers;
public final class LambdaObserver<T> extends AtomicReference<Disposable>
implements Observer<T>, Disposable, LambdaConsumerIntrospection {
@Override
public void onNext(T t) {
if (!isDisposed()) {
try {
// 调用实际的 onNext() 方法
// 也就是 item -> System.out.println("onNext: " + item)
onNext.accept(t);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
get().dispose();
onError(e);
}
}
}
@Override
public void onError(Throwable t) {
if (!isDisposed()) {
// 设置 DISPOSED
lazySet(DisposableHelper.DISPOSED);
try {
// 调用实际的 onError() 方法
// 也就是 error -> System.out.println("onError: " + error)
onError.accept(t);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(new CompositeException(t, e));
}
} else {
RxJavaPlugins.onError(t);
}
}
@Override
public void onComplete() {
if (!isDisposed()) { // 如果已经设置过 DISPOSED,则直接忽略
lazySet(DisposableHelper.DISPOSED);
try {
// 否则调用实际的 onComplete() 方法
// 也就是 () -> System.out.println("onComplete will not be printed!")
onComplete.run();
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(e);
}
}
}
}
网友评论