美文网首页
RxJava2的使用和源码分析

RxJava2的使用和源码分析

作者: WY_a111 | 来源:发表于2020-03-27 11:25 被阅读0次

Rxjava2主要的作用你可以直接理解为,就是为了异步事件而生。也就是在某个线程中去请求某个事情,但是搞完事情之后又不想在这个线程中负责任,于是需要把责任推给隔壁的那个家伙。来具体看看:

一.使用步骤:

步骤很简单,就是三步。大家都知道RxJava的设计模式是观察者模式,所以它的实现分为:创建Observeable(被观察者),创建Observer(观察者),将这两者关联起来(通过 subscribe)

二.具体每个步骤的重要方法

因为Observable里边发送的方法很多,我这里不多说因为有许多相关的文章。我的目的是把这些东西给你串联起来。然后基于重要的方法去分析它内部具体实现的源码。

Obserable的方法:

1.create() 首先肯定是进行创建这个对象,Obserable使用的是个链式结构,所以所有的方法你都可以“.”到底

  1. subscribeOn(Scheduler scheduler)用来指定你搞事情的线程
    3.observeOn(Scheduler scheduler)用来制定你要负责任的线程(处理UI事件肯定是UI线程)
    4.subscribe(Observer obs)用来将观察者和被观察者关联起来

观察者主要是回调方法:

1.onNext()用来接受数据
2.onComplete()整个传输过程完成之后的回调
3.onError()出错了的回调,这个方法跟onComplete()是互斥方法(也就是有我没他)

三.就这点儿东西,现在上代码

得到Observeable对象

fun  createObservabel( message:Message):Observable<Message>{
        var obervered:Observable<Message> = Observable.create<Message> {
            emitter->
            emitter.onNext(message)
            message.name = "wangyi2"
            emitter.onNext(message)
            emitter.onComplete()
        }
        return obervered;

    }

得到Oberver对象

fun createObserver():Observer<Message>{
        var oberver: Observer<Message> = object :Observer<Message>{
            override fun onComplete() {
                Log.i("wangyi","complete")

            }

            override fun onSubscribe(d: Disposable) {

            }

            override fun onNext(t: Message) {
                Log.i("wangyi",t.name+"--age--"+t.age)

            }

            override fun onError(e: Throwable) {

            }

        }
        return oberver;
    }

进行Observeable和Observer绑定

fun subscribe(){
        var message:Message= Message()
        message.name = "wangyi1"
        message.age = 3
        var observerable = createObservabel(message)
        var observer= createObserver();
        observerable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())
        observerable.subscribe(observer)

    }

好了,使用上边就这些东西。接下来咱们就是对具体源码的进行分析。

四.源码

源码分析

我个人分析源码,其实就是弄清楚这些回调方法是什么时候调用如何调用调用ok来看第一步,Observeable.create()

    @CheckReturnValue
    @NonNull
    @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }

这里就是简单的进行了一个判null操作,继续看 onAssembly()这个方法。

@NonNull
    public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }

你会发现这里就是对source进行一个判null的操作,对整体流程没有什么大的影响。于是咱们回头看ObservableCreate,这个就是一个Observable对象。其实到这里这个方法就看完了,就是创建了ObservableCreate一个Observable类型的对象。ObservableCreate有很多我们后期会一点点的看到的方法,不要急,咱们继续。

关于Observer这个对象其实没有什么可以分析的,因为它就是一个接口,一个用来后续回调的接口。

subscribe方法的分析

仔细一看大部分都是判断null的代码(个人也确实方案Java中空指针,这也是为啥我要使用kotlin的原因)

observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");

            subscribeActual(observer);

其中最主要的就是 subscribeActual(observer)这个方法,我们进一步跟进去发现它是一个抽象方法。于是咱们具体得看ObservableCreate对这个方法的实现。

@Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

其中CreateEmitter这个静态内部类很重要,这个静态内部类里边调用了Observer的所有回调方法。哈哈,好爽吧,这里就看到了光明,后续的回调都会CreateEmitter来实现。其实这就是不指定监听线程和回调线程的情况。

RxJava线程切换

直接去参考这篇文章吧,写的很详细了https://www.jianshu.com/p/a9ebf730cd08

相关文章

  • RxJava源码(一)

    RxJava2正式版发布,学习一下源码和操作符实现,顺便做一下笔记。 一、切入正题 引入使用 简单使用 源码分析j...

  • RxJava2源码分析——线程切换

    本文章主要是对RxJava2的线程切换流程进行源码分析,在阅读之前,可以先阅读以下文章: RxJava2源码分析—...

  • RxJava2源码分析——Map操作符

    本文章主要是对RxJava2的Map操作符进行源码分析,在阅读之前,可以先阅读以下文章: RxJava2源码分析—...

  • RxJava2 源码分析

    RxJava2 源码分析 【备注】只用于个人收藏

  • RxJava2 源码分析二

    文章目录 前言 RxJava2 线程调度 RxJava2 怎么进行线程调度 总结 前言 经过RxJava2源码分析...

  • RxJava2框架源码分析三(map篇)

    1.回顾 上篇已经讲了RxJava2创建操作符create源码解析,不清楚的可以查看RxJava2框架源码分析二(...

  • RxJava2的使用和源码分析

    Rxjava2主要的作用你可以直接理解为,就是为了异步事件而生。也就是在某个线程中去请求某个事情,但是搞完事情之后...

  • rxjava2理解

    本文从建立模型的角度分析rxjava2的源码实现,适合看了众多rxjava2源码解析还是一头雾水的同学,附带少量代...

  • Retrofit配合RxJava2和Gson简单使用

    Retrofit源码学习之一Retrofit源码学习之二本篇文章分析一下Retrofit结合RxJava2和Gso...

  • RxJava2 源码分析一

    文章目录 前言 RxJava2 介绍 RxJava2 使用 带问题看源码 总结 前言 在OkHttp3+Retro...

网友评论

      本文标题:RxJava2的使用和源码分析

      本文链接:https://www.haomeiwen.com/subject/cdfluhtx.html