美文网首页Android开发经验谈
okhttp源码学习(二)主要流程

okhttp源码学习(二)主要流程

作者: 刘景昌 | 来源:发表于2019-06-29 09:31 被阅读1次

    一个基本的OKhttp基本创建立流程

        OkHttpClient client = new OkHttpClient();
        RequestBody body = RequestBody.create(JSON, "{json:json}");
        Request request = new Request.Builder()
                .url("https:www.baidu.com")
                .post(body)
                .build();
        Response response = client.newCall(request).execute();
    

    分析下最基础的类

    1.OkHttpClient

    分析构造方法并添加注释(添加注释是为了 后面看着更加方便)

      public OkHttpClient() {
        this(new Builder());
      }
    
      OkHttpClient(Builder builder) {
        this.dispatcher = builder.dispatcher; //调度器
        this.proxy = builder.proxy;//代理
        this.protocols = builder.protocols;//协议
        this.connectionSpecs = builder.connectionSpecs;//传输层版本和连接协议
        this.interceptors = Util.immutableList(builder.interceptors);//拦截器
        this.networkInterceptors = Util.immutableList(builder.networkInterceptors);//网络拦截器
        this.eventListenerFactory = builder.eventListenerFactory;
        this.proxySelector = builder.proxySelector;//代理选择器
        this.cookieJar = builder.cookieJar;//cookie
        this.cache = builder.cache;//cache 缓存
        this.internalCache = builder.internalCache;//内部缓存
        this.socketFactory = builder.socketFactory;//socket 工厂
        boolean isTLS = false;
        for (ConnectionSpec spec : connectionSpecs) {
          isTLS = isTLS || spec.isTls();
        }
    
        if (builder.sslSocketFactory != null || !isTLS) {
          this.sslSocketFactory = builder.sslSocketFactory;//socket工厂 用于https
          this.certificateChainCleaner = builder.certificateChainCleaner;//验证确认响应书,适用HTTPS 请求连接的主机名
        } else {
          X509TrustManager trustManager = Util.platformTrustManager();
          this.sslSocketFactory = newSslSocketFactory(trustManager);
          this.certificateChainCleaner = CertificateChainCleaner.get(trustManager);
        }
    
        if (sslSocketFactory != null) {
          Platform.get().configureSslSocketFactory(sslSocketFactory);
        }
    
        this.hostnameVerifier = builder.hostnameVerifier;//主机名字确认
        this.certificatePinner = builder.certificatePinner.withCertificateChainCleaner(
            certificateChainCleaner);//证书链
        this.proxyAuthenticator = builder.proxyAuthenticator;//代理身份验证
        this.authenticator = builder.authenticator;//本地省份验证
        this.connectionPool = builder.connectionPool;//链接池 复用连接
        this.dns = builder.dns;//域名
        this.followSslRedirects = builder.followSslRedirects;//安全套接层重定向
        this.followRedirects = builder.followRedirects;//本地重定向
        this.retryOnConnectionFailure = builder.retryOnConnectionFailure;//重试连接失败
        this.callTimeout = builder.callTimeout;
        this.connectTimeout = builder.connectTimeout;//连接超时
        this.readTimeout = builder.readTimeout;//读取超时
        this.writeTimeout = builder.writeTimeout;//写入超时
        this.pingInterval = builder.pingInterval;
    
        if (interceptors.contains(null)) {
          throw new IllegalStateException("Null interceptor: " + interceptors);
        }
        if (networkInterceptors.contains(null)) {
          throw new IllegalStateException("Null network interceptor: " + networkInterceptors);
        }
      }
    

    我们在创建好了 OkHttpClient 之后我们有创建了一个Request

    2.Request、Response

    request:我们通常说的一个请求所携带的所有信息由 Headers和 RequestBody组成 RequestBody有两个子类 FormBody (表单提交的)和 MultipartBody(文件上传)
    Response:服务请求的返回值,同样包括Headers和RequestBody,而ResponseBod的子类也是有两个:RealResponseBody(真实响应)和CacheResponseBody(缓存响应)
    在生成了真正的请求后 我们 然后就是使用 client.newCall(request).execute();去真正的请求数据
    OkHttpClient 的newCall方法

    @Override public Call newCall(Request request) {
        return RealCall.newRealCall(this, request, false /* for web socket */);
      }
    

    实际上是调用了 RealCall的newRealCall 方法
    下面我们来看看RealCall类

    3.RealCall类

    先来看构造方法 已添加注释

     /***
       * 构造方法
       * @param client 初始的 OkHttpClient对象
       * @param originalRequest 请求的 Request对象
       * @param forWebSocket 是否使用webSokcet
       */
     static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
        // Safely publish the Call instance to the EventListener.
        RealCall call = new RealCall(client, originalRequest, forWebSocket);
        call.transmitter = new Transmitter(client, call);
        return call;
      }
    
    

    里面的两个重要方法
    execute():同步网络请求
    enqueue():异步网络请求
    我们以同步为例子继续向下看

      @Override public Response execute() throws IOException {
        //使用同步加 executed 布尔值判断 确认executed只有一个在执行
        synchronized (this) {
          if (executed) throw new IllegalStateException("Already Executed");
          executed = true;
        }
        //创建超时控制
        transmitter.timeoutEnter();
        // 调用eventListener的callStart方法(绑定call)
        transmitter.callStart();
        try {
          //将call添加到runningSyncCalls这个双向队列中
          client.dispatcher().executed(this);
          return getResponseWithInterceptorChain();
        } finally {
          client.dispatcher().finished(this);
        }
      }
    

    关于Transmitter 后面再或这次只过主流程
    我们走到最后 到getResponseWithInterceptorChain方法中 我们进去看一下

      Response getResponseWithInterceptorChain() throws IOException {
        // Build a full stack of interceptors.
        List<Interceptor> interceptors = new ArrayList<>();
        //添加开发者应用层自定义的Interceptor
        interceptors.addAll(client.interceptors());
        //这个Interceptor是处理请求失败的重试,重定向
        interceptors.add(new RetryAndFollowUpInterceptor(client));
        //这个Interceptor工作是添加一些请求的头部或其他信息 并对返回的Response做一些友好的处理
        interceptors.add(new BridgeInterceptor(client.cookieJar()));
        //这个Interceptor的职责是判断缓存是否存在,读取缓存,更新缓存等等
        interceptors.add(new CacheInterceptor(client.internalCache()));
        //这个Interceptor的职责是建立客户端和服务器的连接
        interceptors.add(new ConnectInterceptor(client));
        if (!forWebSocket) {
          //添加开发者自定义的网络层拦截器
          interceptors.addAll(client.networkInterceptors());
        }
        interceptors.add(new CallServerInterceptor(forWebSocket));
        //一个包裹这request的chain
        Interceptor.Chain chain = new RealInterceptorChain(interceptors, transmitter, null, 0,
            originalRequest, this, client.connectTimeoutMillis(),
            client.readTimeoutMillis(), client.writeTimeoutMillis());
    
        boolean calledNoMoreExchanges = false;
        try {
          Response response = chain.proceed(originalRequest);
          if (transmitter.isCanceled()) {
            closeQuietly(response);
            throw new IOException("Canceled");
          }
          return response;
        } catch (IOException e) {
          calledNoMoreExchanges = true;
          throw transmitter.noMoreExchanges(e);
        } finally {
          if (!calledNoMoreExchanges) {
            transmitter.noMoreExchanges(null);
          }
        }
      }
    

    这这里我们获得了一个拦截器集合 通过 RealInterceptorChain的构造方法获得 Interceptor.Chain
    我们看一下RealInterceptorChain的构造方法

      public RealInterceptorChain(List<Interceptor> interceptors, Transmitter transmitter,
          @Nullable Exchange exchange, int index, Request request, Call call,
          int connectTimeout, int readTimeout, int writeTimeout) {
        this.interceptors = interceptors;
        this.transmitter = transmitter;
        this.exchange = exchange;
        this.index = index;
        this.request = request;
        this.call = call;
        this.connectTimeout = connectTimeout;
        this.readTimeout = readTimeout;
        this.writeTimeout = writeTimeout;
      }
    

    这里面只有赋值没有其他的操作
    然后就剩下一个了chain.proceed(originalRequest) 我们来看这个方法
    真正的proceed方法

      public Response proceed(Request request, Transmitter transmitter, @Nullable Exchange exchange)
          throws IOException {
        if (index >= interceptors.size()) throw new AssertionError();
    
        calls++;
    
        // If we already have a stream, confirm that the incoming request will use it.
        if (this.exchange != null && !this.exchange.connection().supportsUrl(request.url())) {
          throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
              + " must retain the same host and port");
        }
    
        // If we already have a stream, confirm that this is the only call to chain.proceed().
        if (this.exchange != null && calls > 1) {
          throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
              + " must call proceed() exactly once");
        }
    
        // Call the next interceptor in the chain.
        RealInterceptorChain next = new RealInterceptorChain(interceptors, transmitter, exchange,
            index + 1, request, call, connectTimeout, readTimeout, writeTimeout);
        Interceptor interceptor = interceptors.get(index);
        Response response = interceptor.intercept(next);
    
        // Confirm that the next interceptor made its required call to chain.proceed().
        if (exchange != null && index + 1 < interceptors.size() && next.calls != 1) {
          throw new IllegalStateException("network interceptor " + interceptor
              + " must call proceed() exactly once");
        }
    
        // Confirm that the intercepted response isn't null.
        if (response == null) {
          throw new NullPointerException("interceptor " + interceptor + " returned null");
        }
    
        if (response.body() == null) {
          throw new IllegalStateException(
              "interceptor " + interceptor + " returned a response with no body");
        }
    
        return response;
      }
    

    最后返回带到 excute中

    
     @Override public Response execute() throws IOException {
        //使用同步加 executed 布尔值判断 确认executed只有一个在执行
        synchronized (this) {
          if (executed) throw new IllegalStateException("Already Executed");
          executed = true;
        }
        //创建超时控制
        transmitter.timeoutEnter();
        // 调用eventListener的callStart方法(绑定call)
        transmitter.callStart();
        try {
          //将call添加到runningSyncCalls这个双向队列中
          client.dispatcher().executed(this);
          return getResponseWithInterceptorChain();
        } finally {
          client.dispatcher().finished(this);
        }
      }
    

    到这里同步的流程就走完了。
    再来看看异步的请求

    Response response = client.newCall(request).enqueue(new Callback() {
            @Override
            public void onFailure(Call call, IOException e) {
            }
            @Override
            public void onResponse(Call call, Response response) throws IOException {
            }
        });
    

    进入 enqueue方法

    
    
      @Override public void enqueue(Callback responseCallback) {
        synchronized (this) {
          if (executed) throw new IllegalStateException("Already Executed");
          executed = true;
        }
        // 调用eventListener的callStart方法(绑定call)
        transmitter.callStart();
        client.dispatcher().enqueue(new AsyncCall(responseCallback));
      }
    

    合同步一样先调用了 transmitter.callStart();然后调用 dispatcher.enqueue(new AsyncCall(responseCallback));那么咱们进入dispatcher看下

    
      void enqueue(AsyncCall call) {
        synchronized (this) {
          readyAsyncCalls.add(call);
    
          // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
          // the same host.
          if (!call.get().forWebSocket) {
            AsyncCall existingCall = findExistingCallWithHost(call.host());
          //这只是一个赋值函数
            if (existingCall != null) call.reuseCallsPerHostFrom(existingCall);
          }
        }
        promoteAndExecute();
      }
    

    第一步我们把call添加到 readyAsyncCalls 然后判断 是否为websocket 进入reuseCallsPerHostFrom方法 最后进图promoteAndExecute() 我们进入promoteAndExecute里面看一下

    
        //最大同时请求数量
      private int maxRequests = 64;
      //同一个Host下的最大同时请求数量
      private int maxRequestsPerHost = 5;
      private boolean promoteAndExecute() {
        assert (!Thread.holdsLock(this));
    
        List<AsyncCall> executableCalls = new ArrayList<>();
        boolean isRunning;
        synchronized (this) {
          for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
            AsyncCall asyncCall = i.next();
          //如果正在执行的请求小于设定值即64
            if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
          //请求同一个主机的request小于设定值即5
            if (asyncCall.callsPerHost().get() >= maxRequestsPerHost) continue; // Host max capacity.
    
            i.remove();
            asyncCall.callsPerHost().incrementAndGet();
            executableCalls.add(asyncCall);
            runningAsyncCalls.add(asyncCall);
          }
          isRunning = runningCallsCount() > 0;
        }
    
        for (int i = 0, size = executableCalls.size(); i < size; i++) {
          AsyncCall asyncCall = executableCalls.get(i);
          asyncCall.executeOn(executorService());
        }
    
        return isRunning;
      }
    

    根据源码和注释大家可以看到如果正在执行的异步请求小于64,并且请求同一个主机小于5的时候就先往正在运行的队列里面添加这个call,然后循环 执行 asyncCall.executeOn我们先去看一下 AsyncCall类

      final class AsyncCall extends NamedRunnable {
        private final Callback responseCallback;
        private volatile AtomicInteger callsPerHost = new AtomicInteger(0);
    
        AsyncCall(Callback responseCallback) {
          super("OkHttp %s", redactedUrl());
          this.responseCallback = responseCallback;
        }
    
        AtomicInteger callsPerHost() {
          return callsPerHost;
        }
    
        void reuseCallsPerHostFrom(AsyncCall other) {
          this.callsPerHost = other.callsPerHost;
        }
    
        String host() {
          return originalRequest.url().host();
        }
    
        Request request() {
          return originalRequest;
        }
    
        RealCall get() {
          return RealCall.this;
        }
    
        /**
         * Attempt to enqueue this async call on {@code executorService}. This will attempt to clean up
         * if the executor has been shut down by reporting the call as failed.
         */
        void executeOn(ExecutorService executorService) {
          assert (!Thread.holdsLock(client.dispatcher()));
          boolean success = false;
          try {
            executorService.execute(this);
            success = true;
          } catch (RejectedExecutionException e) {
            InterruptedIOException ioException = new InterruptedIOException("executor rejected");
            ioException.initCause(e);
            transmitter.noMoreExchanges(ioException);
            responseCallback.onFailure(RealCall.this, ioException);
          } finally {
            if (!success) {
              client.dispatcher().finished(this); // This call is no longer running!
            }
          }
        }
    
        @Override protected void execute() {
          boolean signalledCallback = false;
          transmitter.timeoutEnter();
          try {
            Response response = getResponseWithInterceptorChain();
            signalledCallback = true;
            responseCallback.onResponse(RealCall.this, response);
          } catch (IOException e) {
            if (signalledCallback) {
              // Do not signal the callback twice!
              Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
            } else {
              responseCallback.onFailure(RealCall.this, e);
            }
          } finally {
            client.dispatcher().finished(this);
          }
        }
      }
    
    

    有父类进去看一下

    public abstract class NamedRunnable implements Runnable {
      protected final String name;
    
      public NamedRunnable(String format, Object... args) {
        this.name = Util.format(format, args);
      }
    
      @Override public final void run() {
        String oldName = Thread.currentThread().getName();
        Thread.currentThread().setName(name);
        try {
          execute();
        } finally {
          Thread.currentThread().setName(oldName);
        }
      }
    
      protected abstract void execute();
    }
    

    看到这里我们终于有看到熟悉的方法了 就是getResponseWithInterceptorChain()这个方法 然后就是安装同步的流程走完全部
    下面我们来就来看一下整的Okhttp的流程图


    image.png

    下篇开始一个个的详解

    最后献上一份添加了注释的源码 https://github.com/525642022/okhttpTest/blob/master/README.md
    哈哈

    相关文章

      网友评论

        本文标题:okhttp源码学习(二)主要流程

        本文链接:https://www.haomeiwen.com/subject/bnfyqctx.html