RxJava原理解析一

作者: alighters | 来源:发表于2015-12-02 00:16 被阅读23563次

初学RxJava,对其许多的API颇感神奇,所以RxJava的原理充满了兴趣。正好最近教父大头鬼也出了一篇RxJava解析的文章,本人也结合源码给出自己的理解。

这里主要先就一点来讲解。问题如下:

订阅跟被订阅的关系?是如何实现这一机制的?

  • 首先,理解OnSubscribe的概念
 /**
  * Invoked when Observable.subscribe is called.
  */
 public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {
     // cover for generics insanity
 }

OnSubscribe继承自Action1,来看看Action1是干什么的

/**
 * A one-argument action.
 */
public interface Action1<T> extends Action {
    void call(T t);
}

Action1仅仅是一个参数的泛型接口,提供了使用泛型的类型来作为参数,这样就可以调用这个指定的泛型类型。
在此,我把这个onSubscribe理解为订阅的行为。这个行为是指,我在创建一个被观察者的时候,就是要指定这个被观察者所要发布的行为;在观察者的角度来理解,就表示观察者订阅了这些行为。

  • 理解了这个OnSubscribe之后,看看Observable的创建。这里主要看Observable的构造方法:
 /**
  * Creates an Observable with a Function to execute when it is subscribed to.
  * <p>
  * <em>Note:</em> Use {@link #create(OnSubscribe)} to create an Observable, instead of this constructor,
  * unless you specifically have a need for inheritance.
  * 
  * @param f
  *            {@link OnSubscribe} to be executed when {@link #subscribe(Subscriber)} is called
  */
 protected Observable(OnSubscribe<T> f) {
     this.onSubscribe = f;
 }

额,很简单的嘛。这里主要就是需要把OnSubscribe的属性保存下来,(即咱们所订阅的行为)。这里咱们着重点放在这段代码的类注释上面:创建一个带有执行操作的被观察者,当它被订阅时,执行它的操作(即咱们提到的订阅行为)翻译的不好,欢迎拍砖啊。另外,就是这里的推荐的内容了,官方希望我们尽量通过create来创建Obserable,而不是使用继承。

  • 订阅者/观察者的使用。咱们知晓了被观察者的创建,接下来就是观察者。它的实现就很简单了。主要是由一些的主要的周期函数构成。在此,就略过了。
  • 订阅操作的实现。这个是咱们关注的重头戏。先看代码:
private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
// validate and proceed
  if (subscriber == null) {
      throw new IllegalArgumentException("observer can not be null");
  }
  if (observable.onSubscribe == null) {
      throw new IllegalStateException("onSubscribe function can not be null.");
      /*
       * the subscribe function can also be overridden but generally that's not the appropriate approach
       * so I won't mention that in the exception
       */
  }
  
  // new Subscriber so onStart it
  subscriber.onStart();
  
  /*
   * See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls
   * to user code from within an Observer"
   */
  // if not already wrapped
  if (!(subscriber instanceof SafeSubscriber)) {
      // assign to `observer` so we return the protected version
      subscriber = new SafeSubscriber<T>(subscriber);
  }

  // The code below is exactly the same an unsafeSubscribe but not used because it would 
  // add a significant depth to already huge call stacks.
  try {
      // allow the hook to intercept and/or decorate
      hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
      return hook.onSubscribeReturn(subscriber);
  } catch (Throwable e) {
      // special handling for certain Throwable/Error/Exception types
      Exceptions.throwIfFatal(e);
      // if an unhandled error occurs executing the onSubscribe we will propagate it
      try {
          subscriber.onError(hook.onSubscribeError(e));
      } catch (Throwable e2) {
          Exceptions.throwIfFatal(e2);
          // if this happens it means the onError itself failed (perhaps an invalid function implementation)
          // so we are unable to propagate the error correctly and will just throw
          RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
          // TODO could the hook be the cause of the error in the on error handling.
          hook.onSubscribeError(r);
          // TODO why aren't we throwing the hook's return value.
          throw r;
      }
      return Subscriptions.unsubscribed();
  }
}

代码略多,能一行行读下来,还是需要相当耐心的。这里,我就从代码中获取到的知识点做些说明:1)首先也是最重要的,当执行了subscribe的方法,就开始执行Subscriber的订阅操作。2)Subscriber的onStart方法,是第一个被调用的。3)在代码中,hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber),这句代码主要是获取到observable的onSubscribe属性,然后调用它的call方法,而且参数还是subscriber,很熟悉了,有木有,这就是之前所说的订阅的行为。3)在执行观察者订阅的行为的时候,可能会出现错误,通过捕捉异常,来调用subscriber的onError方法。

  • 总结:这里一个简单的完整的订阅与被订阅的流程就结束了,对其原理做以概括:创建一个被观察者,需要给被观察者指定其所发布的行为(onSubscribe来实现);指定观察者时,只需指定相应的观察回调即可;在完成订阅的操作时,是先调用subscriber的onStart方法,之后通过订阅行为onSubscribe来调用subscriber完成相应的订阅操作,最后若出现异常则会回调subscriber的onError方法。

  • 问题:因为之前看过一些RxJava的介绍文章,提到一点Subscriber的onComplete方法和onError方法是只会执行一个,记不太确切了,是不是这样。
    但是从源码中可以看出,若是执行到onComplete的方法时候,若在其中抛出了一场,那之后Observable的subscribe方法会捕捉异常,又会调用到onError方法,所以这样看的话,onComplete和onError是有机会都执行到的。

看了这个简单的订阅,产生了一些疑问,也就是接下来要去研究的问题,也是下篇要解决的问题,列举如下:

  • TODO:
  1. onError和onComplete的执行唯一性
    2)多个观察者是如何处理订阅的?

相关文章

  • RxJava

    使用RxJava:添加依赖: 走进RxJava:RxJava实质上就是一个异步操作库。API介绍和原理解析:1.扩...

  • MVP+Retrofit+Rxjava在项目中实战解析 

    文章目标 MVP在android中的原理解析 MVP+Retrofit+Rxjava在项目中实战解析 架构经验分享...

  • Android架构

    MVP+Retrofit+Rxjava在项目中实战解析 文章目标 MVP在android中的原理解析 MVP+Re...

  • 学习清单

    HTTP原理解析 RxJava使用和原理,并应用到BaseLib库,还有Dragger MVVM 多线程和线程池使...

  • RxJava源码解析(二)

    前言 本篇主要解析RxJava的线程切换的原理实现 subscribeOn 首先, 我们先看下subscribeO...

  • RxJava原理解析一

    初学RxJava,对其许多的API颇感神奇,所以RxJava的原理充满了兴趣。正好最近教父大头鬼也出了一篇RxJa...

  • Rxjava原理解析

    先看RxJava的简单使用及解析: 以上是Rxjava的一个简单示例,第一步通过Single.just()发送一个...

  • RxJava原理解析

    rxJava的思维 响应式编程,卡片式编程,流式编程,有一个起点和一个终点,起点开始流向我们的“事件”, 把事件流...

  • RxJava AutoDispose原理解析

    版权声明:本文为博主原创文章,未经博主允许不得转载https://blog.csdn.net/wsygyb/art...

  • RxJava的原理解析

    前言 RxJava的核心:订阅流程、线程切换。直接看用法: 首先我们看代码知道需要先Create然后继续调用下去,...

网友评论

本文标题:RxJava原理解析一

本文链接:https://www.haomeiwen.com/subject/mkgjhttx.html