美文网首页
RxJava原理解析

RxJava原理解析

作者: 飘絮无意 | 来源:发表于2022-12-13 19:06 被阅读0次

RxJava 是一种响应式编程,来创建基于事件的异步操作库。基于事件流的链式调用、逻辑清晰简洁。

平时用的多但是没认真分析源码总感觉虚的很废话不多说直接上代码

 Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("567");
                emitter.onNext("789");
                emitter.onNext("112");
                emitter.onNext("666");
                emitter.onNext("666");
                emitter.onNext("666");
                emitter.onNext("666");

            }
        }).map(new Function<String, Integer>() {
            @Override
            public Integer apply(String s) throws Exception {
                return Integer.parseInt(s);
            }
        }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
            }

            @Override
            public void onNext(Integer integer) {
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

看看 Observable.create做了什么就是创建了ObservableCreate对象其实也是个Observable接着链式调用map,RxJavaPlugins.onAssembly:默认情况下是返回source,如果需要监听Rx的操作符,可以调用setOnObservableAssembly方法,

 @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }

map方法里面也是创建了ObservableMap对象也是Observable接着链式调用subscribeOn方法,这个map方法就是做数据类型转换的就不展开

   @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
    }

subscribeOn方法同样也是创建一个ObservableSubscribeOn是个Observablej接着调用observeOn,这个方法不用多说就是线程切换的,一般用于把被观察者放到子线程里去执行一些耗时或者io操作

    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }

observeOn方法最终创建了一个ObservableObserveOn对象当然也是个Observable,一串链式调用下来创建了一堆Observable对象不过别急重点来了,接着调用了subscribe方法

 @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }

subcribe方法最关键的方法是subscribeActual,这个方法是ObservableObserveOn里面方法创建了一个Worker ,这个是用来处理从子线程切换到主线程的对象,线程切换这里不展开了,接着调用了source.subscribe(observer)方法,source是上一个observable对象也就是ObservableSubscribeOn,也就是调用了上一个obserable的subcribe方法,并且把自定义的observer也传过去,这是要干啥我们接着进去看看, observer.onSubscribe(),observer不就是下一个observable传进来的吗就是ObserveOnObserver对象,我们进去看看里面的onSubscribe方法,这个方法关键代码是downstream.onSubscribe(this);downstream称为下游就是最终调用到我们自定义的observer里面了,回调到onSubscribe里了给我们自行处理;回到ObservableSubscribeOn的subscribeActual方法中接着调用了parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));这个方法就是让线程切换到io线程中并且执行一个Runnable任务,这个任务就是SubscribeTask里面具体做的事是source.subscribe(parent);又是一样的味道不就是调用上一个obervable的subscribe方法然后执行对应observable的subscribeActual方法,接着是不是一直调用上一个observable的直到第一个observable的subscribeActual这里,对的就是这样的,一直到ObservableCreate的subscribeActual方法,这个方法调用了 首先调用observer.onSubscribe(parent);又是一样的套路就是依次调用下一个observable的里面内部类observer的onSubscribe一直到ObservableSubscribeOn这里的onsubcribe,由于ObservableSubscribeOn在这里做了拦截往下一个abservable里面的observer调用onSubscribe已经调用过了,好的我们继续往下看调用了 source.subscribe(parent);source就是我们自定义的ObservableOnSubscribe,调用这个方法就是发射数据了接着进到CreateEmitter的onNext方法,里面关键方法是 observer.onNext(t);observer就是下一个observable里面的observer,也就是把数据传到下一个observable里面进行处理,那是不是依次调用把数据传到下个abservable去处理一直传到我们自定义的Observer的onnext里面,没错就是这样的,这样就完成了rxjava的整个数据处理流程了。

@SchedulerSupport(SchedulerSupport.NONE)
    @Override
    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");

            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

//位于ObservableObserveOn中
  @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();

            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }


//位于ObservableSubscribeOn中
    @Override
    public void subscribeActual(final Observer<? super T> observer) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);

        observer.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

//位于ObservableObserveOn中
   @Override
        public void onSubscribe(Disposable d) {
            if (DisposableHelper.validate(this.upstream, d)) {
                this.upstream = d;
                if (d instanceof QueueDisposable) {
                    @SuppressWarnings("unchecked")
                    QueueDisposable<T> qd = (QueueDisposable<T>) d;

                    int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);

                    if (m == QueueDisposable.SYNC) {
                        sourceMode = m;
                        queue = qd;
                        done = true;
                        downstream.onSubscribe(this);
                        schedule();
                        return;
                    }
                    if (m == QueueDisposable.ASYNC) {
                        sourceMode = m;
                        queue = qd;
                        downstream.onSubscribe(this);
                        return;
                    }
                }

                queue = new SpscLinkedArrayQueue<T>(bufferSize);

                downstream.onSubscribe(this);
            }
        }
//位于ObservableSubscribeOn中
    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            source.subscribe(parent);
        }
    }
}
//位于ObservableMap中
   @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }
// ObservableCreate中
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
//CreateEmitter中
      @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

为了方便理解画了一份思维导图


rxjava.PNG

相关文章

  • RxJava

    使用RxJava:添加依赖: 走进RxJava:RxJava实质上就是一个异步操作库。API介绍和原理解析:1.扩...

  • MVP+Retrofit+Rxjava在项目中实战解析 

    文章目标 MVP在android中的原理解析 MVP+Retrofit+Rxjava在项目中实战解析 架构经验分享...

  • Android架构

    MVP+Retrofit+Rxjava在项目中实战解析 文章目标 MVP在android中的原理解析 MVP+Re...

  • 学习清单

    HTTP原理解析 RxJava使用和原理,并应用到BaseLib库,还有Dragger MVVM 多线程和线程池使...

  • RxJava源码解析(二)

    前言 本篇主要解析RxJava的线程切换的原理实现 subscribeOn 首先, 我们先看下subscribeO...

  • Rxjava原理解析

    先看RxJava的简单使用及解析: 以上是Rxjava的一个简单示例,第一步通过Single.just()发送一个...

  • RxJava原理解析

    rxJava的思维 响应式编程,卡片式编程,流式编程,有一个起点和一个终点,起点开始流向我们的“事件”, 把事件流...

  • RxJava原理解析一

    初学RxJava,对其许多的API颇感神奇,所以RxJava的原理充满了兴趣。正好最近教父大头鬼也出了一篇RxJa...

  • RxJava AutoDispose原理解析

    版权声明:本文为博主原创文章,未经博主允许不得转载https://blog.csdn.net/wsygyb/art...

  • RxJava的原理解析

    前言 RxJava的核心:订阅流程、线程切换。直接看用法: 首先我们看代码知道需要先Create然后继续调用下去,...

网友评论

      本文标题:RxJava原理解析

      本文链接:https://www.haomeiwen.com/subject/zzmhqdtx.html