美文网首页我爱编程
Android RxJava 源码流程分析

Android RxJava 源码流程分析

作者: umbrella1 | 来源:发表于2018-05-26 22:07 被阅读40次
    2.png
    1.png
    3.png
    最终会形成一条被观察者链子,每个被观察者对象都有各自的线程Schedulers用来切换线程。
    OkHttpCall封装okhttp3相关的操作。
    我们分5个步骤来分析流程:
    第一步:
    在retrofit.addCallApdapterFactory(RxJava2CallAdapterFactory.create()),会把RxJava2CallAdapterFactory加入retrofit中的变量private final List<CallAdapter.Factory> adapterFactories = new ArrayList<>();
    中,后面要用到这个生成适配类RxJava2CallAdapter
    这个类会调用adapt(Call<R> call)生成被观察者Observable<T>的对象。
    这是典型的桥接模式
    第二步:
    create函数中动态代理对象,动态代理类都必须要实现InvocationHandler这个接口,当我们通过代理对象调用一个方法的时候,这个方法的调用就会被转发为由InvocationHandler这个接口的 invoke 方法来进行调用。我们来看看InvocationHandler这个接口的唯一一个方法 invoke 方法,即后面调用service.getSearchBook()时,会走到这个invoke的方法中。
    public <T> T create(final Class<T> service) {
      Utils.validateServiceInterface(service);
      if (validateEagerly) {
        eagerlyValidateMethods(service);
      }
      return (T) Proxy.newProxyInstance(service.getClassLoader(), new Class<?>[] { service },
          new InvocationHandler() {
            private final Platform platform = Platform.get();
    
            @Override public Object invoke(Object proxy, Method method, @Nullable Object[] args)
                throws Throwable {
              // If the method is a method from Object then defer to normal invocation.
              if (method.getDeclaringClass() == Object.class) {
                return method.invoke(this, args);
              }
              if (platform.isDefaultMethod(method)) {
                return platform.invokeDefaultMethod(method, service, proxy, args);
              }
              ServiceMethod<Object, Object> serviceMethod =
                  (ServiceMethod<Object, Object>) loadServiceMethod(method);
              OkHttpCall<Object> okHttpCall = new OkHttpCall<>(serviceMethod, args);
              return serviceMethod.callAdapter.adapt(okHttpCall);
            }
          });
    }
    ServiceMethod<?, ?> loadServiceMethod(Method method) {
      ServiceMethod<?, ?> result = serviceMethodCache.get(method);
      if (result != null) return result;
    
      synchronized (serviceMethodCache) {
        result = serviceMethodCache.get(method);
        if (result == null) {
          result = new ServiceMethod.Builder<>(this, method).build();
          serviceMethodCache.put(method, result);
        }
      }
      return result;
    }
    

    其中SerivceMethod.callAdapter.adapt(okHttpCall),生成流程:
    ServiceMethod.Builder().build()中,调用retroift.callAdatper()生成callAdapter对象。
    在retrofit中,callAdapter又调用nextCallAdapter,在这里面,会从adapterFactories变量中取出RxJava2CallAdapterFactory对象,调用get(),生成RxJava2CallAdapter.
    最后serviceMethod.callAdapter.adapt(okHttpCall),会到RxJava2CallAdapter.adapt函数中。
    在RxJava2CallAdapter函数中:

    @Override public <R> Object adapt(Call<R> call) {
      Observable<Response<R>> responseObservable = new CallObservable<>(call);
      Observable<?> observable;
      if (isResult) {
        observable = new ResultObservable<>(responseObservable);
      } else if (isBody) {
        observable = new BodyObservable<>(responseObservable);
      } else {
        observable = responseObservable;
      }
      if (scheduler != null) {
        observable = observable.subscribeOn(scheduler);
      }
      if (isFlowable) {
        return observable.toFlowable(BackpressureStrategy.LATEST);
      }
      if (isSingle) {
        return observable.singleOrError();
      }
      if (isMaybe) {
        return observable.singleElement();
      }
      if (isCompletable) {
        return observable.ignoreElements();
      }
      return observable;
    }
    

    会生成被观察者BodyObservable<>(responseObservable);其中Observable<Response<R>> responseObservable = new CallObservable<>(call);即BodyObservable的变量Observable<Response<T>> upstream
    这样在第二步中生成的对象就是BodyObservable
    第三步:生成被观察者ObservableSubscribeOn:
    observable.subscribeOn(Schedulers.io())

    public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }
    

    就是把BodyObservable和IoScheduler放到ObservableSubscribeOn,
    其中ObservableSource<T> source是BodyObservable
    第四步:生成被观察者ObservableObserveOn
    observeOn(AndroidSchedulers.mainThread())

    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }
    

    把ObservableSubscribeOn 和 HandlerScheduler
    放入到被观察者 ObservableObserveOn<T>,其中ObservableSource<T> source是ObservableSubscribeOn对象,形成一条被观察者的责任链模式,对应各个的Schedulers来实现线程之间的切换,以及Disposable的接口,实现中断等。

    第五步:创建观察者,通过订阅者连接被观察者。
    subscribe(observer);
    这个observer是观察者,通过subscribe就开始处理各个事件。

    相关文章

      网友评论

        本文标题:Android RxJava 源码流程分析

        本文链接:https://www.haomeiwen.com/subject/dftdjftx.html