美文网首页android
Rxjava源码解析--基本用法源码分析

Rxjava源码解析--基本用法源码分析

作者: 二妹是只猫 | 来源:发表于2019-03-26 09:45 被阅读4次

    观察者(订阅)模式
    想了解rxjava就绕不过观察者模式,在观察者(订阅)模式文中对该模式有一个基本的介绍。

    创建于使用

    • 第一步创建被观察者(订阅者):
    Observable observable = Observable.create(new 
            ObservableOnSubscribe<Object>() {
         @Override
          public void subscribe(ObservableEmitter<Object> emitter) 
                throws Exception {
              emitter.onNext("hello");
          }
     });
    
    • 第二步创建观察者:
          Observer observer = new Observer() {
              @Override
              public void onSubscribe(Disposable d) {
    
              }
    
              @Override
              public void onNext(Object o) {
                  System.out.println("{}"+o);
              }
    
              @Override
              public void onError(Throwable e) {
    
              }
    
              @Override
              public void onComplete() {
    
              }
          };
    
    • 订阅:
      observable.subscribe(observer);
    

    这样一个简单的Rxjava就创建成功并成功订阅了,当运行上方代码时,订阅者Observer的onNext方法会被调用并接收到“hello”

    现在来看看观察者、被观察者是如何被创建并关联上的

    • 创建被观察者Observable.create(ObservableOnSubscribe<T> source):
      @SchedulerSupport(SchedulerSupport.NONE)
      public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
          ObjectHelper.requireNonNull(source, "source is null");
          return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
      }
    
    public final class ObservableCreate<T> extends Observable<T> {
        final ObservableOnSubscribe<T> source;
    
        public ObservableCreate(ObservableOnSubscribe<T> source) {
            this.source = source;
        }
    
        @Override
        protected void subscribeActual(Observer<? super T> observer) {
            CreateEmitter<T> parent = new CreateEmitter<T>(observer);
            observer.onSubscribe(parent);
    
            try {
                source.subscribe(parent);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                parent.onError(ex);
            }
        }
        ...
    }
    

    create方法返回一个Observable的子类ObservableCreate,给其中ObservableOnSubscribe赋值。注意subscribeActual方法 ,之后调用被观察者订阅观察者的subscribe最终就是执行的这儿。

    • 创建观察者对象Observer:
    public interface Observer<T> {
    
      void onSubscribe(@NonNull Disposable d);
    
      void onNext(@NonNull T t);
    
      void onError(@NonNull Throwable e);
    
      void onComplete();
    
    }
    

    Observer很简单就是一个接口

    • 最关键的一步,观察者于被观察者绑定observable.subscribe(observer)
      Observable:
      public final void subscribe(Observer<? super T> observer) {
          ObjectHelper.requireNonNull(observer, "observer is null");
          try {
              observer = RxJavaPlugins.onSubscribe(this, observer);
    
              ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
    
              subscribeActual(observer);
          } catch (NullPointerException e) { // NOPMD
              throw e;
          } catch (Throwable e) {
              Exceptions.throwIfFatal(e);
              // can't call onError because no way to know if a Disposable has been set or not
              // can't call onSubscribe because the call might have set a Subscription already
              RxJavaPlugins.onError(e);
    
              NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
              npe.initCause(e);
              throw npe;
          }
      }
    

    通过上面subscribeActual(observer)这个抽象方法,调用到Observerble的实现类ObserverblecreatesubscribeActual(observer)(果然如我们上面所说的):

    public final class ObservableCreate<T> extends Observable<T> {
      final ObservableOnSubscribe<T> source;
    
      public ObservableCreate(ObservableOnSubscribe<T> source) {
          this.source = source;
      }
    
      @Override
      protected void subscribeActual(Observer<? super T> observer) {
          CreateEmitter<T> parent = new CreateEmitter<T>(observer);
          observer.onSubscribe(parent);
    
          try {
              source.subscribe(parent);
          } catch (Throwable ex) {
              Exceptions.throwIfFatal(ex);
              parent.onError(ex);
          }
      }
    
      static final class CreateEmitter<T>extends AtomicReference<Disposable>implements ObservableEmitter<T>, Disposable {
    
          private static final long serialVersionUID = -3434801548987643227L;
    
          final Observer<? super T> observer;
    
          CreateEmitter(Observer<? super T> observer) {
              this.observer = observer;
          }
    
          @Override
          public void onNext(T t) {
              if (t == null) {
                  onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                  return;
              }
              if (!isDisposed()) {
                  observer.onNext(t);
              }
          }
    
          @Override
          public void onError(Throwable t) {
              if (!tryOnError(t)) {
                  RxJavaPlugins.onError(t);
              }
          }
        ...省略代码
    }
    

    这里创建了CreateEmitter,它实现了ObservableEmitter接口,这里就将我们刚开始创建的被观察者的subcriibe(ObservableEmitter<Object> emitter)关联了起来。最终我们我们在Observable的subscribe方法中调用的emitter.onNext("hello")就是它实现的:

    到这里我们的rxjava示例就完整的运行完毕了,最终就是Observable的实ObservableCreateObserver关联,并通过CreateEmitter发送通知消息,1中调用到observer的onNext(t),这样一个简单的rxjava就实现了。

    相关文章

      网友评论

        本文标题:Rxjava源码解析--基本用法源码分析

        本文链接:https://www.haomeiwen.com/subject/ygovvqtx.html