Retrofit Source Code Analysis


Author: Edmond_33 | Published 2017-12-27 20:58

    Retrofit

    Making a simple network request with Retrofit

    Add the Retrofit dependencies

    compile 'com.squareup.retrofit2:retrofit:2.3.0'
    compile 'com.squareup.retrofit2:converter-gson:2.3.0'
    compile ('com.squareup.retrofit2:adapter-rxjava:2.0.0'){
        exclude group: 'io.reactivex', module: 'rxjava'
    } // RxJava support
    compile ('io.reactivex:rxandroid:1.2.1'){
        exclude group: 'io.reactivex', module: 'rxjava'
    }
    

    Define an API interface

    public interface Api {
        @GET("api/v1/abc/def")
        Observable<ResponseBody> getTotal();
    
        @GET("api/v1/mnl/xyz")
        Observable<ResponseBody> getMoneyTotal();
    }
    

    Initialize Retrofit and OkHttp

    Initialize OkHttp

    private OkHttpClient initOkHttpClient(){
        OkHttpClient client = new OkHttpClient.Builder()
                .addInterceptor(new Interceptor() {
                    @Override
                    public Response intercept(Chain chain) throws IOException {
                        Request r = chain.request().newBuilder()
                                .addHeader("x-device-info","HUAWEI/7.0/").build();
                        return chain.proceed(r);
                    }
                }).build();
        return client;
    }
    

    Initialize Retrofit

    Retrofit retrofit = new Retrofit.Builder()
            .baseUrl("http://****.com/")
            .addConverterFactory(GsonConverterFactory.create())
            .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
            .client(initOkHttpClient())
            .build();
    

    RxJavaCallAdapterFactory.createWithScheduler accepts a Scheduler,
    so Schedulers.io() can be specified once there instead of on every request.

    Make the network request

    Api api = retrofit.create(Api.class);
    Observable<ResponseBody> observable = api.getTotal();
    observable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Subscriber<ResponseBody>() {
        @Override
        public void onCompleted() {
    
        }
    
        @Override
        public void onError(Throwable e) {
            e.printStackTrace();
        }
    
        @Override
        public void onNext(ResponseBody responseBody) {
            try {
                Log.e(TAG, "call: "+ new String(responseBody.bytes()));
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    });
    

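    The code above raises the following questions:

    • How does retrofit.create(Api.class) map the API interface to the corresponding requests?
    • How does the ConverterFactory work?
    • How does Retrofit support RxJava?

    The bluntest way to answer these is to trace the code with breakpoints and follow it roughly.
    Copy the Retrofit source code into the project and add the dependencies it needs: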

    compile 'com.squareup.okhttp3:okhttp:3.2.0'
    compile 'com.google.code.gson:gson:2.6.1'
    compile 'io.reactivex:rxjava:1.2.1'
    compile 'com.google.code.findbugs:jsr305:3.0.0'
    

    Flow chart

    1. First, look at the code of create(Api.class):
    public <T> T create(final Class<T> service) {

        return (T) Proxy.newProxyInstance(service.getClassLoader(), new Class<?>[] { service },
          new InvocationHandler() {
            private final Platform platform = Platform.get();

            @Override public Object invoke(Object proxy, Method method, @Nullable Object[] args)
                throws Throwable {
              Log.e(TAG, "invoke: " + method.getName());
              // If the method is a method from Object then defer to normal invocation.
              if (method.getDeclaringClass() == Object.class) {
                return method.invoke(this, args);
              }
              ServiceMethod<Object, Object> serviceMethod =
                  (ServiceMethod<Object, Object>) loadServiceMethod(method);
              OkHttpCall<Object> okHttpCall = new OkHttpCall<>(serviceMethod, args);
              return serviceMethod.callAdapter.adapt(okHttpCall);
            }
          });
    }
    

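    This uses the dynamic proxy pattern via Proxy.newProxyInstance.
    Every call to a method of Api is routed to invoke in the InvocationHandler, which logs the method name.

    The log shows that toString is printed as well. When the method is declared by Object, it is invoked directly through reflection (method.invoke(this, args)) and is never turned into an HTTP request.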
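The proxy mechanism described above can be tried without Retrofit at all. The following is a minimal sketch using only the JDK; the `Get` annotation, the `DemoApi` interface and the `ProxySketch` class are hypothetical stand-ins invented for this illustration, not Retrofit types. Instead of sending a request, the handler simply returns a string built from the annotation.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

class ProxySketch {
  /** Hypothetical stand-in for Retrofit's @GET; not the real annotation. */
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.METHOD)
  @interface Get {
    String value();
  }

  /** Hypothetical service interface, shaped like the Api interface above. */
  interface DemoApi {
    @Get("api/v1/abc/def")
    String getTotal();
  }

  @SuppressWarnings("unchecked")
  static <T> T create(Class<T> service) {
    InvocationHandler handler = new InvocationHandler() {
      @Override public Object invoke(Object proxy, Method method, Object[] args)
          throws Throwable {
        // Methods declared by Object (toString, hashCode, equals) run on the handler itself.
        if (method.getDeclaringClass() == Object.class) {
          return method.invoke(this, args);
        }
        // Interface methods are described by their annotation; no network call is made here.
        Get get = method.getAnnotation(Get.class);
        if (get == null) {
          throw new IllegalArgumentException("Missing @Get on " + method.getName());
        }
        return "GET " + get.value();
      }
    };
    return (T) Proxy.newProxyInstance(
        service.getClassLoader(), new Class<?>[] {service}, handler);
  }

  public static void main(String[] args) {
    DemoApi api = create(DemoApi.class);
    System.out.println(api.getTotal());  // goes through invoke and reads the annotation
    System.out.println(api.toString());  // also goes through invoke, via the Object branch
  }
}
```

Calling `toString()` on the proxy reaches `invoke` just like `getTotal()` does, which is why the author's log contains a toString entry.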
    2. Next comes loadServiceMethod(Method method), which returns a ServiceMethod:
      Adapts an invocation of an interface method into an HTTP call
      Judging from this Javadoc, loadServiceMethod evidently builds a ServiceMethod from the method.

      CallAdapter<R, T>
      Adapts a {@link Call} with response type {@code R} into the type of {@code T}. Instances are
      created by {@linkplain Factory a factory} which is
      {@linkplain Retrofit.Builder#addCallAdapterFactory(Factory) installed} into the {@link Retrofit}
      instance.

      // Retrofit.loadServiceMethod
      result = new ServiceMethod.Builder<>(this, method).build(); // this is the Retrofit instance
      
      // ServiceMethod.Builder
      Builder(Retrofit retrofit, Method method) {
         this.retrofit = retrofit;
         this.method = method;
         this.methodAnnotations = method.getAnnotations();
         // generic types of the method's parameters
         this.parameterTypes = method.getGenericParameterTypes();
         // annotations on each parameter
         this.parameterAnnotationsArray = method.getParameterAnnotations();
      }
      

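      build() mainly does the following:

      • initializes callAdapter through createCallAdapter()
      • initializes responseConverter through createResponseConverter()
      • parses the annotations on the method (parseMethodAnnotation)
        • httpMethod (@GET, @POST)
        • relativeUrl
        • headers
      • parses the annotations on the parameters (parseParameter)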
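The annotation-parsing steps listed above come down to plain reflection. Below is a minimal sketch of the idea, assuming hypothetical `Get` and `Query` annotations and a hypothetical `ParsedMethod` class; it is far simpler than Retrofit's ServiceMethod.Builder (no headers, no converters, no URL encoding) and only shows how method and parameter annotations can be turned into a request description.

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

class AnnotationParseSketch {
  /** Hypothetical stand-ins for @GET and @Query; not the Retrofit annotations. */
  @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD)
  @interface Get { String value(); }

  @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.PARAMETER)
  @interface Query { String value(); }

  interface DemoApi {
    @Get("api/v1/abc/def")
    String getTotal(@Query("page") int page, @Query("size") int size);
  }

  /** Description of one interface method, built once from its annotations. */
  static final class ParsedMethod {
    final String httpMethod;
    final String relativeUrl;
    final String[] queryNames;

    ParsedMethod(Method method) {
      // Step 1: method annotation gives the HTTP method and the relative URL.
      Get get = method.getAnnotation(Get.class);
      if (get == null) {
        throw new IllegalArgumentException("HTTP method annotation is required.");
      }
      this.httpMethod = "GET";
      this.relativeUrl = get.value();

      // Step 2: each parameter must carry an annotation that says how to use the argument.
      Annotation[][] parameterAnnotations = method.getParameterAnnotations();
      this.queryNames = new String[parameterAnnotations.length];
      for (int p = 0; p < parameterAnnotations.length; p++) {
        for (Annotation a : parameterAnnotations[p]) {
          if (a instanceof Query) {
            queryNames[p] = ((Query) a).value();
          }
        }
        if (queryNames[p] == null) {
          throw new IllegalArgumentException("No annotation found on parameter " + p);
        }
      }
    }

    /** Combines the parsed description with the actual call arguments. */
    String toRequestLine(Object... args) {
      StringBuilder sb = new StringBuilder(httpMethod).append(' ').append(relativeUrl);
      for (int p = 0; p < queryNames.length; p++) {
        sb.append(p == 0 ? '?' : '&').append(queryNames[p]).append('=').append(args[p]);
      }
      return sb.toString();
    }
  }

  static String demo() {
    try {
      Method m = DemoApi.class.getMethod("getTotal", int.class, int.class);
      return new ParsedMethod(m).toRequestLine(2, 20);
    } catch (NoSuchMethodException e) {
      throw new IllegalStateException(e);
    }
  }
}
```

The split between a constructor that parses once and a `toRequestLine` that takes the arguments mirrors the split between building the ServiceMethod and calling `serviceMethod.toRequest(args)` later in the article.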
    3. Create an OkHttpCall, which implements Call
      Call
      An invocation of a Retrofit method that sends a request to a webserver and returns a response.
      Each call yields its own HTTP request and response pair. Use {@link #clone} to make multiple
      calls with the same parameters to the same webserver; this may be used to implement polling or
      to retry a failed call.
      <p>Calls may be executed synchronously with {@link #execute}, or asynchronously with {@link
      enqueue}. In either case the call can be canceled at any time with {@link cancel}. A call that
      is busy writing its request or reading its response may receive a {@link IOException}; this is
      working as designed.
      @param <T> Successful response body type.

    public interface Call<T> extends Cloneable {
      /**
       * Synchronously send the request and return its response.
       *
       * @throws IOException if a problem occurred talking to the server.
       * @throws RuntimeException (and subclasses) if an unexpected error occurs creating the request
       * or decoding the response.
       */
      Response<T> execute() throws IOException;
    
      /**
       * Asynchronously send the request and notify {@code callback} of its response or if an error
       * occurred talking to the server, creating the request, or processing the response.
       */
      void enqueue(Callback<T> callback);
    
      /**
       * Returns true if this call has been either {@linkplain #execute() executed} or {@linkplain
       * #enqueue(Callback) enqueued}. It is an error to execute or enqueue a call more than once.
       */
      boolean isExecuted();
    
      /**
       * Cancel this call. An attempt will be made to cancel in-flight calls, and if the call has not
       * yet been executed it never will be.
       */
      void cancel();
    
      /** True if {@link #cancel()} was called. */
      boolean isCanceled();
    
      /**
       * Create a new, identical call to this one which can be enqueued or executed even if this call
       * has already been.
       */
      Call<T> clone();
    
      /** The original HTTP request. */
      Request request();
    }
    

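    Pay particular attention to execute, the method that actually sends the request (its code is shown further down); request() merely returns the original HTTP request.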

    OkHttpCall<Object> okHttpCall = new OkHttpCall<>(serviceMethod, args);
    return serviceMethod.callAdapter.adapt(okHttpCall);
    

    The call is then adapted by RxJavaCallAdapter.adapt:

    @Override public Object adapt(Call<R> call) {
      OnSubscribe<Response<R>> callFunc = isAsync
          ? new CallEnqueueOnSubscribe<>(call)
          : new CallExecuteOnSubscribe<>(call);
    
      OnSubscribe<?> func;
      if (isResult) {
        func = new ResultOnSubscribe<>(callFunc);
      } else if (isBody) {
        func = new BodyOnSubscribe<>(callFunc);
      } else {
        func = callFunc;
      }
      Observable<?> observable = Observable.create(func);
    
      if (scheduler != null) {
        observable = observable.subscribeOn(scheduler);
      }
    
      if (isSingle) {
        return observable.toSingle();
      }
      if (isCompletable) {
        return observable.toCompletable();
      }
      return observable;
    }
    

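    adapt creates a CallExecuteOnSubscribe; when the Observable is subscribed to, its call method is invoked and sends the network request.
    adapt is also where a Scheduler can be applied to the Observable (observable.subscribeOn(scheduler)).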

    final class CallExecuteOnSubscribe<T> implements OnSubscribe<Response<T>> {
      private final Call<T> originalCall;
    
      CallExecuteOnSubscribe(Call<T> originalCall) {
        this.originalCall = originalCall;
      }
    
      @Override public void call(Subscriber<? super Response<T>> subscriber) {
        // Since Call is a one-shot type, clone it for each new subscriber.
        Call<T> call = originalCall.clone();
        CallArbiter<T> arbiter = new CallArbiter<>(call, subscriber);
        subscriber.add(arbiter);
        subscriber.setProducer(arbiter);
    
        Response<T> response;
        try {
          response = call.execute();
        } catch (Throwable t) {
          Exceptions.throwIfFatal(t);
          arbiter.emitError(t);
          return;
        }
        arbiter.emitResponse(response);
      }
    }
    
    /** OkHttpCall **/
    @Override public Response<T> execute() throws IOException {
      okhttp3.Call call;
       synchronized (this) {
        if (executed) throw new IllegalStateException("Already executed.");
        executed = true;
         .......
        call = rawCall;
        if (call == null) {
           call = rawCall = createRawCall();
        }
      }
      if (canceled) {
        call.cancel();
      }
      return parseResponse(call.execute());
    }
    
    private okhttp3.Call createRawCall() throws IOException {
      Request request = serviceMethod.toRequest(args);
      okhttp3.Call call = serviceMethod.callFactory.newCall(request);
      return call;
    }
    

    The callFactory here is the OkHttpClient we initialized earlier.
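Two details in the listings above are easy to miss: execute() refuses to run twice ("Already executed."), and CallExecuteOnSubscribe clones the call for every new subscriber. A minimal sketch of that one-shot behaviour, using a hypothetical `OneShotCall` class backed by a plain `Supplier` rather than OkHttp:

```java
import java.util.function.Supplier;

class OneShotCallSketch {
  /** Hypothetical one-shot call; a simplified stand-in, not Retrofit's OkHttpCall. */
  static final class OneShotCall<T> {
    private final Supplier<T> work;
    private boolean executed;

    OneShotCall(Supplier<T> work) {
      this.work = work;
    }

    T execute() {
      synchronized (this) {
        // A call may be executed only once; a second attempt is a programming error.
        if (executed) throw new IllegalStateException("Already executed.");
        executed = true;
      }
      // The actual (possibly blocking) work runs outside the lock.
      return work.get();
    }

    synchronized boolean isExecuted() {
      return executed;
    }

    /** Returns a fresh call with the same work and a reset executed flag. */
    @Override public OneShotCall<T> clone() {
      return new OneShotCall<>(work);
    }
  }

  public static void main(String[] args) {
    OneShotCall<String> original = new OneShotCall<>(() -> "response");
    System.out.println(original.clone().execute()); // first subscriber
    System.out.println(original.clone().execute()); // second subscriber gets its own call
  }
}
```

Because every subscriber works on its own clone, the same Observable can be subscribed to repeatedly without hitting the IllegalStateException.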

    • response = call.execute() is blocking. Which thread does it run on, and when is that thread started?
    • CallArbiter.deliverResponse calls subscriber.onNext(response) and then subscriber.onCompleted(). As I understood it, it should call straight into Subscriber.onNext in MainActivity,
      in the following order:
    CallArbiter----deliverResponse---onNext
    MainActivity---Subscriber--------onNext
    CallArbiter----deliverResponse---onCompleted
    MainActivity---Subscriber--------onCompleted
    

    Adding the thread name to the log shows a different order, because the two sides are not on the same thread.

    MainActivity------------------start calling API----------------------------main
    Retrofit.InvocationHandler----invoke: getTotal-----------------------------main
    ServiceMethod.Builder---------createCallAdapter----------------------------main
    ServiceMethod.Builder---------createResponseConverter----------------------main
    ServiceMethod.Builder---------parseMethodAnnotation------------------------main
    RxJavaCallAdapter-------------adapt----------------------------------------main
    MainActivity------------------API call finished----------------------------main
    MainActivity------------------subscriber added-----------------------------main
    CallExecuteOnSubscribe--------call--------------------------------RxIoScheduler-2
    CallArbiter-------------------request (state=0)-------------------RxIoScheduler-2
    OkHttpCall--------------------execute-----------------------------RxIoScheduler-2
    CallArbiter-------------------emitResponse (state=1)--------------RxIoScheduler-2
    CallArbiter-------------------request.deliverResponse.onNext------RxIoScheduler-2
    CallArbiter-------------------request.deliverResponse.onCompleted-RxIoScheduler-2
    MainActivity.Subscriber-------onNext---------------------------------------main
    MainActivity.Subscriber-------onCompleted----------------------------------main
    

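    This log answers the earlier question of when the thread switch happens: it happens once a subscriber is attached to the Observable. subscribeOn(Schedulers.io()) runs CallExecuteOnSubscribe.call, and with it the blocking call.execute(), on an IO thread, while observeOn(AndroidSchedulers.mainThread()) delivers onNext and onCompleted to the Subscriber back on the main thread.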
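The thread hop seen in the log can be imitated with plain executors. This is only a sketch of the idea, not how RxJava implements subscribeOn and observeOn: the two single-thread executors and the thread names "io-thread" and "main-thread" are assumptions made for the illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

class ThreadHopSketch {
  /** Runs a blocking call on one thread and delivers its result on another. */
  static List<String> run() {
    ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-thread"));
    ExecutorService main = Executors.newSingleThreadExecutor(r -> new Thread(r, "main-thread"));
    List<String> log = Collections.synchronizedList(new ArrayList<>());
    CountDownLatch done = new CountDownLatch(1);

    // Stands in for call.execute(): blocking work that must stay off the main thread.
    Supplier<String> blockingCall = () -> {
      log.add("execute on " + Thread.currentThread().getName());
      return "response";
    };
    // Stands in for Subscriber.onNext in MainActivity.
    Consumer<String> onNext = value -> {
      log.add("onNext(" + value + ") on " + Thread.currentThread().getName());
      done.countDown();
    };

    // "Subscribing": nothing has run before this point.
    io.execute(() -> {
      String response = blockingCall.get();          // role of subscribeOn(io)
      main.execute(() -> onNext.accept(response));   // role of observeOn(main)
    });

    try {
      done.await(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } finally {
      io.shutdown();
      main.shutdown();
    }
    return log;
  }

  public static void main(String[] args) {
    run().forEach(System.out::println);
  }
}
```

As in the author's log, the producing side finishes its work on the IO thread before the consuming side sees anything on the main thread.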

    • What does CallArbiter do?
    final class CallArbiter<T> extends AtomicInteger implements Subscription, Producer {
       
        CallArbiter(Call<T> call, Subscriber<? super Response<T>> subscriber) {
            super(STATE_WAITING);
            this.call = call;
            this.subscriber = subscriber;
        }
     
        @Override
        public void request(long amount) {
            if (amount == 0) {
                return;
            }
            while (true) {
                int state = get();
                switch (state) {
                    case STATE_WAITING:
                        if (compareAndSet(STATE_WAITING, STATE_REQUESTED)) {
                            return;
                        }
                        break; // State transition failed. Try again.
    
                    case STATE_HAS_RESPONSE:
                        if (compareAndSet(STATE_HAS_RESPONSE, STATE_TERMINATED)) {
                            deliverResponse(response);
                            return;
                        }
                        break; // State transition failed. Try again.
                                ......
                }
            }
        }
    
        void emitResponse(Response<T> response) {
            while (true) {
                int state = get();
                switch (state) {
                    case STATE_WAITING:
                        this.response = response;
                        if (compareAndSet(STATE_WAITING, STATE_HAS_RESPONSE)) {
                            return;
                        }
                        break; // State transition failed. Try again.
    
                    case STATE_REQUESTED:
                        if (compareAndSet(STATE_REQUESTED, STATE_TERMINATED)) {
                            deliverResponse(response);
                            return;
                        }
                        break; // State transition failed. Try again.
                            .......
                }
            }
        }
    
        private void deliverResponse(Response<T> response) {
            try {
                if (!isUnsubscribed()) {
                    subscriber.onNext(response);
                }
            }  
            ......
            try {
                if (!isUnsubscribed()) {
                    subscriber.onCompleted();
                }
            } .......
        }
    
    }
    

    This class implements Producer. The interface is documented as follows:

    Interface that establishes a request-channel between an Observable and a Subscriber and allows
    the Subscriber to request a certain amount of items from the Observable (otherwise known as
    backpressure).
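    In other words, it sets up a request channel between an Observable and a Subscriber through which the Subscriber can ask the Observable for a certain number of items (backpressure).
    Once the Subscriber has been handed the Producer (subscriber.setProducer(arbiter) above), request is called.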
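The request channel can be illustrated with a tiny producer that never emits more than it was asked for. This is a sketch under simplifying assumptions: `SimpleProducer` and `RangeProducer` are hypothetical names, the code is single-threaded, and it ignores the reentrancy and overflow handling a real rx.Producer needs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class BackpressureSketch {
  /** Hypothetical minimal request channel; not rx.Producer itself. */
  interface SimpleProducer {
    void request(long n);
  }

  /** Emits the integers 1..count, but never more than the consumer has requested. */
  static final class RangeProducer implements SimpleProducer {
    private final int count;
    private final Consumer<Integer> downstream;
    private int next = 1;

    RangeProducer(int count, Consumer<Integer> downstream) {
      this.count = count;
      this.downstream = downstream;
    }

    @Override public void request(long n) {
      if (n < 0) throw new IllegalArgumentException("n >= 0 required");
      long remaining = n;
      // Emit until either the request is satisfied or the source is exhausted.
      while (remaining > 0 && next <= count) {
        downstream.accept(next++);
        remaining--;
      }
    }
  }

  static List<Integer> demo() {
    List<Integer> received = new ArrayList<>();
    RangeProducer producer = new RangeProducer(5, received::add);
    producer.request(2);   // delivers 1 and 2
    producer.request(10);  // delivers the remaining 3, 4, 5 and then stops
    return received;
  }
}
```

CallArbiter is a degenerate case of this contract: it has exactly one item (the response), so any non-zero request is enough to release it.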

    public interface Producer {
    
        /**
         * Request a certain maximum number of items from this Producer. This is a way of requesting backpressure.
         * To disable backpressure, pass {@code Long.MAX_VALUE} to this method.
         * <p>
         * Requests are additive but if a sequence of requests totals more than {@code Long.MAX_VALUE} then
         * {@code Long.MAX_VALUE} requests will be actioned and the extras <i>may</i> be ignored. Arriving at
         * {@code Long.MAX_VALUE} by addition of requests cannot be assumed to disable backpressure. For example,
         * the code below may result in {@code Long.MAX_VALUE} requests being actioned only.**/
        void request(long n);
    } // Open question: how does RxJava register an Observable with a Subscriber?
    

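    request moves the current state to STATE_REQUESTED.
    When emitResponse is then called explicitly (from CallExecuteOnSubscribe.call), it finds STATE_REQUESTED, moves to STATE_TERMINATED and runs deliverResponse, which completes the callback.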
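The point of the state machine is that the response is delivered exactly once, whichever of request and emitResponse happens first. The sketch below reproduces that logic with a hypothetical `Arbiter` class and a plain `Consumer` in place of the Subscriber. The values 0 and 1 for the first two states match the author's log; the other two values and both `default` branches are assumptions, since the excerpt above elides them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

class ArbiterSketch {
  /** Hypothetical, simplified arbiter modeled on the CallArbiter excerpt above. */
  static final class Arbiter<T> extends AtomicInteger {
    private static final int STATE_WAITING = 0;
    private static final int STATE_REQUESTED = 1;
    private static final int STATE_HAS_RESPONSE = 2; // assumed value
    private static final int STATE_TERMINATED = 3;   // assumed value

    private final Consumer<T> subscriber;
    private volatile T response;

    Arbiter(Consumer<T> subscriber) {
      super(STATE_WAITING);
      this.subscriber = subscriber;
    }

    void request(long amount) {
      if (amount == 0) return;
      while (true) {
        switch (get()) {
          case STATE_WAITING:
            if (compareAndSet(STATE_WAITING, STATE_REQUESTED)) return;
            break; // CAS lost a race; re-read the state and try again.
          case STATE_HAS_RESPONSE:
            if (compareAndSet(STATE_HAS_RESPONSE, STATE_TERMINATED)) {
              subscriber.accept(response);
              return;
            }
            break;
          default: // assumed: already requested or terminated, nothing to do
            return;
        }
      }
    }

    void emitResponse(T response) {
      while (true) {
        switch (get()) {
          case STATE_WAITING:
            this.response = response; // park the response until it is requested
            if (compareAndSet(STATE_WAITING, STATE_HAS_RESPONSE)) return;
            break;
          case STATE_REQUESTED:
            if (compareAndSet(STATE_REQUESTED, STATE_TERMINATED)) {
              subscriber.accept(response);
              return;
            }
            break;
          default: // assumed: emitting twice is treated as a bug
            throw new AssertionError("response already emitted");
        }
      }
    }
  }

  /** Drives the arbiter in either order and returns what the subscriber received. */
  static List<String> deliver(boolean requestFirst) {
    List<String> delivered = new ArrayList<>();
    Arbiter<String> arbiter = new Arbiter<>(delivered::add);
    if (requestFirst) {
      arbiter.request(Long.MAX_VALUE);
      arbiter.emitResponse("response");
    } else {
      arbiter.emitResponse("response");
      arbiter.request(Long.MAX_VALUE);
    }
    arbiter.request(1); // a further request after termination delivers nothing
    return delivered;
  }
}
```

In the author's log the request arrives first (state=0) and emitResponse second (state=1), which corresponds to `deliver(true)` here.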

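    It implements Subscription so that unsubscribing the Subscriber is mapped onto cancelling the Call.
    It extends AtomicInteger for thread safety. As Thinking in Java advises, do not use this class unless you are a JVM expert; here it is enough to know what it is for.
    That is about it for now. A few points are still unclear to me and will be filled in later:
    1. How are network errors handled?
    2. After a timeout, how is the request issued?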
