美文网首页
RxJava timeout 的使用

RxJava timeout 的使用

作者: 蓝笔头 | 来源:发表于2021-07-13 13:04 被阅读0次

    例子

    1)引入 Maven 依赖

            <!-- https://mvnrepository.com/artifact/io.reactivex.rxjava3/rxjava -->
            <dependency>
                <groupId>io.reactivex.rxjava3</groupId>
                <artifactId>rxjava</artifactId>
                <version>3.0.13</version>
            </dependency>
    

    2)实验代码:

    import io.reactivex.rxjava3.core.Observable;
    import java.util.concurrent.TimeUnit;
    
    public class TestTimeout {
    
        public static void main(String[] args) {
            Observable<String> source = Observable.create(emitter -> {
                emitter.onNext("A");
    
                Thread.sleep(800);
                emitter.onNext("B");
    
                Thread.sleep(400);
                emitter.onNext("C");
    
                Thread.sleep(1200);
                emitter.onNext("D");
                emitter.onComplete();
            });
    
            source.timeout(1, TimeUnit.SECONDS)
                .subscribe(
                    item -> System.out.println("onNext: " + item),
                    error -> System.out.println("onError: " + error),
                    () -> System.out.println("onComplete will not be printed!"));
        }
    }
    

    3)控制台输出:

    onNext: A
    onNext: B
    onNext: C
    onError: java.util.concurrent.TimeoutException: The source did not signal an event for 1 seconds and has been terminated.
    

    源码解析

    ObservableTimeoutTimed

    1)创建 ObservableTimeoutTimed

    package io.reactivex.rxjava3.core;
    
    public abstract class Observable<@NonNull T> implements ObservableSource<T> {
        // 返回一个当前 Observable 的包装 Observable,但对每个发出的项目应用超时策略。
        // 如果从其前任开始的指定超时时间内未发出下一项,则生成的 Observable 将终止并通知观察者 TimeoutException。
        public final Observable<T> timeout(long timeout, @NonNull TimeUnit unit) {
            return timeout0(timeout, unit, null, Schedulers.computation());
        }
        
        private Observable<T> timeout0(long timeout, @NonNull TimeUnit unit,
                @Nullable ObservableSource<? extends T> fallback,
                @NonNull Scheduler scheduler) {
            Objects.requireNonNull(unit, "unit is null");
            Objects.requireNonNull(scheduler, "scheduler is null");
            return RxJavaPlugins.onAssembly(new ObservableTimeoutTimed<>(this, timeout, unit, scheduler, fallback));
        }
    }
    

    2)超时处理机制

    package io.reactivex.rxjava3.internal.operators.observable;
    
    public final class ObservableTimeoutTimed<T> extends AbstractObservableWithUpstream<T, T> {
    
        @Override
        protected void subscribeActual(Observer<? super T> observer) {
            TimeoutObserver<T> parent = new TimeoutObserver<>(observer, timeout, unit, scheduler.createWorker());
            observer.onSubscribe(parent);
            // 1. 初始化延迟任务,idx=0
            parent.startTimeout(0L);
            source.subscribe(parent);
        }
    
        static final class TimeoutObserver<T> extends AtomicLong
        implements Observer<T>, Disposable, TimeoutSupport {
    
            @Override
            public void onNext(T t) {
                long idx = get();
                if (idx == Long.MAX_VALUE || !compareAndSet(idx, idx + 1)) {
                    return;
                }
    
                task.get().dispose();
    
                downstream.onNext(t);
                
                // 3. 重新触发延迟任务处理,idx = idx + 1
                startTimeout(idx + 1);
            }
    
            void startTimeout(long nextIndex) {
                // 2. 执行延迟任务
                // 实验代码中配置延迟时间就是 (1, TimeUnit.SECONDS)
                task.replace(worker.schedule(new TimeoutTask(nextIndex, this), timeout, unit));
            }
    
            @Override
            public void onTimeout(long idx) {
                // 5. 判断初始化 TimeoutTask 实例时的 idx 是否和当前的一致,
                // 如果一致,说明在这个区间没有执行新的 onNext() 方法,因此:调用下游的 onError() 方法,并传入 TimeoutException 异常
                // 如果不一致,说明在这个时间区间执行了新的 onNext() 方法,因此忽略。
                if (compareAndSet(idx, Long.MAX_VALUE)) {
                    DisposableHelper.dispose(upstream);
    
                    downstream.onError(new TimeoutException(timeoutMessage(timeout, unit)));
    
                    worker.dispose();
                }
            }
        }
    
        static final class TimeoutTask implements Runnable {
    
            final TimeoutSupport parent;
    
            final long idx;
    
            TimeoutTask(long idx, TimeoutSupport parent) {
                this.idx = idx;
                this.parent = parent;
            }
    
            @Override
            public void run() {
                // 4. TimeoutObserver 中的 startTimeout() 传入的超时时间到了后执行当前方法
                // 调用 TimeoutObserver 中的 onTimeout 方法
                // 并传入初始化 TimeoutTask 实例时的 idx 参数
                parent.onTimeout(idx);
            }
        }
    
        interface TimeoutSupport {
    
            void onTimeout(long idx);
    
        }
    }
    

    LambdaObserver

    1)创建 LambdaObserver

    package io.reactivex.rxjava3.core;
    
    public abstract class Observable<@NonNull T> implements ObservableSource<T> {
        // 订阅当前的 Observable 并提供回调来处理它发出的项目以及它发出的任何错误或完成通知。
        public final Disposable subscribe(@NonNull Consumer<? super T> onNext, @NonNull Consumer<? super Throwable> onError,
                @NonNull Action onComplete) {
            Objects.requireNonNull(onNext, "onNext is null");
            Objects.requireNonNull(onError, "onError is null");
            Objects.requireNonNull(onComplete, "onComplete is null");
    
            LambdaObserver<T> ls = new LambdaObserver<>(onNext, onError, onComplete, Functions.emptyConsumer());
    
            subscribe(ls);
    
            return ls;
        }
    }
    

    2)订阅方法执行逻辑

    package io.reactivex.rxjava3.internal.observers;
    
    public final class LambdaObserver<T> extends AtomicReference<Disposable>
            implements Observer<T>, Disposable, LambdaConsumerIntrospection {
    
        @Override
        public void onNext(T t) {
            if (!isDisposed()) {
                try {
                    // 调用实际的 onNext() 方法
                    // 也就是 item -> System.out.println("onNext: " + item)
                    onNext.accept(t);
                } catch (Throwable e) {
                    Exceptions.throwIfFatal(e);
                    get().dispose();
                    onError(e);
                }
            }
        }
    
        @Override
        public void onError(Throwable t) {
            if (!isDisposed()) {
                // 设置 DISPOSED
                lazySet(DisposableHelper.DISPOSED);
                try {
                    // 调用实际的 onError() 方法
                    // 也就是 error -> System.out.println("onError: " + error)
                    onError.accept(t);
                } catch (Throwable e) {
                    Exceptions.throwIfFatal(e);
                    RxJavaPlugins.onError(new CompositeException(t, e));
                }
            } else {
                RxJavaPlugins.onError(t);
            }
        }
    
        @Override
        public void onComplete() {
            if (!isDisposed()) { // 如果已经设置过 DISPOSED,则直接忽略
                lazySet(DisposableHelper.DISPOSED);
                try {
                    // 否则调用实际的 onComplete() 方法
                    // 也就是  () -> System.out.println("onComplete will not be printed!")
                    onComplete.run();
                } catch (Throwable e) {
                    Exceptions.throwIfFatal(e);
                    RxJavaPlugins.onError(e);
                }
            }
        }
    }
    

    相关文章

      网友评论

          本文标题:RxJava timeout 的使用

          本文链接:https://www.haomeiwen.com/subject/nqzjpltx.html