OkHttp Source Code Analysis

Author: 就叫汉堡吧 | Published 2020-11-12 21:31
    • Overview

      This article analyzes how OkHttp works at the source level, starting from the way it is used.

    • Usage recap

      public static String getByOkHttp(String url) throws IOException {
          OkHttpClient client = new OkHttpClient();
          Request request = new Request.Builder()
                  .url(url)
                  .build();
      
          try (Response response = client.newCall(request).execute()) {
              return Objects.requireNonNull(response.body()).string();
          }
      }
      
      public static final MediaType JSON
              = MediaType.get("application/json; charset=utf-8");
      
      public static String postByOkHttp(String url, String json) throws IOException {
          OkHttpClient client = new OkHttpClient();
          RequestBody body = RequestBody.create(json, JSON);
          Request request = new Request.Builder()
                  .url(url)
                  .post(body)
                  .build();
          try (Response response = client.newCall(request).execute()) {
              return Objects.requireNonNull(response.body()).string();
          }
      }
      

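      A quick recap of the OkHttp API. Before Retrofit appeared, this was the usage we all knew. Compared with HttpClient, parameters can be passed directly as JSON. JSON is a universal format and easy to parse, so there is no need to store a=b pairs in the request body as with HttpClient; the usual flow is bean -> JSON and JSON -> bean, which makes this the more natural fit. The request and its configuration are built in a single chained call, so the calling code is more concise. These are probably not the only reasons OkHttp displaced HttpClient as the mainstream networking library, so let's follow the source and see what else is different.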

    • Source analysis

      First, the OkHttpClient constructor:

      public OkHttpClient() {
        this(new Builder());
      }
      
      ......
        
      public Builder() {
            dispatcher = new Dispatcher();
            protocols = DEFAULT_PROTOCOLS;
            connectionSpecs = DEFAULT_CONNECTION_SPECS;
            eventListenerFactory = EventListener.factory(EventListener.NONE);
            proxySelector = ProxySelector.getDefault();
            if (proxySelector == null) {
              proxySelector = new NullProxySelector();
            }
            cookieJar = CookieJar.NO_COOKIES;
            socketFactory = SocketFactory.getDefault();
            hostnameVerifier = OkHostnameVerifier.INSTANCE;
            certificatePinner = CertificatePinner.DEFAULT;
            proxyAuthenticator = Authenticator.NONE;
            authenticator = Authenticator.NONE;
            connectionPool = new ConnectionPool();
            dns = Dns.SYSTEM;
            followSslRedirects = true;
            followRedirects = true;
            retryOnConnectionFailure = true;
            callTimeout = 0;
            connectTimeout = 10_000;
            readTimeout = 10_000;
            writeTimeout = 10_000;
            pingInterval = 0;
      }
      

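      As you can see, the Builder constructor initializes every required parameter with a default value. We set these parameters aside for now; their meaning becomes clear once they are used.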
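
The pattern above (a Builder whose fields start out with defaults, and a client that copies them) can be sketched with a small stand-alone model. `MiniClient` and its fields are hypothetical names used only for illustration, not OkHttp classes; only the default values 0 and 10_000 are taken from the source above.

```java
// Hypothetical miniature of the "Builder with defaults" pattern; this is not OkHttp code.
final class MiniClient {
    final int callTimeoutMillis;
    final int connectTimeoutMillis;
    final boolean retryOnConnectionFailure;

    MiniClient() {
        this(new Builder()); // same delegation as the OkHttpClient() constructor
    }

    private MiniClient(Builder b) {
        this.callTimeoutMillis = b.callTimeoutMillis;
        this.connectTimeoutMillis = b.connectTimeoutMillis;
        this.retryOnConnectionFailure = b.retryOnConnectionFailure;
    }

    /** Returns a builder pre-filled with this client's current settings. */
    Builder newBuilder() {
        Builder b = new Builder();
        b.callTimeoutMillis = callTimeoutMillis;
        b.connectTimeoutMillis = connectTimeoutMillis;
        b.retryOnConnectionFailure = retryOnConnectionFailure;
        return b;
    }

    static final class Builder {
        int callTimeoutMillis = 0;           // 0 means "no call timeout"
        int connectTimeoutMillis = 10_000;
        boolean retryOnConnectionFailure = true;

        Builder callTimeoutMillis(int millis) {
            if (millis < 0) throw new IllegalArgumentException("millis < 0");
            this.callTimeoutMillis = millis;
            return this;
        }

        MiniClient build() {
            return new MiniClient(this);
        }
    }

    public static void main(String[] args) {
        MiniClient custom = new MiniClient().newBuilder().callTimeoutMillis(30_000).build();
        System.out.println(custom.callTimeoutMillis + " / " + custom.connectTimeoutMillis);
    }
}
```

Only the overridden value changes; everything else keeps the default it received in the Builder.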

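      RequestBody.create returns a RequestBody object. (The source quoted in this article appears to come from the 3.x Java code base, where the parameters are ordered (contentType, content); the create(json, JSON) call in the recap uses the reversed order offered by OkHttp 4.)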

      /**
         * Returns a new request body that transmits {@code content}. If {@code contentType} is non-null
         * and lacks a charset, this will use UTF-8.
      */
      public static RequestBody create(@Nullable MediaType contentType, String content) {
          Charset charset = UTF_8;
          if (contentType != null) {
            charset = contentType.charset();
            if (charset == null) {
              charset = UTF_8;
              contentType = MediaType.parse(contentType + "; charset=utf-8");
            }
          }
          byte[] bytes = content.getBytes(charset);
          return create(contentType, bytes);
      }
      
      ......
        
      /** Returns a new request body that transmits {@code content}. */
      public static RequestBody create(final @Nullable MediaType contentType, final byte[] content) {
          return create(contentType, content, 0, content.length);
      }
      
      ......
      
      /** Returns a new request body that transmits {@code content}. */
      public static RequestBody create(final @Nullable MediaType contentType, final byte[] content,
          final int offset, final int byteCount) {
        if (content == null) throw new NullPointerException("content == null");
        Util.checkOffsetAndCount(content.length, offset, byteCount);
        return new RequestBody() {
          @Override public @Nullable MediaType contentType() {
            return contentType;
          }
      
          @Override public long contentLength() {
            return byteCount;
          }
      
          @Override public void writeTo(BufferedSink sink) throws IOException {
            sink.write(content, offset, byteCount);
          }
        };
      }
      

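      The anonymous RequestBody subclass has a writeTo method. My guess is that this is the entry point for writing to the socket; we will confirm that further down.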
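
To make the shape of that anonymous class concrete, here is a minimal stand-alone sketch. `SimpleBody` is a hypothetical stand-in for RequestBody, and it writes to a plain `OutputStream` rather than an Okio `BufferedSink`; the point is only that the string is encoded once, and that `contentLength()` counts bytes, not characters.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for RequestBody; not the OkHttp class.
abstract class SimpleBody {
    abstract long contentLength();

    abstract void writeTo(OutputStream out) throws IOException;

    /** Encodes the content once; falls back to UTF-8 when no charset is given. */
    static SimpleBody create(String content, Charset charsetOrNull) {
        Charset charset = charsetOrNull != null ? charsetOrNull : StandardCharsets.UTF_8;
        final byte[] bytes = content.getBytes(charset);
        return new SimpleBody() {
            @Override long contentLength() {
                return bytes.length; // number of encoded bytes
            }

            @Override void writeTo(OutputStream out) throws IOException {
                out.write(bytes, 0, bytes.length);
            }
        };
    }

    /** Convenience for the demo: collects whatever writeTo emits. */
    static byte[] bytesOf(SimpleBody body) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try {
            body.writeTo(out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        SimpleBody body = create("{\"k\":\"\u00e9\"}", null);
        System.out.println(body.contentLength() + " bytes declared, "
            + bytesOf(body).length + " bytes written");
    }
}
```

Nothing is sent when the body is created; the bytes only move when something calls writeTo with a destination.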

      Building the Request is just a series of parsing steps and field assignments, so we skip those details and look at client.newCall(request):

      /**
       * Prepares the {@code request} to be executed at some point in the future.
       */
      @Override public Call newCall(Request request) {
        return RealCall.newRealCall(this, request, false /* for web socket */);
      }
      
      static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
        // Safely publish the Call instance to the EventListener.
        RealCall call = new RealCall(client, originalRequest, forWebSocket);
        call.transmitter = new Transmitter(client, call);
        return call;
      }
      

      A RealCall object is returned, and realCall.execute() returns a Response, so execute is the entry point of the network request:

      @Override public Response execute() throws IOException {
        synchronized (this) {
          if (executed) throw new IllegalStateException("Already Executed");
          executed = true;
        }
        transmitter.timeoutEnter();
        transmitter.callStart();
        try {
          client.dispatcher().executed(this);
          return getResponseWithInterceptorChain();
        } finally {
          client.dispatcher().finished(this);
        }
      }
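
The skeleton of execute() combines two ideas: a one-shot guard, and try/finally bookkeeping around the actual work. The sketch below models just those two ideas. `OneShotCall`, its static `runningSyncCalls` deque, and the `Supplier` standing in for the interceptor chain are all assumptions made for illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Supplier;

// Hypothetical model of the execute() skeleton; not the RealCall implementation.
final class OneShotCall {
    // Stand-in for the dispatcher's list of running synchronous calls.
    static final Deque<OneShotCall> runningSyncCalls = new ArrayDeque<>();

    private boolean executed;
    private final Supplier<String> work; // stand-in for the interceptor chain

    OneShotCall(Supplier<String> work) {
        this.work = work;
    }

    String execute() {
        synchronized (this) {
            if (executed) throw new IllegalStateException("Already Executed");
            executed = true;
        }
        synchronized (runningSyncCalls) {
            runningSyncCalls.add(this); // the call is now "in flight"
        }
        try {
            return work.get();
        } finally {
            synchronized (runningSyncCalls) {
                runningSyncCalls.remove(this); // always undone, even when work throws
            }
        }
    }

    public static void main(String[] args) {
        OneShotCall call = new OneShotCall(() -> "response");
        System.out.println(call.execute());
        try {
            call.execute();
        } catch (IllegalStateException e) {
            System.out.println("second execute rejected: " + e.getMessage());
        }
    }
}
```

A call object is therefore single-use: to repeat a request, a new call has to be created.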
      

      The first step is the transmitter's timeoutEnter method:

      public void timeoutEnter() {
        timeout.enter();
      }
      
      private final AsyncTimeout timeout = new AsyncTimeout() {
        @Override protected void timedOut() {
          cancel();
        }
      };
      

      So we look for the enter method in AsyncTimeout:

      public final void enter() {
        if (inQueue) throw new IllegalStateException("Unbalanced enter/exit");
        long timeoutNanos = timeoutNanos();
        boolean hasDeadline = hasDeadline();
        if (timeoutNanos == 0 && !hasDeadline) {
          return; // No timeout and no deadline? Don't bother with the queue.
        }
        inQueue = true;
        scheduleTimeout(this, timeoutNanos, hasDeadline);
      }
      

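      If neither a call timeout nor a deadline has been set, scheduleTimeout is skipped. It is worth looking at where these values come from, taking timeoutNanos as an example: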

      public Transmitter(OkHttpClient client, Call call) {
        this.client = client;
        this.connectionPool = Internal.instance.realConnectionPool(client.connectionPool());
        this.call = call;
        this.eventListener = client.eventListenerFactory().create(call);
        this.timeout.timeout(client.callTimeoutMillis(), MILLISECONDS);
      }
      

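      timeoutNanos is derived from client.callTimeoutMillis(). Its default is 0 (callTimeout = 0 in the Builder above), so by default enter() returns early. To set it, the OkHttpClient has to be built a little differently: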

      // Set the call timeout to 30 seconds
      OkHttpClient client = new OkHttpClient().newBuilder().callTimeout(30000, TimeUnit.MILLISECONDS).build();
      

      Back in enter, execution reaches scheduleTimeout:

      private static synchronized void scheduleTimeout(
          AsyncTimeout node, long timeoutNanos, boolean hasDeadline) {
        // Start the watchdog thread and create the head node when the first timeout is scheduled.
        if (head == null) {
          head = new AsyncTimeout();
          new Watchdog().start();
        }
      
        long now = System.nanoTime();
        if (timeoutNanos != 0 && hasDeadline) {
          // Compute the earliest event; either timeout or deadline. Because nanoTime can wrap around,
          // Math.min() is undefined for absolute values, but meaningful for relative ones.
          node.timeoutAt = now + Math.min(timeoutNanos, node.deadlineNanoTime() - now);
        } else if (timeoutNanos != 0) {
          node.timeoutAt = now + timeoutNanos;
        } else if (hasDeadline) {
          node.timeoutAt = node.deadlineNanoTime();
        } else {
          throw new AssertionError();
        }
      
        // Insert the node in sorted order.
        long remainingNanos = node.remainingNanos(now);
        for (AsyncTimeout prev = head; true; prev = prev.next) {
          if (prev.next == null || remainingNanos < prev.next.remainingNanos(now)) {
            node.next = prev.next;
            prev.next = node;
            if (prev == head) {
              AsyncTimeout.class.notify(); // Wake up the watchdog when inserting at the front.
            }
            break;
          }
        }
      }
      

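      A linked list keeps track of every pending timeout. When the first timeout is scheduled, the head node is created and a Watchdog thread is started to do the countdown. The method then sets the current node's timeoutAt (its absolute deadline) and inserts the node at the right position, keeping the list sorted by remaining time. If the node lands at the front of the list, the watchdog is woken up.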
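
The sorted insertion is the core of this method, so here is a single-threaded sketch of it. `TimeoutList` and `Node` are hypothetical names; there is no watchdog thread and no locking here, and the boolean return value only marks the case in which the real code calls notify().

```java
// Hypothetical, single-threaded model of the sorted timeout list; not Okio's AsyncTimeout.
final class TimeoutList {
    static final class Node {
        final String name;
        final long timeoutAt; // absolute deadline in nanoseconds
        Node next;

        Node(String name, long timeoutAt) {
            this.name = name;
            this.timeoutAt = timeoutAt;
        }

        long remainingNanos(long now) {
            return timeoutAt - now;
        }
    }

    private final Node head = new Node("head", 0L); // sentinel, never times out

    /** Inserts in deadline order; returns true when the node became the first real entry. */
    boolean schedule(Node node, long now) {
        long remaining = node.remainingNanos(now);
        for (Node prev = head; true; prev = prev.next) {
            if (prev.next == null || remaining < prev.next.remainingNanos(now)) {
                node.next = prev.next;
                prev.next = node;
                return prev == head; // the real code wakes the watchdog in this case
            }
        }
    }

    /** Names in the order in which the timeouts would fire. */
    String order() {
        StringBuilder sb = new StringBuilder();
        for (Node n = head.next; n != null; n = n.next) {
            if (sb.length() > 0) sb.append(',');
            sb.append(n.name);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        TimeoutList list = new TimeoutList();
        long now = System.nanoTime();
        list.schedule(new Node("b", now + 300), now);
        list.schedule(new Node("c", now + 500), now);
        list.schedule(new Node("a", now + 100), now);
        System.out.println(list.order());
    }
}
```

Because the list stays sorted, the watchdog only ever needs to wait for the first entry.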

      Moving on, next comes transmitter.callStart():

      public void callStart() {
        this.callStackTrace = Platform.get().getStackTraceForCloseable("response.body().close()");
        eventListener.callStart(call);
      }
      

      Where does eventListener come from? It is created in the Transmitter constructor:

      public Transmitter(OkHttpClient client, Call call) {
        this.client = client;
        this.connectionPool = Internal.instance.realConnectionPool(client.connectionPool());
        this.call = call;
        this.eventListener = client.eventListenerFactory().create(call);
        this.timeout.timeout(client.callTimeoutMillis(), MILLISECONDS);
      }
      

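      Since we did not set one ourselves, client.eventListenerFactory() is the default from the Builder constructor, built with EventListener.factory(EventListener.NONE). Looking it up shows that callStart on EventListener.NONE is an empty implementation, so the call does nothing here. Next is client.dispatcher().executed(this):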

      /** Used by {@code Call#execute} to signal it is in-flight. */
      synchronized void executed(RealCall call) {
        runningSyncCalls.add(call);
      }
      

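      runningSyncCalls records the call; the matching dispatcher().finished(this) in the finally block of execute() releases it again. Then getResponseWithInterceptorChain is called: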

      Response getResponseWithInterceptorChain() throws IOException {
        // Build a full stack of interceptors.
        List<Interceptor> interceptors = new ArrayList<>();
        interceptors.addAll(client.interceptors());
        interceptors.add(new RetryAndFollowUpInterceptor(client));
        interceptors.add(new BridgeInterceptor(client.cookieJar()));
        interceptors.add(new CacheInterceptor(client.internalCache()));
        interceptors.add(new ConnectInterceptor(client));
        if (!forWebSocket) {
          interceptors.addAll(client.networkInterceptors());
        }
        interceptors.add(new CallServerInterceptor(forWebSocket));
      
        Interceptor.Chain chain = new RealInterceptorChain(interceptors, transmitter, null, 0,
            originalRequest, this, client.connectTimeoutMillis(),
            client.readTimeoutMillis(), client.writeTimeoutMillis());
      
        boolean calledNoMoreExchanges = false;
        try {
          Response response = chain.proceed(originalRequest);
          if (transmitter.isCanceled()) {
            closeQuietly(response);
            throw new IOException("Canceled");
          }
          return response;
        } catch (IOException e) {
          calledNoMoreExchanges = true;
          throw transmitter.noMoreExchanges(e);
        } finally {
          if (!calledNoMoreExchanges) {
            transmitter.noMoreExchanges(null);
          }
        }
      }
      

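      The first part assembles the list of interceptors. Then a RealInterceptorChain is constructed and its proceed method is called (the one-argument overload delegates to the three-argument version shown here):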

      public Response proceed(Request request, Transmitter transmitter, @Nullable Exchange exchange)
          throws IOException {
        if (index >= interceptors.size()) throw new AssertionError();
      
        calls++;
      
        // If we already have a stream, confirm that the incoming request will use it.
        if (this.exchange != null && !this.exchange.connection().supportsUrl(request.url())) {
          throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
              + " must retain the same host and port");
        }
      
        // If we already have a stream, confirm that this is the only call to chain.proceed().
        if (this.exchange != null && calls > 1) {
          throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
              + " must call proceed() exactly once");
        }
      
        // Call the next interceptor in the chain.
        RealInterceptorChain next = new RealInterceptorChain(interceptors, transmitter, exchange,
            index + 1, request, call, connectTimeout, readTimeout, writeTimeout);
        Interceptor interceptor = interceptors.get(index);
        Response response = interceptor.intercept(next);
      
        // Confirm that the next interceptor made its required call to chain.proceed().
        if (exchange != null && index + 1 < interceptors.size() && next.calls != 1) {
          throw new IllegalStateException("network interceptor " + interceptor
              + " must call proceed() exactly once");
        }
      
        // Confirm that the intercepted response isn't null.
        if (response == null) {
          throw new NullPointerException("interceptor " + interceptor + " returned null");
        }
      
        if (response.body() == null) {
          throw new IllegalStateException(
              "interceptor " + interceptor + " returned a response with no body");
        }
      
        return response;
      }
      

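      The response is obtained from interceptor.intercept(). Remember the interceptors assembled in getResponseWithInterceptorChain? We ignore custom interceptors here and only consider the mandatory ones that are hard-coded in the source. In the order in which getResponseWithInterceptorChain adds them: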

      interceptors.add(new RetryAndFollowUpInterceptor(client));
      interceptors.add(new BridgeInterceptor(client.cookieJar()));
      interceptors.add(new CacheInterceptor(client.internalCache()));
      interceptors.add(new ConnectInterceptor(client));
      if (!forWebSocket) {
        // forWebSocket is false here, but client.networkInterceptors() is empty
        interceptors.addAll(client.networkInterceptors());
      }
      interceptors.add(new CallServerInterceptor(forWebSocket));
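
The way proceed and intercept call each other is easier to see in a stripped-down model. In the sketch below, `MiniChain` and its `Interceptor` interface are hypothetical, requests and responses are plain strings, and the three lambdas only borrow the names of the real interceptors; the index + 1 hand-off is the part that mirrors RealInterceptorChain.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical miniature of the chain-of-responsibility pattern; not OkHttp code.
final class MiniChain {
    interface Interceptor {
        String intercept(MiniChain chain);
    }

    private final List<Interceptor> interceptors;
    private final int index;
    private final String request;

    MiniChain(List<Interceptor> interceptors, int index, String request) {
        this.interceptors = interceptors;
        this.index = index;
        this.request = request;
    }

    String request() {
        return request;
    }

    /** Hands the request to the interceptor at index, giving it a chain that starts at index + 1. */
    String proceed(String request) {
        if (index >= interceptors.size()) throw new AssertionError();
        MiniChain next = new MiniChain(interceptors, index + 1, request);
        return interceptors.get(index).intercept(next);
    }

    static String run(String request) {
        List<Interceptor> interceptors = new ArrayList<>();
        // Each interceptor may change the request on the way in and wrap the response on the way out.
        interceptors.add(chain -> "retry(" + chain.proceed(chain.request()) + ")");
        interceptors.add(chain -> "bridge(" + chain.proceed(chain.request() + "+headers") + ")");
        // The last interceptor produces the response and never calls proceed.
        interceptors.add(chain -> "server[" + chain.request() + "]");
        return new MiniChain(interceptors, 0, request).proceed(request);
    }

    public static void main(String[] args) {
        System.out.println(run("GET /")); // prints retry(bridge(server[GET /+headers]))
    }
}
```

The nesting of the output shows the call order: the request travels inward through every interceptor, and the response is wrapped on the way back out.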
      

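      The RealInterceptorChain is created with index 0, so interceptors.get(index) in proceed returns the first entry. With no custom interceptors configured, that is RetryAndFollowUpInterceptor. Its intercept method runs with a RealInterceptorChain whose index points at the next interceptor (BridgeInterceptor):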

      @Override public Response intercept(Chain chain) throws IOException {
        Request request = chain.request();
        RealInterceptorChain realChain = (RealInterceptorChain) chain;
        Transmitter transmitter = realChain.transmitter();
      
        int followUpCount = 0;
        Response priorResponse = null;
        while (true) {
          transmitter.prepareToConnect(request);
      
          if (transmitter.isCanceled()) {
            throw new IOException("Canceled");
          }
      
          Response response;
          boolean success = false;
          try {
            response = realChain.proceed(request, transmitter, null);
            success = true;
          } catch (RouteException e) {
            // The attempt to connect via a route failed. The request will not have been sent.
            if (!recover(e.getLastConnectException(), transmitter, false, request)) {
              throw e.getFirstConnectException();
            }
            continue;
          } catch (IOException e) {
            // An attempt to communicate with a server failed. The request may have been sent.
            boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
            if (!recover(e, transmitter, requestSendStarted, request)) throw e;
            continue;
          } finally {
            // The network call threw an exception. Release any resources.
            if (!success) {
              transmitter.exchangeDoneDueToException();
            }
          }
      
          // Attach the prior response if it exists. Such responses never have a body.
          if (priorResponse != null) {
            response = response.newBuilder()
                .priorResponse(priorResponse.newBuilder()
                        .body(null)
                        .build())
                .build();
          }
      
          Exchange exchange = Internal.instance.exchange(response);
          Route route = exchange != null ? exchange.connection().route() : null;
          Request followUp = followUpRequest(response, route);
      
          if (followUp == null) {
            if (exchange != null && exchange.isDuplex()) {
              transmitter.timeoutEarlyExit();
            }
            return response;
          }
      
          RequestBody followUpBody = followUp.body();
          if (followUpBody != null && followUpBody.isOneShot()) {
            return response;
          }
      
          closeQuietly(response.body());
          if (transmitter.hasExchange()) {
            exchange.detachWithViolence();
          }
      
          if (++followUpCount > MAX_FOLLOW_UPS) {
            throw new ProtocolException("Too many follow-up requests: " + followUpCount);
          }
      
          request = followUp;
          priorResponse = response;
        }
      }
      

      See the while (true)? That loop is what the "Retry" in the class name refers to.
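
Reduced to its control flow, the loop does two things: it retries after a recoverable IOException, and it repeats with a new request when the response asks for a follow-up, up to a limit. The sketch below models that under simplifying assumptions. `RetryLoop`, `Attempt`, and `Outcome` are hypothetical, and the `routesLeft` counter is a crude stand-in for the decision the real recover() method makes.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.ProtocolException;

// Hypothetical model of the retry / follow-up loop; not RetryAndFollowUpInterceptor itself.
final class RetryLoop {
    /** Result of one attempt: either a final body or a follow-up URL. */
    static final class Outcome {
        final String body;
        final String followUp;

        Outcome(String body, String followUp) {
            this.body = body;
            this.followUp = followUp;
        }
    }

    interface Attempt {
        Outcome run(String url) throws IOException;
    }

    static String execute(String url, Attempt attempt, int routesLeft, int maxFollowUps)
            throws IOException {
        int followUpCount = 0;
        while (true) {
            Outcome outcome;
            try {
                outcome = attempt.run(url);
            } catch (IOException e) {
                // Stand-in for recover(): retry only while another route is available.
                if (routesLeft-- <= 0) throw e;
                continue;
            }
            if (outcome.followUp == null) return outcome.body; // nothing left to follow
            if (++followUpCount > maxFollowUps) {
                throw new ProtocolException("Too many follow-up requests: " + followUpCount);
            }
            url = outcome.followUp; // loop again with the follow-up request
        }
    }

    /** Fails once, is redirected once, then succeeds. */
    static String demo() {
        int[] calls = {0};
        Attempt flaky = url -> {
            calls[0]++;
            if (calls[0] == 1) throw new IOException("connection reset");
            if (url.equals("/old")) return new Outcome(null, "/new");
            return new Outcome("200 from " + url + " after " + calls[0] + " attempts", null);
        };
        try {
            return execute("/old", flaky, 1, 20);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 200 from /new after 3 attempts
    }
}
```

The loop only ends by returning a response or by throwing, which matches the structure of the intercept method above.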

      transmitter.prepareToConnect(request) just sets a few fields whose meaning is not clear yet, so we skip it for now.

      The key line, response = realChain.proceed(request, transmitter, null), takes us back into RealInterceptorChain's proceed method. Everything works as before, except that interceptors.get(index) is now BridgeInterceptor and next is a RealInterceptorChain pointing at the following interceptor (CacheInterceptor). So execution reaches BridgeInterceptor's intercept method:

      @Override public Response intercept(Chain chain) throws IOException {
        Request userRequest = chain.request();
        Request.Builder requestBuilder = userRequest.newBuilder();
      
        RequestBody body = userRequest.body();
        if (body != null) {
          MediaType contentType = body.contentType();
          if (contentType != null) {
            requestBuilder.header("Content-Type", contentType.toString());
          }
      
          long contentLength = body.contentLength();
          if (contentLength != -1) {
            requestBuilder.header("Content-Length", Long.toString(contentLength));
            requestBuilder.removeHeader("Transfer-Encoding");
          } else {
            requestBuilder.header("Transfer-Encoding", "chunked");
            requestBuilder.removeHeader("Content-Length");
          }
        }
      
        if (userRequest.header("Host") == null) {
          requestBuilder.header("Host", hostHeader(userRequest.url(), false));
        }
      
        if (userRequest.header("Connection") == null) {
          requestBuilder.header("Connection", "Keep-Alive");
        }
      
        // If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
        // the transfer stream.
        boolean transparentGzip = false;
        if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
          transparentGzip = true;
          requestBuilder.header("Accept-Encoding", "gzip");
        }
      
        List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
        if (!cookies.isEmpty()) {
          requestBuilder.header("Cookie", cookieHeader(cookies));
        }
      
        if (userRequest.header("User-Agent") == null) {
          requestBuilder.header("User-Agent", Version.userAgent());
        }
      
        Response networkResponse = chain.proceed(requestBuilder.build());
      
        HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());
      
        Response.Builder responseBuilder = networkResponse.newBuilder()
            .request(userRequest);
      
        if (transparentGzip
            && "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
            && HttpHeaders.hasBody(networkResponse)) {
          GzipSource responseBody = new GzipSource(networkResponse.body().source());
          Headers strippedHeaders = networkResponse.headers().newBuilder()
              .removeAll("Content-Encoding")
              .removeAll("Content-Length")
              .build();
          responseBuilder.headers(strippedHeaders);
          String contentType = networkResponse.header("Content-Type");
          responseBuilder.body(new RealResponseBody(contentType, -1L, Okio.buffer(responseBody)));
        }
      
        return responseBuilder.build();
      }
      

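      As you can see, the code before chain.proceed(requestBuilder.build()) fills in a series of headers on the request; the code after it stores cookies from the response and transparently unzips a gzip-encoded body.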
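
The header rules can be summarized in a small stand-alone function. `BridgeHeaders.fill` is a hypothetical helper that works on a plain map, so header names are matched case-sensitively and cookies and User-Agent are left out; apart from that it follows the same conditions as the code above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of the request-header defaults; not BridgeInterceptor itself.
final class BridgeHeaders {
    /**
     * @param bodyLength null when the request has no body, -1 when the length is unknown
     */
    static Map<String, String> fill(Map<String, String> userHeaders, String host, Long bodyLength) {
        Map<String, String> headers = new LinkedHashMap<>(userHeaders);
        if (bodyLength != null) {
            if (bodyLength != -1L) {
                headers.put("Content-Length", Long.toString(bodyLength));
                headers.remove("Transfer-Encoding");
            } else {
                headers.put("Transfer-Encoding", "chunked");
                headers.remove("Content-Length");
            }
        }
        // Defaults are only added when the caller has not set the header.
        headers.putIfAbsent("Host", host);
        headers.putIfAbsent("Connection", "Keep-Alive");
        if (!headers.containsKey("Accept-Encoding") && !headers.containsKey("Range")) {
            // Whoever adds this header is also responsible for unzipping the response.
            headers.put("Accept-Encoding", "gzip");
        }
        return headers;
    }

    public static void main(String[] args) {
        Map<String, String> user = new LinkedHashMap<>();
        user.put("Connection", "close");
        System.out.println(fill(user, "example.com", 12L));
    }
}
```

Headers set explicitly by the caller win; the interceptor only fills the gaps.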

      Following the recursive call logic, CacheInterceptor now takes over. Its intercept method:

      @Override public Response intercept(Chain chain) throws IOException {
        Response cacheCandidate = cache != null
            ? cache.get(chain.request())
            : null;
      
        long now = System.currentTimeMillis();
      
        CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
        Request networkRequest = strategy.networkRequest;
        Response cacheResponse = strategy.cacheResponse;
      
        if (cache != null) {
          cache.trackResponse(strategy);
        }
      
        if (cacheCandidate != null && cacheResponse == null) {
          closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
        }
      
        // If we're forbidden from using the network and the cache is insufficient, fail.
        if (networkRequest == null && cacheResponse == null) {
          return new Response.Builder()
              .request(chain.request())
              .protocol(Protocol.HTTP_1_1)
              .code(504)
              .message("Unsatisfiable Request (only-if-cached)")
              .body(Util.EMPTY_RESPONSE)
              .sentRequestAtMillis(-1L)
              .receivedResponseAtMillis(System.currentTimeMillis())
              .build();
        }
      
        // If we don't need the network, we're done.
        if (networkRequest == null) {
          return cacheResponse.newBuilder()
              .cacheResponse(stripBody(cacheResponse))
              .build();
        }
      
        Response networkResponse = null;
        try {
          networkResponse = chain.proceed(networkRequest);
        } finally {
          // If we're crashing on I/O or otherwise, don't leak the cache body.
          if (networkResponse == null && cacheCandidate != null) {
            closeQuietly(cacheCandidate.body());
          }
        }
      
        // If we have a cache response too, then we're doing a conditional get.
        if (cacheResponse != null) {
          if (networkResponse.code() == HTTP_NOT_MODIFIED) {
            Response response = cacheResponse.newBuilder()
                .headers(combine(cacheResponse.headers(), networkResponse.headers()))
                .sentRequestAtMillis(networkResponse.sentRequestAtMillis())
                .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
                .cacheResponse(stripBody(cacheResponse))
                .networkResponse(stripBody(networkResponse))
                .build();
            networkResponse.body().close();
      
            // Update the cache after combining headers but before stripping the
            // Content-Encoding header (as performed by initContentStream()).
            cache.trackConditionalCacheHit();
            cache.update(cacheResponse, response);
            return response;
          } else {
            closeQuietly(cacheResponse.body());
          }
        }
      
        Response response = networkResponse.newBuilder()
            .cacheResponse(stripBody(cacheResponse))
            .networkResponse(stripBody(networkResponse))
            .build();
      
        if (cache != null) {
          if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) {
            // Offer this request to the cache.
            CacheRequest cacheRequest = cache.put(response);
            return cacheWritingResponse(cacheRequest, response);
          }
      
          if (HttpMethod.invalidatesCache(networkRequest.method())) {
            try {
              cache.remove(networkRequest);
            } catch (IOException ignored) {
              // The cache cannot be written.
            }
          }
        }
      
        return response;
      }
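
The method above branches on two values produced by CacheStrategy: networkRequest and cacheResponse. As a reading aid, the sketch below condenses those branches into a decision function. `CacheDecision` and its `Action` names are hypothetical labels for illustration, not OkHttp types.

```java
import java.net.HttpURLConnection;

// Hypothetical summary of the branches in CacheInterceptor.intercept(); not OkHttp code.
final class CacheDecision {
    enum Action { FAIL_504, USE_CACHE, CONDITIONAL_GET, NETWORK_ONLY }

    /** Mirrors the null checks on strategy.networkRequest and strategy.cacheResponse. */
    static Action decide(boolean hasNetworkRequest, boolean hasCacheResponse) {
        if (!hasNetworkRequest && !hasCacheResponse) return Action.FAIL_504; // network forbidden, cache insufficient
        if (!hasNetworkRequest) return Action.USE_CACHE;                      // the cache alone is enough
        if (hasCacheResponse) return Action.CONDITIONAL_GET;                  // validate the cached copy
        return Action.NETWORK_ONLY;                                           // nothing usable in the cache
    }

    /** After a conditional GET, a 304 means the cached body is still valid. */
    static String bodySource(int networkCode) {
        return networkCode == HttpURLConnection.HTTP_NOT_MODIFIED ? "cache" : "network";
    }

    public static void main(String[] args) {
        System.out.println(decide(true, false)); // the first-request case followed in this article
        System.out.println(bodySource(304));
    }
}
```

For a first request with no cache configured, the NETWORK_ONLY branch applies, which is the path followed from here on.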
      

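      The code before chain.proceed(networkRequest) tries to serve the response from the cache first. If that is not possible, execution continues down the chain. We assume this is the first time the request is made, so next comes ConnectInterceptor's intercept: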

      @Override public Response intercept(Chain chain) throws IOException {
        RealInterceptorChain realChain = (RealInterceptorChain) chain;
        Request request = realChain.request();
        Transmitter transmitter = realChain.transmitter();
      
        // We need the network to satisfy this request. Possibly for validating a conditional GET.
        // Non-GET requests ask for the more extensive connection health check.
        boolean doExtensiveHealthChecks = !request.method().equals("GET");
        Exchange exchange = transmitter.newExchange(chain, doExtensiveHealthChecks);
      
        return realChain.proceed(request, transmitter, exchange);
      }
      

      transmitter.newExchange creates an Exchange:

      /** Returns a new exchange to carry a new request and response. */
      Exchange newExchange(Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
        synchronized (connectionPool) {
          if (noMoreExchanges) {
            throw new IllegalStateException("released");
          }
          if (exchange != null) {
            throw new IllegalStateException("cannot make a new request because the previous response "
                + "is still open: please call response.close()");
          }
        }
      
        ExchangeCodec codec = exchangeFinder.find(client, chain, doExtensiveHealthChecks);
        Exchange result = new Exchange(this, call, eventListener, exchangeFinder, codec);
      
        synchronized (connectionPool) {
          this.exchange = result;
          this.exchangeRequestDone = false;
          this.exchangeResponseDone = false;
          return result;
        }
      }
      

      Then comes the last interceptor, CallServerInterceptor. Its intercept method:

      @Override public Response intercept(Chain chain) throws IOException {
        RealInterceptorChain realChain = (RealInterceptorChain) chain;
        Exchange exchange = realChain.exchange();
        Request request = realChain.request();
      
        long sentRequestMillis = System.currentTimeMillis();
      
        exchange.writeRequestHeaders(request);
      
        boolean responseHeadersStarted = false;
        Response.Builder responseBuilder = null;
        if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
          // If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
          // Continue" response before transmitting the request body. If we don't get that, return
          // what we did get (such as a 4xx response) without ever transmitting the request body.
          //
          if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
            exchange.flushRequest();
            responseHeadersStarted = true;
            exchange.responseHeadersStart();
            responseBuilder = exchange.readResponseHeaders(true);
          }
      
          if (responseBuilder == null) {
            // If the body is duplex, first flush what has been buffered so far
            if (request.body().isDuplex()) {
              // Prepare a duplex body so that the application can send a request body later.
              exchange.flushRequest();
              BufferedSink bufferedRequestBody = Okio.buffer(
                  exchange.createRequestBody(request, true));
              request.body().writeTo(bufferedRequestBody);
            } else {
              // Write the request body if the "Expect: 100-continue" expectation was met.
              BufferedSink bufferedRequestBody = Okio.buffer(
                  exchange.createRequestBody(request, false));
              request.body().writeTo(bufferedRequestBody);
              bufferedRequestBody.close();
            }
          } else {
            exchange.noRequestBody();
            if (!exchange.connection().isMultiplexed()) {
              // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
              // from being reused. Otherwise we're still obligated to transmit the request body to
              // leave the connection in a consistent state.
              exchange.noNewExchangesOnConnection();
            }
          }
        } else {
          exchange.noRequestBody();
        }
      
        if (request.body() == null || !request.body().isDuplex()) {
          exchange.finishRequest();
        }
      
        if (!responseHeadersStarted) {
          exchange.responseHeadersStart();
        }
      
        if (responseBuilder == null) {
          responseBuilder = exchange.readResponseHeaders(false);
        }
      
        Response response = responseBuilder
            .request(request)
            .handshake(exchange.connection().handshake())
            .sentRequestAtMillis(sentRequestMillis)
            .receivedResponseAtMillis(System.currentTimeMillis())
            .build();
      
        int code = response.code();
        if (code == 100) {
          // server sent a 100-continue even though we did not request one.
          // try again to read the actual response
          response = exchange.readResponseHeaders(false)
              .request(request)
              .handshake(exchange.connection().handshake())
              .sentRequestAtMillis(sentRequestMillis)
              .receivedResponseAtMillis(System.currentTimeMillis())
              .build();
      
          code = response.code();
        }
      
        exchange.responseHeadersEnd(response);
      
        if (forWebSocket && code == 101) {
          // Connection is upgrading, but we need to ensure interceptors see a non-null response body.
          response = response.newBuilder()
              .body(Util.EMPTY_RESPONSE)
              .build();
        } else {
          response = response.newBuilder()
              .body(exchange.openResponseBody(response))
              .build();
        }
      
        if ("close".equalsIgnoreCase(response.request().header("Connection"))
            || "close".equalsIgnoreCase(response.header("Connection"))) {
          exchange.noNewExchangesOnConnection();
        }
      
        if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
          throw new ProtocolException(
              "HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
        }
      
        return response;
      }
      

      The exchange.writeRequestHeaders(request) method:

      public void writeRequestHeaders(Request request) throws IOException {
        try {
          eventListener.requestHeadersStart(call);
          codec.writeRequestHeaders(request);
          eventListener.requestHeadersEnd(call, request);
        } catch (IOException e) {
          eventListener.requestFailed(call, e);
          trackFailure(e);
          throw e;
        }
      }
      

      Because no eventListener was set, the default eventListenerFactory = EventListener.factory(EventListener.NONE) applies, so eventListener.requestHeadersStart(call) and eventListener.requestHeadersEnd(call, request) are empty implementations. From newExchange we know that codec comes from exchangeFinder.find(client, chain, doExtensiveHealthChecks), and exchangeFinder is created in Transmitter's prepareToConnect. Here is the find method of exchangeFinder:

      public ExchangeCodec find(
          OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
        int connectTimeout = chain.connectTimeoutMillis();
        int readTimeout = chain.readTimeoutMillis();
        int writeTimeout = chain.writeTimeoutMillis();
        int pingIntervalMillis = client.pingIntervalMillis();
        boolean connectionRetryEnabled = client.retryOnConnectionFailure();
      
        try {
          RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
              writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks);
          return resultConnection.newCodec(client, chain);
        } catch (RouteException e) {
          trackFailure();
          throw e;
        } catch (IOException e) {
          trackFailure();
          throw new RouteException(e);
        }
      }
      
      /**
       * Finds a connection and returns it if it is healthy. If it is unhealthy the process is repeated
       * until a healthy connection is found.
       */
      private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
          int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled,
          boolean doExtensiveHealthChecks) throws IOException {
        while (true) {
          RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
              pingIntervalMillis, connectionRetryEnabled);
      
          // If this is a brand new connection, we can skip the extensive health checks.
          synchronized (connectionPool) {
            if (candidate.successCount == 0 && !candidate.isMultiplexed()) {
              return candidate;
            }
          }
      
          // Do a (potentially slow) check to confirm that the pooled connection is still good. If it
          // isn't, take it out of the pool and start again.
          if (!candidate.isHealthy(doExtensiveHealthChecks)) {
            candidate.noNewExchanges();
            continue;
          }
      
          return candidate;
        }
      }
      
      /**
       * Returns a connection to host a new stream. This prefers the existing connection if it exists,
       * then the pool, finally building a new connection.
       */
      private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
          int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {
        boolean foundPooledConnection = false;
        RealConnection result = null;
        Route selectedRoute = null;
        RealConnection releasedConnection;
        Socket toClose;
        synchronized (connectionPool) {
          if (transmitter.isCanceled()) throw new IOException("Canceled");
          hasStreamFailure = false; // This is a fresh attempt.
      
          // Attempt to use an already-allocated connection. We need to be careful here because our
          // already-allocated connection may have been restricted from creating new exchanges.
          releasedConnection = transmitter.connection;
          toClose = transmitter.connection != null && transmitter.connection.noNewExchanges
              ? transmitter.releaseConnectionNoEvents()
              : null;
      
          if (transmitter.connection != null) {
            // We had an already-allocated connection and it's good.
            result = transmitter.connection;
            releasedConnection = null;
          }
      
          if (result == null) {
            // Attempt to get a connection from the pool.
            if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, null, false)) {
              foundPooledConnection = true;
              result = transmitter.connection;
            } else if (nextRouteToTry != null) {
              selectedRoute = nextRouteToTry;
              nextRouteToTry = null;
            } else if (retryCurrentRoute()) {
              selectedRoute = transmitter.connection.route();
            }
          }
        }
        closeQuietly(toClose);
      
        if (releasedConnection != null) {
          eventListener.connectionReleased(call, releasedConnection);
        }
        if (foundPooledConnection) {
          eventListener.connectionAcquired(call, result);
        }
        if (result != null) {
          // If we found an already-allocated or pooled connection, we're done.
          return result;
        }
      
        // If we need a route selection, make one. This is a blocking operation.
        boolean newRouteSelection = false;
        if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {
          newRouteSelection = true;
          routeSelection = routeSelector.next();
        }
      
        List<Route> routes = null;
        synchronized (connectionPool) {
          if (transmitter.isCanceled()) throw new IOException("Canceled");
      
          if (newRouteSelection) {
            // Now that we have a set of IP addresses, make another attempt at getting a connection from
            // the pool. This could match due to connection coalescing.
            routes = routeSelection.getAll();
            if (connectionPool.transmitterAcquirePooledConnection(
                address, transmitter, routes, false)) {
              foundPooledConnection = true;
              result = transmitter.connection;
            }
          }
      
          if (!foundPooledConnection) {
            if (selectedRoute == null) {
              selectedRoute = routeSelection.next();
            }
      
            // Create a connection and assign it to this allocation immediately. This makes it possible
            // for an asynchronous cancel() to interrupt the handshake we're about to do.
            result = new RealConnection(connectionPool, selectedRoute);
            connectingConnection = result;
          }
        }
      
        // If we found a pooled connection on the 2nd time around, we're done.
        if (foundPooledConnection) {
          eventListener.connectionAcquired(call, result);
          return result;
        }
      
        // Do TCP + TLS handshakes. This is a blocking operation.
        result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
            connectionRetryEnabled, call, eventListener);
        connectionPool.routeDatabase.connected(result.route());
      
        Socket socket = null;
        synchronized (connectionPool) {
          connectingConnection = null;
          // Last attempt at connection coalescing, which only occurs if we attempted multiple
          // concurrent connections to the same host.
          if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, routes, true)) {
            // We lost the race! Close the connection we created and return the pooled connection.
            result.noNewExchanges = true;
            socket = result.socket();
            result = transmitter.connection;
      
            // It's possible for us to obtain a coalesced connection that is immediately unhealthy. In
            // that case we will retry the route we just successfully connected with.
            nextRouteToTry = selectedRoute;
          } else {
            connectionPool.put(result);
            transmitter.acquireConnectionNoEvents(result);
          }
        }
        closeQuietly(socket);
      
        eventListener.connectionAcquired(call, result);
        return result;
      }
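
The lookup order described in the Javadoc of findConnection (the connection already held by the call, then the pool, then a new connection) can be illustrated with a much smaller model. This is only a sketch: Conn, ConnectionLookupSketch and their fields are hypothetical names invented here, not OkHttp classes, and route selection, locking and connection coalescing are left out.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Simplified model of the lookup order; none of these types are OkHttp classes. */
final class ConnectionLookupSketch {
  /** Hypothetical stand-in for RealConnection. */
  static final class Conn {
    final String host;
    final String origin; // "pooled" or "new", for illustration only
    boolean noNewExchanges;

    Conn(String host, String origin) {
      this.host = host;
      this.origin = origin;
    }
  }

  private final Deque<Conn> pool = new ArrayDeque<>();
  private Conn allocated; // plays the role of transmitter.connection

  void addToPool(Conn c) {
    pool.add(c);
  }

  Conn find(String host) {
    // 1. Reuse the connection already held by this call, unless it is marked unusable.
    if (allocated != null && !allocated.noNewExchanges) return allocated;
    allocated = null;

    // 2. Otherwise look for a usable pooled connection to the same host.
    for (Conn c : pool) {
      if (c.host.equals(host) && !c.noNewExchanges) {
        allocated = c;
        return c;
      }
    }

    // 3. Otherwise create a new connection (the real code connects it here) and pool it.
    Conn created = new Conn(host, "new");
    pool.add(created);
    allocated = created;
    return created;
  }
}
```

Calling find twice for the same host returns the same object the second time; marking that object with noNewExchanges = true forces the next call to fall through to step 3.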
      

      The key code:

      // Do TCP + TLS handshakes. This is a blocking operation.
      result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,connectionRetryEnabled, call, eventListener);
      

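      The comment says this performs the TCP and TLS handshakes and that it is a blocking operation, so this is clearly where the network connection is established: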

      public void connect(int connectTimeout, int readTimeout, int writeTimeout,
          int pingIntervalMillis, boolean connectionRetryEnabled, Call call,
          EventListener eventListener) {
        if (protocol != null) throw new IllegalStateException("already connected");
      
        RouteException routeException = null;
        List<ConnectionSpec> connectionSpecs = route.address().connectionSpecs();
        ConnectionSpecSelector connectionSpecSelector = new ConnectionSpecSelector(connectionSpecs);
      
        if (route.address().sslSocketFactory() == null) {
          if (!connectionSpecs.contains(ConnectionSpec.CLEARTEXT)) {
            throw new RouteException(new UnknownServiceException(
                "CLEARTEXT communication not enabled for client"));
          }
          String host = route.address().url().host();
          if (!Platform.get().isCleartextTrafficPermitted(host)) {
            throw new RouteException(new UnknownServiceException(
                "CLEARTEXT communication to " + host + " not permitted by network security policy"));
          }
        } else {
          if (route.address().protocols().contains(Protocol.H2_PRIOR_KNOWLEDGE)) {
            throw new RouteException(new UnknownServiceException(
                "H2_PRIOR_KNOWLEDGE cannot be used with HTTPS"));
          }
        }
      
        while (true) {
          try {
            if (route.requiresTunnel()) {
              connectTunnel(connectTimeout, readTimeout, writeTimeout, call, eventListener);
              if (rawSocket == null) {
                // We were unable to connect the tunnel but properly closed down our resources.
                break;
              }
            } else {
              connectSocket(connectTimeout, readTimeout, call, eventListener);
            }
            establishProtocol(connectionSpecSelector, pingIntervalMillis, call, eventListener);
            eventListener.connectEnd(call, route.socketAddress(), route.proxy(), protocol);
            break;
          } catch (IOException e) {
            closeQuietly(socket);
            closeQuietly(rawSocket);
            socket = null;
            rawSocket = null;
            source = null;
            sink = null;
            handshake = null;
            protocol = null;
            http2Connection = null;
      
            eventListener.connectFailed(call, route.socketAddress(), route.proxy(), null, e);
      
            if (routeException == null) {
              routeException = new RouteException(e);
            } else {
              routeException.addConnectException(e);
            }
      
            if (!connectionRetryEnabled || !connectionSpecSelector.connectionFailed(e)) {
              throw routeException;
            }
          }
        }
      
        if (route.requiresTunnel() && rawSocket == null) {
          ProtocolException exception = new ProtocolException("Too many tunnel connections attempted: "
              + MAX_TUNNEL_ATTEMPTS);
          throw new RouteException(exception);
        }
      
        if (http2Connection != null) {
          synchronized (connectionPool) {
            allocationLimit = http2Connection.maxConcurrentStreams();
          }
        }
      }
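
The while (true) loop in connect keeps the first failure as the primary exception, attaches later failures to it, and gives up when retries are disabled or no fallback is left. A minimal sketch of that shape, using only the JDK, might look like the following. Attempt and ConnectRetrySketch are hypothetical names, and Throwable.addSuppressed is used here as a stand-in for RouteException.addConnectException.

```java
import java.io.IOException;
import java.util.List;

final class ConnectRetrySketch {
  /** Hypothetical stand-in for one connection attempt with a given configuration. */
  interface Attempt {
    void run() throws IOException;
  }

  /**
   * Tries each attempt in order and returns the index of the one that succeeded.
   * The first failure becomes the primary exception; later failures are attached to it.
   */
  static int connect(List<Attempt> attempts, boolean retryEnabled) throws IOException {
    IOException primary = null;
    for (int i = 0; i < attempts.size(); i++) {
      try {
        attempts.get(i).run();
        return i;
      } catch (IOException e) {
        if (primary == null) {
          primary = e;
        } else {
          primary.addSuppressed(e);
        }
        boolean hasFallback = i + 1 < attempts.size();
        if (!retryEnabled || !hasFallback) throw primary;
      }
    }
    throw new IOException("no attempts configured");
  }
}
```

With retryEnabled set to false the first IOException is thrown immediately, which corresponds to the !connectionRetryEnabled check in the original loop.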
      

      connectTunnel also calls connectSocket first and only then calls createTunnel, which sends the CONNECT request to the proxy:

      /**
       * To make an HTTPS connection over an HTTP proxy, send an unencrypted CONNECT request to create
       * the proxy connection. This may need to be retried if the proxy requires authorization.
       */
      private Request createTunnel(int readTimeout, int writeTimeout, Request tunnelRequest,
          HttpUrl url) throws IOException {
        // Make an SSL Tunnel on the first message pair of each SSL + proxy connection.
        String requestLine = "CONNECT " + Util.hostHeader(url, true) + " HTTP/1.1";
        while (true) {
          Http1ExchangeCodec tunnelCodec = new Http1ExchangeCodec(null, null, source, sink);
          source.timeout().timeout(readTimeout, MILLISECONDS);
          sink.timeout().timeout(writeTimeout, MILLISECONDS);
          tunnelCodec.writeRequest(tunnelRequest.headers(), requestLine);
          tunnelCodec.finishRequest();
          Response response = tunnelCodec.readResponseHeaders(false)
              .request(tunnelRequest)
              .build();
          tunnelCodec.skipConnectBody(response);
      
          switch (response.code()) {
            case HTTP_OK:
              // Assume the server won't send a TLS ServerHello until we send a TLS ClientHello. If
              // that happens, then we will have buffered bytes that are needed by the SSLSocket!
              // This check is imperfect: it doesn't tell us whether a handshake will succeed, just
              // that it will almost certainly fail because the proxy has sent unexpected data.
              if (!source.getBuffer().exhausted() || !sink.buffer().exhausted()) {
                throw new IOException("TLS tunnel buffered too many bytes!");
              }
              return null;
      
            case HTTP_PROXY_AUTH:
              tunnelRequest = route.address().proxyAuthenticator().authenticate(route, response);
              if (tunnelRequest == null) throw new IOException("Failed to authenticate with proxy");
      
              if ("close".equalsIgnoreCase(response.header("Connection"))) {
                return tunnelRequest;
              }
              break;
      
            default:
              throw new IOException(
                  "Unexpected response code for CONNECT: " + response.code());
          }
        }
      }
      

      So let's look at connectSocket:

      /** Does all the work necessary to build a full HTTP or HTTPS connection on a raw socket. */
      private void connectSocket(int connectTimeout, int readTimeout, Call call,
          EventListener eventListener) throws IOException {
        Proxy proxy = route.proxy();
        Address address = route.address();
      
        rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
            ? address.socketFactory().createSocket()
            : new Socket(proxy);
      
        eventListener.connectStart(call, route.socketAddress(), proxy);
        rawSocket.setSoTimeout(readTimeout);
        try {
          Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
        } catch (ConnectException e) {
          ConnectException ce = new ConnectException("Failed to connect to " + route.socketAddress());
          ce.initCause(e);
          throw ce;
        }
      
        // The following try/catch block is a pseudo hacky way to get around a crash on Android 7.0
        // More details:
        // https://github.com/square/okhttp/issues/3245
        // https://android-review.googlesource.com/#/c/271775/
        try {
          source = Okio.buffer(Okio.source(rawSocket));
          sink = Okio.buffer(Okio.sink(rawSocket));
        } catch (NullPointerException npe) {
          if (NPE_THROW_WITH_NULL.equals(npe.getMessage())) {
            throw new IOException(npe);
          }
        }
      }
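
The essential steps of connectSocket (create an unconnected socket, set the read timeout, connect with a connect timeout, then wrap both streams in buffers) can be reproduced with JDK classes only. The sketch below assumes BufferedInputStream and BufferedOutputStream as rough stand-ins for Okio.buffer; RawSocketSketch and the local echo server are invented for the demonstration and have nothing to do with OkHttp itself.

```java
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

final class RawSocketSketch {
  /** Connects with a connect timeout, sets a read timeout, sends text, and reads the echo. */
  static String roundTrip(InetSocketAddress address, int connectTimeoutMs, int readTimeoutMs,
      String text) throws IOException {
    try (Socket socket = new Socket()) {
      socket.setSoTimeout(readTimeoutMs);        // like rawSocket.setSoTimeout(readTimeout)
      socket.connect(address, connectTimeoutMs); // the blocking TCP handshake
      OutputStream out = new BufferedOutputStream(socket.getOutputStream());
      InputStream in = new BufferedInputStream(socket.getInputStream());

      byte[] payload = text.getBytes(StandardCharsets.UTF_8);
      out.write(payload);
      out.flush(); // nothing is sent until the buffer is flushed

      byte[] reply = new byte[payload.length];
      int read = 0;
      while (read < reply.length) {
        int n = in.read(reply, read, reply.length - read);
        if (n == -1) break;
        read += n;
      }
      return new String(reply, 0, read, StandardCharsets.UTF_8);
    }
  }

  /** Starts a one-shot echo server on an ephemeral local port, for demonstration only. */
  static ServerSocket startEchoServer() throws IOException {
    ServerSocket server = new ServerSocket(0);
    Thread worker = new Thread(() -> {
      try (Socket client = server.accept()) {
        InputStream in = client.getInputStream();
        OutputStream out = client.getOutputStream();
        byte[] buf = new byte[1024];
        int n;
        while ((n = in.read(buf)) != -1) {
          out.write(buf, 0, n);
          out.flush();
        }
      } catch (IOException ignored) {
        // The demo server simply stops on any I/O error.
      }
    });
    worker.setDaemon(true);
    worker.start();
    return server;
  }
}
```

Setting the read timeout before connecting mirrors the order used in connectSocket; the connect timeout is passed separately to connect.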
      

      Tracing Platform.get().connectSocket downward, it calls socket.connect, which in the JDK can eventually end up in privilegedConnect of SocksSocketImpl:

      private synchronized void privilegedConnect(final String host,
                                                final int port,
                                                final int timeout)
           throws IOException
      {
          try {
              AccessController.doPrivileged(
                  new java.security.PrivilegedExceptionAction<Void>() {
                      public Void run() throws IOException {
                                superConnectServer(host, port, timeout);
                                cmdIn = getInputStream();
                                cmdOut = getOutputStream();
                                return null;
                            }
                        });
          } catch (java.security.PrivilegedActionException pae) {
              throw (IOException) pae.getException();
          }
      }
      

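      This is where socket communication happens again. At this point rawSocket holds the InputStream and OutputStream of the connection, and the Okio.buffer calls that follow wrap those streams in another layer. Taking read on the input side as an example: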

      /**
       * Returns a source that reads from {@code socket}. Prefer this over {@link
       * #source(InputStream)} because this method honors timeouts. When the socket
       * read times out, the socket is asynchronously closed by a watchdog thread.
       */
      public static Source source(Socket socket) throws IOException {
        if (socket == null) throw new IllegalArgumentException("socket == null");
        if (socket.getInputStream() == null) throw new IOException("socket's input stream == null");
        AsyncTimeout timeout = timeout(socket);
        Source source = source(socket.getInputStream(), timeout);
        return timeout.source(source);
      }
      

      Okio's private source method, which holds the InputStream:

      private static Source source(final InputStream in, final Timeout timeout) {
        if (in == null) throw new IllegalArgumentException("in == null");
        if (timeout == null) throw new IllegalArgumentException("timeout == null");
      
        return new Source() {
          @Override public long read(Buffer sink, long byteCount) throws IOException {
            if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
            if (byteCount == 0) return 0;
            try {
              timeout.throwIfReached();
              Segment tail = sink.writableSegment(1);
              int maxToCopy = (int) Math.min(byteCount, Segment.SIZE - tail.limit);
              int bytesRead = in.read(tail.data, tail.limit, maxToCopy);
              if (bytesRead == -1) return -1;
              tail.limit += bytesRead;
              sink.size += bytesRead;
              return bytesRead;
            } catch (AssertionError e) {
              if (isAndroidGetsocknameError(e)) throw new IOException(e);
              throw e;
            }
          }
      
          @Override public void close() throws IOException {
            in.close();
          }
      
          @Override public Timeout timeout() {
            return timeout;
          }
      
          @Override public String toString() {
            return "source(" + in + ")";
          }
        };
      }
      

      The timeout.source method, whose wrapped source is the Source created above:

      /**
       * Returns a new source that delegates to {@code source}, using this to implement timeouts. This
       * works best if {@link #timedOut} is overridden to interrupt {@code sink}'s current operation.
       */
      public final Source source(final Source source) {
        return new Source() {
          @Override public long read(Buffer sink, long byteCount) throws IOException {
            boolean throwOnTimeout = false;
            enter();
            try {
              long result = source.read(sink, byteCount);
              throwOnTimeout = true;
              return result;
            } catch (IOException e) {
              throw exit(e);
            } finally {
              exit(throwOnTimeout);
            }
          }
      
          @Override public void close() throws IOException {
            boolean throwOnTimeout = false;
            enter();
            try {
              source.close();
              throwOnTimeout = true;
            } catch (IOException e) {
              throw exit(e);
            } finally {
              exit(throwOnTimeout);
            }
          }
      
          @Override public Timeout timeout() {
            return AsyncTimeout.this;
          }
      
          @Override public String toString() {
            return "AsyncTimeout.source(" + source + ")";
          }
        };
      }
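
The two listings above form a decorator chain: an inner source adapts the InputStream, and an outer source wraps the inner one to add timeout handling. The sketch below shows the same layering with a deliberately tiny interface. ByteSource and SourceDecoratorSketch are hypothetical names, and the deadline is checked synchronously before each read, which is a simplification: AsyncTimeout relies on a watchdog thread instead.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;

final class SourceDecoratorSketch {
  /** Hypothetical, much smaller cousin of okio.Source. */
  interface ByteSource {
    int read(byte[] sink, int offset, int byteCount) throws IOException;
  }

  /** Inner layer: adapts an InputStream and simply delegates to it. */
  static ByteSource fromStream(InputStream in) {
    return (sink, offset, byteCount) -> in.read(sink, offset, byteCount);
  }

  /** Outer layer: wraps another source and rejects reads once the deadline has passed. */
  static ByteSource withDeadline(ByteSource delegate, long deadlineNanos) {
    return (sink, offset, byteCount) -> {
      if (System.nanoTime() - deadlineNanos >= 0) {
        throw new InterruptedIOException("timeout");
      }
      return delegate.read(sink, offset, byteCount);
    };
  }
}
```

Composing them as withDeadline(fromStream(in), deadline) gives callers a single ByteSource, in the same way that Okio.source(socket) returns timeout.source(source).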
      

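      So the source and sink fields of RealConnection have now been assigned. The ExchangeCodec that find finally returns is created by RealConnection's newCodec, and the network connection is established along the way: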

      ExchangeCodec newCodec(OkHttpClient client, Interceptor.Chain chain) throws SocketException {
        if (http2Connection != null) {
          return new Http2ExchangeCodec(client, this, chain, http2Connection);
        } else {
          socket.setSoTimeout(chain.readTimeoutMillis());
          source.timeout().timeout(chain.readTimeoutMillis(), MILLISECONDS);
          sink.timeout().timeout(chain.writeTimeoutMillis(), MILLISECONDS);
          return new Http1ExchangeCodec(client, this, source, sink);
        }
      }
      

      For an HTTP/1.x connection, newCodec returns an Http1ExchangeCodec, so the codec's writeRequestHeaders is:

      @Override public void writeRequestHeaders(Request request) throws IOException {
        String requestLine = RequestLine.get(
            request, realConnection.route().proxy().type());
        writeRequest(request.headers(), requestLine);
      }
      
      /** Returns bytes of a request header for sending on an HTTP transport. */
      public void writeRequest(Headers headers, String requestLine) throws IOException {
        if (state != STATE_IDLE) throw new IllegalStateException("state: " + state);
        sink.writeUtf8(requestLine).writeUtf8("\r\n");
        for (int i = 0, size = headers.size(); i < size; i++) {
          sink.writeUtf8(headers.name(i))
              .writeUtf8(": ")
              .writeUtf8(headers.value(i))
              .writeUtf8("\r\n");
        }
        sink.writeUtf8("\r\n");
        state = STATE_OPEN_REQUEST_BODY;
      }
      

      This is where the request line and the headers are written into the sink's Buffer.
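
To make the resulting wire format concrete, here is a small sketch that builds the same text with a StringBuilder instead of an Okio sink. RequestHeadSketch and the sample header values are made up for illustration; only the layout (request line, one "Name: value" line per header, then an empty line, all terminated by CRLF) follows writeRequest.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class RequestHeadSketch {
  /** Serializes a request line and headers in HTTP/1.1 wire format. */
  static String serialize(String requestLine, Map<String, String> headers) {
    StringBuilder out = new StringBuilder();
    out.append(requestLine).append("\r\n");
    for (Map.Entry<String, String> header : headers.entrySet()) {
      out.append(header.getKey())
          .append(": ")
          .append(header.getValue())
          .append("\r\n");
    }
    out.append("\r\n"); // the empty line ends the header section
    return out.toString();
  }

  public static void main(String[] args) {
    Map<String, String> headers = new LinkedHashMap<>(); // keeps insertion order
    headers.put("Host", "example.com");
    headers.put("Accept", "text/plain");
    System.out.print(serialize("GET /index.html HTTP/1.1", headers));
  }
}
```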

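      Back in CallServerInterceptor's intercept, execution continues: if the request method is neither GET nor HEAD and the request body is not null, the body is written and the buffered request is flushed (through exchange.flushRequest or exchange.finishRequest, depending on the branch). Followed all the way down, this flush goes through Okio's sink and ends in flush on the socket's OutputStream, which is what actually sends the data over the network.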
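
Why the flush matters can be shown with plain JDK streams: data written to a buffered stream stays in memory until flush pushes it to the underlying stream. In this sketch a ByteArrayOutputStream stands in for the socket's OutputStream and BufferedOutputStream for the Okio sink; FlushSketch is a hypothetical name.

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

final class FlushSketch {
  /** Returns how many bytes reached the underlying stream before and after flush(). */
  static int[] bytesDeliveredBeforeAndAfterFlush(String text) throws IOException {
    ByteArrayOutputStream wire = new ByteArrayOutputStream(); // stands in for the socket stream
    BufferedOutputStream buffered = new BufferedOutputStream(wire, 8192);

    buffered.write(text.getBytes(StandardCharsets.UTF_8));
    int before = wire.size(); // small writes are still held in the buffer

    buffered.flush();
    int after = wire.size(); // now the bytes have been handed to the underlying stream

    return new int[] {before, after};
  }
}
```

For any text shorter than the 8192-byte buffer, the first value is 0 and the second is the encoded length of the text.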

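      As for the Expect: 100-continue header, it asks the server in advance whether it is willing to accept the request body (for example POST data) before the client actually sends it.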
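
The client-side decision behind this handshake is small enough to write down. The sketch below is an assumption-labelled simplification, not OkHttp code: ExpectContinueSketch and shouldSendBody are hypothetical names, and the rule encoded is only "send the body right away when no handshake was requested, otherwise send it only after an interim 100 response".

```java
final class ExpectContinueSketch {
  /**
   * Decides whether the request body should be transmitted.
   *
   * @param expectHeader value of the request's Expect header, or null if absent
   * @param firstStatusCode status code of the first response received from the server
   */
  static boolean shouldSendBody(String expectHeader, int firstStatusCode) {
    boolean expectsContinue = "100-continue".equalsIgnoreCase(expectHeader);
    if (!expectsContinue) return true; // no handshake requested: send the body right away
    return firstStatusCode == 100;     // send only after the server answered "100 Continue"
  }
}
```

A server that answers with a final status such as 417 instead of 100 therefore never receives the body, which saves the upload.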

      The last step is reading the response, which ends up returning a Response whose body is a RealResponseBody. The body data is then read through Objects.requireNonNull(response.body()).string().

    • Summary

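      We have analyzed, at the source level, how OkHttp wraps Socket communication step by step into API calls. In that respect it is similar to HttpClient. The difference is that most of the common processing is already written for you in a template-like way, for example the Interceptor chain, and most of the basic parameters receive default values through the default Builder.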


          Article link: https://www.haomeiwen.com/subject/aozfbktx.html