美文网首页Android开发Android技术知识Android知识
RxJava+Retrofit从源码分析执行流程

RxJava+Retrofit从源码分析执行流程

作者: wodezhuanshu | 来源:发表于2016-10-20 18:09 被阅读320次

    RxJava和Retrofit配合使用过程中的具体流程是怎么样的呢?
    1.如何创建一个请求
    2.具体的请求线程是什么
    3.各种线程之间的回调是怎样的

    1.如何使用Retrofit创建一个 请求
    首先我们需要定义一个接口 格式如下:

    public interface Api {
        @GET("api/data/福利/{pageCount}/{pageIndex}")
        Call<BaseModel<ArrayList<Benefit>>> defaultBenefits(
                @Path("pageCount") int pageCount,
                @Path("pageIndex") int pageIndex
        );
    
        @GET("api/data/福利/{pageCount}/{pageIndex}")
        Observable<BaseModel<ArrayList<Benefit>>> rxBenefits(
                @Path("pageCount") int pageCount,
                @Path("pageIndex") int pageIndex
        );
    }
    

    返回值类型为Call的是纯Retrofit格式的调用,
    而下面返回值类型为Observable的就是RxJava + Retrofit的格式。我们今天就通过第二种方式来介绍

    一般来说,我们使用Retrofit进行网络 请求要先创建一个Retrofit对象,而这个对象的创建是通过Builder模式来创建的 如下格式:

    Retrofit retrofit = new Retrofit.Builder()
                    .baseUrl("http://gank.io/")
                    .addConverterFactory(GsonConverterFactory.create())
                    .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
                    .build();
    

    那么我们就来看看这个Retrofit对象的创建都做了些什么事情?
    Builder是Retrofit的内部类 直接看代码:

    public Builder() {
          this(Platform.get());
        }
    

    继续调用内部带参数的构造函数

    Builder(Platform platform) {
          this.platform = platform;
          // Add the built-in converter factory first. This prevents overriding its behavior but also
          // ensures correct behavior when using converters that consume all types.
          converterFactories.add(new BuiltInConverters());
        }
    

    只是对成员platform以及converterFactroises进行初始化
    这里platform的实际类型是

     static class Android extends Platform {
        @Override public Executor defaultCallbackExecutor() {
          return new MainThreadExecutor();
        }
    
        @Override CallAdapter.Factory defaultCallAdapterFactory(Executor callbackExecutor) {
          return new ExecutorCallAdapterFactory(callbackExecutor);
        }
    
        static class MainThreadExecutor implements Executor {
          private final Handler handler = new Handler(Looper.getMainLooper());
    
          @Override public void execute(Runnable r) {
            handler.post(r);
          }
        }
      }
    

    而第二个成员converterFactories只是增加了一个成员而已 至于他有什么用,我们暂且不说,因为我分析完源码也没发现这个成员变量的作用,到时候我们用到时候再进行分析。

    此时Builder已经构建完毕,接下来就是对该对象所需要的各种参数进行添加

    baseUrl("http://gank.io/")

    这句代码就是对HttpUrl进行赋值,不过这里面需要注意的点是:baseUrl后边那个/不能省略 不然拼接URL地址的时候就会出现错误,具体的解释 请看这个方法的注释 ,列举了各种写法以及结果,如果在这里出现问题,请对号入座。

    然后是

    addConverterFactory(GsonConverterFactory.create())

    这里面主要是添加了一个转换工厂,因为目前我们使用的服务器返回的数据格式都是使用JSON格式,所以这里添加了一个Gson转换工厂

    重点看这里

    .addCallAdapterFactory(RxJavaCallAdapterFactory.create())

    这里就是使用RxJava的适配器来封装CallAdapter,具体是如何使用的 我们后边遇到了再说

    最后调用build方法创建一个Retrofit对象 ,然后返回。
    我们俩看看build方法中做了些什么事情

    public Retrofit build() {
          //对url进行非空检查
          if (baseUrl == null) {
            throw new IllegalStateException("Base URL required.");
          }
    
        / /因为初始化时候,并未对callFactory赋值 此时为null
          okhttp3.Call.Factory callFactory = this.callFactory;
          if (callFactory == null) {
            进入这里,将callFactory赋值为OkHttpClient
            callFactory = new OkHttpClient();
          }
    
          Executor callbackExecutor = this.callbackExecutor;
          if (callbackExecutor == null) {
             //platform我们在上面已经知道是Android平台 最后返回的实际类型为MainThreadExecutor
            callbackExecutor = platform.defaultCallbackExecutor();
            /*
                 @Override public Executor defaultCallbackExecutor() {
                   return new MainThreadExecutor();
                }
            */
    
    
          }
    
          // Make a defensive copy of the adapters and add the default Call adapter.
          List<CallAdapter.Factory> adapterFactories = new ArrayList<>(this.adapterFactories);
          adapterFactories.add(platform.defaultCallAdapterFactory(callbackExecutor));
          /*
             @Override CallAdapter.Factory defaultCallAdapterFactory(Executor callbackExecutor) {
              return new ExecutorCallAdapterFactory(callbackExecutor);
             }
    
             在这里adapterFactories里面含有俩个变量 
             ExecutorCallAdapterFactory 和 RxJavaCallAdapterFactory
          */
    
          // Make a defensive copy of the converters.
          List<Converter.Factory> converterFactories = new ArrayList<>(this.converterFactories);
          此时该集合也有两个值 
           GsonConverterFactory  和 BuiltInConverters(基本上没什么用好像)
    
          return new Retrofit(callFactory, baseUrl, converterFactories, adapterFactories,
              callbackExecutor, validateEagerly);
        }
      }
    

    注释已经写的很清楚,就不多说了。最后通过已经初始化的成员变量对Retrofit的成员变量进行初始化。
    此时赋值之后如下:

    Retrofit(okhttp3.Call.Factory callFactory, HttpUrl baseUrl,
          List<Converter.Factory> converterFactories, List<CallAdapter.Factory> adapterFactories,
          Executor callbackExecutor, boolean validateEagerly) {
        this.callFactory = callFactory;//OkHttpClient
        this.baseUrl = baseUrl;//http://gank.io/
        this.converterFactories = unmodifiableList(converterFactories); // Defensive copy at call site.
        this.adapterFactories = unmodifiableList(adapterFactories); // Defensive copy at call site.
        this.callbackExecutor = callbackExecutor;
        //class retrofit2.Platform$Android$MainThreadExecutor
    
        this.validateEagerly = validateEagerly;//false
    
      }
    

    接下来就是重要的一步 ,就是使用我们刚才创建的Retrofit对象来构建一个服务(就是我么定义的接口)的实现类
    这里使用动态代理的方式 构建一个实现类并且返回 ,
    当我们使用返回的对象进行函数调用得时候 比如通过api对象调用函数rxBenefits

    Api api = retrofit.create(Api.class);api.rxBenefits(20, page++)
    

    接下来我们具体分析:

    public <T> T create(final Class<T> service) {
        Utils.validateServiceInterface(service);
        if (validateEagerly) {
          eagerlyValidateMethods(service);
        }
        return (T) Proxy.newProxyInstance(service.getClassLoader(), new Class<?>[] { service },
            new InvocationHandler() {
              private final Platform platform = Platform.get();
    
              @Override public Object invoke(Object proxy, Method method, Object... args)
                  throws Throwable {
                  //此时method就是我们接口中定义的  参数就是传递进来的两个参数
                   //public abstract rx.Observable com.stay4it.request.Api.rxBenefits(int,int)
                
                // If the method is a method from Object then defer to normal invocation.
                if (method.getDeclaringClass() == Object.class) {
                  return method.invoke(this, args);
                }
                if (platform.isDefaultMethod(method)) {
                  return platform.invokeDefaultMethod(method, service, proxy, args);
                }
                ServiceMethod serviceMethod = loadServiceMethod(method);
                OkHttpCall okHttpCall = new OkHttpCall<>(serviceMethod, args);
                return serviceMethod.callAdapter.adapt(okHttpCall);
              }
            });
      }
    

    这里面其实重要的有两步,第一步就是ServiceMethod的构建
    然后使用构建的ServiceMethod方法的adapt适配一个Observable对象返回。接下来我们就捡重要的说:

    loadServiceMethod(method);这句代码实现了什么功能?
    1.从缓存中取出以method为key的ServiceMethod
    如果没有 创建并且放进缓存对象
    2.创建ServiceMethod的过程

    public Builder(Retrofit retrofit, Method method) {
          this.retrofit = retrofit;
          this.method = method;
          //method上只有一个注解GET注解
          this.methodAnnotations = method.getAnnotations();
          //参数类型为[int ,int ]
          this.parameterTypes = method.getGenericParameterTypes();
          //参数上的注解为[Path,Path]
          this.parameterAnnotationsArray = method.getParameterAnnotations();
        }
    

    然后调用build创建一个ServiceMethod对象,其实就是对各种成员变量进行初始化
    1.callAdapter的初始化
    2.responseType的初始化
    3.responseConverter的初始化
    4.解析方法上的注解

    首先来看callAdapter的初始化
    其实就是通过方法的返回值和方法上的注解 来返回我们之前已经注册过的CallAdapter方法 我们具体来分析一下:

     public CallAdapter<?> nextCallAdapter(CallAdapter.Factory skipPast, Type returnType,
          Annotation[] annotations) {
        checkNotNull(returnType, "returnType == null");
        checkNotNull(annotations, "annotations == null");
    
        int start = adapterFactories.indexOf(skipPast) + 1;
        for (int i = start, count = adapterFactories.size(); i < count; i++) {
          CallAdapter<?> adapter = adapterFactories.get(i).get(returnType, annotations, this);
          if (adapter != null) {
            return adapter;
          }
        }
    

    看代码 印证了我们的猜想,这时有人会有疑问了,我们这个集合adapterFactories里面目前有两个成员,RxJavaCallAdapterFactory 和 ExecutorCallAdapterFactory 那么到底是返回哪一个呢?
    当然是我们在构建Retrofit的时候我们通过addCallAdapterFactory添加进来的那个为先了 如果他匹配了 那么就直接返回 最后返回的类型是
    SimpleCallAdapter 类型 为什么是这个类型呢?
    其实看代码就能明白一切,但是我这里还是来解释一下吧

    CallAdapter<?> adapter = adapterFactories.get(i).get(returnType, annotations, this);
    

    这行代码中我们上面分析 adapterFactories.get(i)的类型是RxJavaCallAdapterFactory 那么就会调用这个类兄的get方法 来获取实际的类型 ,并且传递的参数是

    @Override
      public CallAdapter<?> get(Type returnType, Annotation[] annotations, Retrofit retrofit) {
    
        //rawType  = Observable<BaseModel<ArrayList<Benefit>>>其实就是方法的返回值
        Class<?> rawType = getRawType(returnType);
        //canonicalName = rx.Observable
        String canonicalName = rawType.getCanonicalName();
        //isSingle = false;
        boolean isSingle = "rx.Single".equals(canonicalName);
         //isCompletable = false;
        boolean isCompletable = "rx.Completable".equals(canonicalName);
        if (rawType != Observable.class && !isSingle && !isCompletable) {
          return null;
        }
    
        //returnType实际类型是ParameterizedTypeImpl类型 
        //所以!(returnType instanceof ParameterizedType)这个条件是!true = false
        if (!isCompletable && !(returnType instanceof ParameterizedType)) {
          String name = isSingle ? "Single" : "Observable";
          throw new IllegalStateException(name + " return type must be parameterized"
              + " as " + name + "<Foo> or " + name + "<? extends Foo>");
        }
    
        if (isCompletable) {
          // Add Completable-converter wrapper from a separate class. This defers classloading such that
          // regular Observable operation can be leveraged without relying on this unstable RxJava API.
          // Note that this has to be done separately since Completable doesn't have a parametrized
          // type.
          return CompletableHelper.createCallAdapter(scheduler);
        }
    
        //最后执行到了这里  调用getCallAdapter方法将返回值和调度scheduler此时为null
        CallAdapter<Observable<?>> callAdapter = getCallAdapter(returnType, scheduler);
        if (isSingle) {
          // Add Single-converter wrapper from a separate class. This defers classloading such that
          // regular Observable operation can be leveraged without relying on this unstable RxJava API.
          return SingleHelper.makeSingle(callAdapter);
        }
        return callAdapter;
      }
    

    最后调用了getCallAdapter方法并且参数为
    returnType---->rx.Observable<com.stay4it.model.BaseModel<java.util.ArrayList<com.stay4it.model.Benefit>>>
    scheduler----->null

    private CallAdapter<Observable<?>> getCallAdapter(Type returnType, Scheduler scheduler) {
        Type observableType = getParameterUpperBound(0, (ParameterizedType) returnType);
        //rawObservableType 其实解析出来是class com.stay4it.model.BaseModel的类型
        Class<?> rawObservableType = getRawType(observableType);
        if (rawObservableType == Response.class) {
          if (!(observableType instanceof ParameterizedType)) {
            throw new IllegalStateException("Response must be parameterized"
                + " as Response<Foo> or Response<? extends Foo>");
          }
          Type responseType = getParameterUpperBound(0, (ParameterizedType) observableType);
          return new ResponseCallAdapter(responseType, scheduler);
        }
    
        if (rawObservableType == Result.class) {
          if (!(observableType instanceof ParameterizedType)) {
            throw new IllegalStateException("Result must be parameterized"
                + " as Result<Foo> or Result<? extends Foo>");
          }
          Type responseType = getParameterUpperBound(0, (ParameterizedType) observableType);
          return new ResultCallAdapter(responseType, scheduler);
        }
    
    //所以最后会走这里 创建一个SimpleCallAdapter类型的对象返回 
        return new SimpleCallAdapter(observableType, scheduler);
      }
    

    经过千难万阻我们才执行完ServiceMethod创建路上的第一步,不过这也是最重要的一步,接下来都是在这步的基础上进行赋值的 我们接着往下看

    responseType = callAdapter.responseType() 其实值是
    com.stay4it.model.BaseModel<java.util.ArrayList<com.stay4it.model.Benefit>> 带泛型的返回值类型 就是T的实际类型

    然后接着是responseConverter = createResponseConverter();

    实际类型 GsonResponseBodyConverter 这个分析方法跟上面的adapterFactory分析方法一致 读者自行分析。

    接下来是解析方法上的注解

    for (Annotation annotation : methodAnnotations) {  parseMethodAnnotation(annotation);}
    

    其实就是将方法的的注解 以及注解里面的占位符给提取出来 在我们这个例子当中

    @GET("api/data/福利/{pageCount}/{pageIndex}")
    

    其实就是将这个注解上的信息提取出来
    比如

    this.httpMethod = httpMethod;//GET
    this.hasBody = hasBody;//fasle
    this.relativeUrl = value;
    //api/data/福利/{pageCount}/{pageIndex}
    this.relativeUrlParamNames = parsePathParameters(value);
    其实就是set集合{pageCount,pageIndex}
    

    接着是将参数中的注解 进行解析
    ParameterHandler<?>[] parameterHandlers 就是它的初始化操作 这个有点麻烦 不过逻辑比较清晰
    在我们这个例子当中 最后类型为
    class ParameterHandler.Path<T>

    到这一步后基本上ServiceMethod的构造已经完成了

    估计大家也被绕晕了吧 ,不过Rxtrofit的代码就是这种风格,大家最好是对源码进行DEBUG 然后逐步了解每一步是如何执行以及如何赋值的 。

    接下来我们继续分析 构建完成ServiceMethod 我们接下来分析如何返回返回一个Observable对象的

    OkHttpCall okHttpCall = new OkHttpCall<>(serviceMethod, args);
    

    以ServiceMethod和args为参数 创建一个OkHttpCall ,这个对象实际上是对OkHttpClient的封装

    然后通过

    return serviceMethod.callAdapter.adapt(okHttpCall);
    

    这里的callAdapter我们上面已经分析,它的实际类型是
    SimpleCallAdapter 最后调用这个类的adapt方法

    @Override public <R> Observable<R> adapt(Call<R> call) {
          Observable<R> observable = Observable.create(new CallOnSubscribe<>(call)) //
              .lift(OperatorMapResponseToBodyOrError.<R>instance());
          if (scheduler != null) {
            return observable.subscribeOn(scheduler);
          }
          return observable;
        }
    

    走到这里 终于看到了熟悉的Observable了 通过调用他的静态方法 来创建一个Observable 返回 我们来看下 这个对象的构建

    在这里给大家推荐一个博客 对RxJava的分析 非常详细
    https://gank.io/post/560e15be2dca930e00da1083

    我们接着分析

    Observable.create方法做了哪些事情:
    1.创建了一个CallOnSubscribe对象 并且将我们刚才创建的OkHttpCall对象赋值为CallOnSubscribe的成员变量originalCall
    1.创建了一个Observable对象 以上面创建的CallOnSubscribe对象为参数 并且保存到成员变量
    this.onSubscribe = f中去。

    最后将这个Observable对象返回 。

    这样我们的被观察者 也就是我们的Subject已经创建好了
    接下来就是接受订阅了
    RxJava订阅是通过函数subscribe()进行的 我们来看我们这个例子中

     api.rxBenefits(20, page++)
                    .subscribeOn(Schedulers.io())//在IO线程中执行耗时操作
                    .observeOn(AndroidSchedulers.mainThread())//在主线程中更新UI
                    .subscribe(new Action1<BaseModel<ArrayList<Benefit>>>() {
                        @Override
                        public void call(BaseModel<ArrayList<Benefit>> model) {
                            if (action == PullRecycler.ACTION_PULL_TO_REFRESH) {
                                mDataList.clear();
                            }
                            if (model.results == null || model.results.size() == 0) {
                                recycler.enableLoadMore(false);
                            } else {
                                recycler.enableLoadMore(true);
                                mDataList.addAll(model.results);
                                adapter.notifyDataSetChanged();
                            }
                            recycler.onRefreshCompleted();
                        }
                    }, new Action1<Throwable>() {
                        @Override
                        public void call(Throwable throwable) {
                            recycler.onRefreshCompleted();
                        }
                    })//然后订阅 将观察者 和被观察者绑定起来
                    
            ;
    

    这个具体是如何关联的呢 ,我觉得还是看源码吧
    当我们调用方法subscribe方法时 会传递两个Action1对象 用来执行回调

     public final Subscription subscribe(final Action1<? super T> onNext, final Action1<Throwable> onError) {
            if (onNext == null) {
                throw new IllegalArgumentException("onNext can not be null");
            }
            if (onError == null) {
                throw new IllegalArgumentException("onError can not be null");
            }
            return subscribe(new Subscriber<T>() {
                @Override
                public final void onCompleted() {
                    // do nothing
                }
                @Override
                public final void onError(Throwable e) {
                    onError.call(e);
                }
                @Override
                public final void onNext(T args) {
                    onNext.call(args);
                }
    
            });
        }```
    
    然后封装一个Subscriber对象 然后调用Observable类的subscribe方法完成订阅
    
    

    private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
    // validate and proceed 这个肯定不为空 这是刚才我们创建的对象
    if (subscriber == null) {
    throw new IllegalArgumentException("observer can not be null");
    }

        //这个类型我们之前创建Observable的时候分析过 实际类型是CallOnSubscribe
        if (observable.onSubscribe == null) {
            throw new IllegalStateException("onSubscribe function can not be null.");
            /*
             * the subscribe function can also be overridden but generally that's not the appropriate approach
             * so I won't mention that in the exception
             */
        }
        
        // new Subscriber so onStart it
        subscriber.onStart();
        .......此处略过部分代码........
    
    
        // The code below is exactly the same an unsafeSubscribe but not used because it would 
        // add a significant depth to already huge call stacks.
        try {
            // allow the hook to intercept and/or decorate
            hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
            //主要操作是这个  其实这个方法里面做的就是只是返回了observable.onSubscribe 这个对象  其实就是调用了CallOnSubscribe的call方法 并将subscriber传递进去
            return hook.onSubscribeReturn(subscriber);
        .......此处略过部分代码........
    }
    

    @Override public void call(final Subscriber<? super Response<T>> subscriber) {
    // Since Call is a one-shot type, clone it for each new subscriber.
    Call<T> call = originalCall.clone();
    //此时call实际类型为我们之前创建的OkHttpCall对象的拷贝
    // Wrap the call in a helper which handles both unsubscription and backpressure.
    //然后对OkHttpCall和subscribe 进行封装 以便之后的解除订阅
    RequestArbiter<T> requestArbiter = new RequestArbiter<>(call, subscriber);
    subscriber.add(Subscriptions.create(requestArbiter));
    subscriber.setProducer(requestArbiter);
    }

    最后返回的实际上还是我们subcribe方法中new的那个对象
    
    具体是如何触发请求的执行的呢?
    

    public void setProducer(Producer p) {
    long toRequest;
    boolean passToSubscriber = false;
    synchronized (this) {
    toRequest = requested;
    producer = p;
    if (subscriber != null) {
    // middle operator ... we pass through unless a request has been made
    if (toRequest == NOT_SET) {
    // we pass through to the next producer as nothing has been requested
    passToSubscriber = true;
    }
    }
    }
    // do after releasing lock
    if (passToSubscriber) {
    subscriber.setProducer(producer);
    } else {
    // we execute the request with whatever has been requested (or Long.MAX_VALUE)
    if (toRequest == NOT_SET) {
    producer.request(Long.MAX_VALUE);
    } else {
    producer.request(toRequest);
    }
    }
    }

    其实最后调用了我们传进去的Producer对象的request方法  这应该就是真正执行请求的方法了吧  我们进去看看 
    
    

    @Override public void request(long n) {
    if (n < 0) throw new IllegalArgumentException("n < 0: " + n);
    if (n == 0) return; // Nothing to do when requesting 0.
    if (!compareAndSet(false, true)) return; // Request was already triggered.

      try {
      //  call实际类型OkHttpCall  调用OkHttpCall的execute方法  然后返回Response
        Response<T> response = call.execute();
        if (!subscriber.isUnsubscribed()) {
    
        //这里的subsriber就是我们前边构建的匿名内部类对象  调用他的OnNext方法 将会调用注册的Action11对象的onNext方法 将结果回调给UI线程进行界面处理 
    
          subscriber.onNext(response);
        }
      } catch (Throwable t) {
        Exceptions.throwIfFatal(t);
        if (!subscriber.isUnsubscribed()) {
          subscriber.onError(t);
        }
        return;
      }
    
      if (!subscriber.isUnsubscribed()) {
        subscriber.onCompleted();
      }
    }
    
    
    我们来看OkHttpCall的execute方法 
    

    @Override public Response<T> execute() throws IOException {
    okhttp3.Call call;

    synchronized (this) {
      if (executed) throw new IllegalStateException("Already executed.");
      executed = true;
    
      if (creationFailure != null) {
        if (creationFailure instanceof IOException) {
          throw (IOException) creationFailure;
        } else {
          throw (RuntimeException) creationFailure;
        }
      }
    
      call = rawCall;
      if (call == null) {
        try {
         ## call = rawCall = createRawCall();//主要看这里 这里真正创建了真实的请求 返回值类型为RealCall
    
       /*
       private okhttp3.Call createRawCall() throws IOException {
            Request request = serviceMethod.toRequest(args);//这里是对请求进行封装 
              //serviceMethod.callFactory 类型为OkHttpClient这是我们创建Retrofit对象时候创建的
          okhttp3.Call call = serviceMethod.callFactory.newCall(request);
            if (call == null) {
          t    hrow new NullPointerException("Call.Factory returned null.");
        }
          return call;
    

    }

    */
    } catch (IOException | RuntimeException e) {
    creationFailure = e;
    throw e;
    }
    }
    }

    if (canceled) {
      call.cancel();
    }
    

    //最后执行call的execute方法 并将结果 传递给parseResponse方法 进行解析
    return parseResponse(call.execute());
    }

    接着往下走
    

    Response<T> parseResponse(okhttp3.Response rawResponse) throws IOException {
    ResponseBody rawBody = rawResponse.body();

    // Remove the body's source (the only stateful object) so we can pass the response along.
    rawResponse = rawResponse.newBuilder()
        .body(new NoContentResponseBody(rawBody.contentType(), rawBody.contentLength()))
        .build();
    
    int code = rawResponse.code();
    if (code < 200 || code >= 300) {
        .........
    }
    
    if (code == 204 || code == 205) {
     ...........
    }
    
    ExceptionCatchingRequestBody catchingBody = new ExceptionCatchingRequestBody(rawBody);
    try {
      ###T body = serviceMethod.toResponse(catchingBody); //到这里将我们得到打的body转换成我们的T类型 在我们这个例子里面就是BaseModel对象 具体的解析操作 肯定是交给我们的解析工厂了  直接贴代码
    
     /*
    

    //GsonResponseBodyConverter的convert方法
    @Override public T convert(ResponseBody value) throws IOException { JsonReader jsonReader = gson.newJsonReader(value.charStream()); try { return adapter.read(jsonReader); } finally { value.close(); }}
    */

      return Response.success(body, rawResponse);
    } catch (RuntimeException e) {
      // If the underlying source threw an exception, propagate that rather than indicating it was
      // a runtime exception.
      catchingBody.throwIfCaught();
      throw e;
    }
    

    }

    
    最后将解析的结果 封装成 一个Response对象 返回 注意这里的Response是retrofit2包下的
    
    最后 回到我们调用处 ,也就是RequestAribre方法的request方法 
    

    try { Response<T> response = call.execute(); if (!subscriber.isUnsubscribed()) { subscriber.onNext(response); }} catch (Throwable t) { Exceptions.throwIfFatal(t); if (!subscriber.isUnsubscribed()) { subscriber.onError(t); } return;}if (!subscriber.isUnsubscribed()) { subscriber.onCompleted();}

    前边我们说过 此时这里的subscriber对象是我们订阅时候创建的匿名内部类对象  里面会回调用户自定义的onnext方法 
    我们来看下:
    

    .subscribe(new Action1<BaseModel<ArrayList<Benefit>>>() {
    @Override
    public void call(BaseModel<ArrayList<Benefit>> model) {
    if (action == PullRecycler.ACTION_PULL_TO_REFRESH) {
    mDataList.clear();
    }
    if (model.results == null || model.results.size() == 0) {
    recycler.enableLoadMore(false);
    } else {
    recycler.enableLoadMore(true);
    mDataList.addAll(model.results);
    adapter.notifyDataSetChanged();
    }
    recycler.onRefreshCompleted();
    }
    }

    经过重重回调  终于回来了  最后到了最重要的一步 拿到数据了 那么就是该填充适配器 来显示到界面上了  
    这里就是我们所说的UI线程 main线程了 。

    相关文章

      网友评论

        本文标题:RxJava+Retrofit从源码分析执行流程

        本文链接:https://www.haomeiwen.com/subject/hrifuttx.html