美文网首页
RxJava2内部实现

RxJava2内部实现

作者: 咚咚_Coding | 来源:发表于2021-08-29 23:51 被阅读0次

    Use

          //1、Observable
        Disposable subscribe = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("android RxJava2");
            }
           
        })//2、线程切换
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                //3、订阅 Subscribe
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.d("RxJavaUtils  create", s);
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        Log.d("RxJavaUtils  create", throwable.toString());
                    }
                });
    

    create操作符执行流程

    订阅subscribe
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
            Action onComplete, Consumer<? super Disposable> onSubscribe) {
        LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
        subscribe(ls);
        return ls;
    }
    
    @SchedulerSupport(SchedulerSupport.NONE)
    @Override
    public final void subscribe(Observer<? super T> observer) {
    observer = RxJavaPlugins.onSubscribe(this, observer);
    subscribeActual(observer);
    
    }
    
    Observable
    public abstract class Observable<T> implements ObservableSource<T> 
    protected abstract void subscribeActual(Observer<? super T> observer);
    
    ObservableSource
    public interface ObservableSource<T> {
    /**
     * Subscribes the given Observer to this ObservableSource instance.
     * @param observer the Observer, not null
     * @throws NullPointerException if {@code observer} is null
     */
    void subscribe(@NonNull Observer<? super T> observer);
    }
    
    ObservableOnSubscribe 参数
     public interface ObservableOnSubscribe<T> {
    /**
     * Called for each Observer that subscribes.
     * @param emitter the safe emitter instance, never null
     * @throws Exception on error
     */
    void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
    }
    
    ObservableCreate
    public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;
    
    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }
    
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);
    
        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
    
    static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {
    
        private static final long serialVersionUID = -3434801548987643227L;
    
        final Observer<? super T> observer;
    
        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }
    
        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }
    
        @Override
        public void onError(Throwable t) {
            if (!tryOnError(t)) {
                RxJavaPlugins.onError(t);
            }
        }}
    

    相关文章

      网友评论

          本文标题:RxJava2内部实现

          本文链接:https://www.haomeiwen.com/subject/xaxniltx.html