美文网首页
Okhttp 笔记

Okhttp 笔记

作者: 细雨么么 | 来源:发表于2023-02-15 18:29 被阅读0次

    Android 原生的网络连接使用 HttpURLConnection。
    Retrofit 底层使用 OkHttp3,OkHttp3 内部采用 Socket 连接;Socket 可以保持连接,通过复用连接减少建立连接时三次握手的时间消耗。
    OkHttp 默认最多同时执行 64 个异步请求(maxRequests),同一主机最多 5 个(maxRequestsPerHost);Dispatcher 内部用三个队列来保存请求。
    RealCall.java

      /**
       * Schedules this call for asynchronous execution.
       *
       * <p>A call may only be started once: the {@code executed} flag is read and set while
       * holding this call's monitor, and a second attempt fails.
       *
       * @param responseCallback callback wrapped in the {@code AsyncCall} handed to the dispatcher
       * @throws IllegalStateException if this call was already executed
       */
      @Override public void enqueue(Callback responseCallback) {
        boolean alreadyExecuted;
        synchronized (this) {
          // Capture the previous state and mark the call as executed in one critical section.
          alreadyExecuted = executed;
          executed = true;
        }
        if (alreadyExecuted) {
          throw new IllegalStateException("Already Executed");
        }

        transmitter.callStart();
        client.dispatcher().enqueue(new AsyncCall(responseCallback));
      }
    

    Dispatcher

     /**
      * Ready async calls in the order they'll be run.
      * Every async call passed to {@code enqueue(AsyncCall)} is added to this queue first.
      */
      private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
    
     /**
      * Running asynchronous calls. Includes canceled calls that haven't finished yet.
      */
      private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
    
      /**
       * Running synchronous calls. Includes canceled calls that haven't finished yet.
       */
      private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
    
     /**
      * Queues an async call and then tries to promote ready calls to running.
      *
      * <p>While holding the dispatcher's monitor the call is appended to
      * {@code readyAsyncCalls}. Unless the call is for a web socket, it is made to share the
      * per-host counter of an existing call to the same host, if one is found.
      *
      * @param call the async call to queue
      */
     void enqueue(AsyncCall call) {
        synchronized (this) {
          readyAsyncCalls.add(call);

          // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call
          // to the same host.
          boolean isWebSocketCall = call.get().forWebSocket;
          if (!isWebSocketCall) {
            AsyncCall sameHostCall = findExistingCallWithHost(call.host());
            if (sameHostCall != null) {
              call.reuseCallsPerHostFrom(sameHostCall);
            }
          }
        }
        // Promotion happens outside the synchronized block above.
        promoteAndExecute();
     }
    

    ArrayDeque 默认构造时的初始容量是 16(这只是初始容量,元素增多时会自动扩容,并不是请求数量的上限)。
    **ArrayDeque**

        /**
         * Creates an empty deque whose backing array starts out large enough
         * for 16 elements.
         */
        public ArrayDeque() {
            this.elements = new Object[16];
        }
    
    

    Dispatcher.java

      // Upper bound on concurrently running async calls; promoteAndExecute() stops promoting
      // once runningAsyncCalls.size() reaches this value. Initial value: 64.
      private int maxRequests = 64;
      // Upper bound compared against a call's per-host counter in promoteAndExecute();
      // calls whose counter has reached it are skipped for now. Initial value: 5.
      private int maxRequestsPerHost = 5;
    
    
    
      /**
       * Moves eligible calls from {@code readyAsyncCalls} to {@code runningAsyncCalls} and
       * starts them on the executor service.
       *
       * <p>Selection happens while holding the dispatcher's monitor; the selected calls are
       * started after the monitor has been released. Must not be called while already holding
       * the monitor (asserted).
       *
       * @return true if {@code runningCallsCount()} was greater than zero after promotion
       */
      private boolean promoteAndExecute() {
        assert (!Thread.holdsLock(this));

        List<AsyncCall> promotedCalls = new ArrayList<>();
        boolean anyRunning;
        synchronized (this) {
          Iterator<AsyncCall> readyIterator = readyAsyncCalls.iterator();
          while (readyIterator.hasNext()) {
            AsyncCall candidate = readyIterator.next();

            // Global limit reached: nothing further can be promoted in this pass.
            if (runningAsyncCalls.size() >= maxRequests) {
              break;
            }
            // Per-host limit reached: leave this call queued and look at the next one.
            if (candidate.callsPerHost().get() >= maxRequestsPerHost) {
              continue;
            }

            readyIterator.remove();
            candidate.callsPerHost().incrementAndGet();
            promotedCalls.add(candidate);
            runningAsyncCalls.add(candidate);
          }
          anyRunning = runningCallsCount() > 0;
        }

        for (AsyncCall promoted : promotedCalls) {
          promoted.executeOn(executorService());
        }

        return anyRunning;
      }
    

    如果 runningAsyncCalls 中正在执行的异步请求数已经达到 maxRequests(>=),就直接跳出循环(break)。
    如果当前请求所属主机上的请求计数已经达到 maxRequestsPerHost(>=),就跳过当前请求,继续检查后面的请求(continue)。
    最后把符合条件的请求加入 executableCalls 集合并遍历执行。executeOn(executorService()) 最后会走到 RealCall 的内部类 AsyncCall#executeOn,进而执行它自己的 execute 方法,通过 getResponseWithInterceptorChain 以责任链的方式一级一级拼接请求,并在回调中获得 response。

    默认配置下添加到责任链中的拦截器(节选;责任链的最后一环是下文提到的 CallServerInterceptor)。

     interceptors.add(new RetryAndFollowUpInterceptor(client));
        interceptors.add(new BridgeInterceptor(client.cookieJar()));
        interceptors.add(new CacheInterceptor(client.internalCache()));
        interceptors.add(new ConnectInterceptor(client));
    

    RetryAndFollowUpInterceptor

     /**
      * Drives a request through the rest of the chain, retrying after recoverable failures
      * and following up with new requests until a final response is available.
      *
      * <p>Each loop iteration prepares the transmitter, proceeds down the chain, and then asks
      * {@code followUpRequest} whether another request is needed. The loop ends when there is
      * no follow-up, when the follow-up body is one-shot, or when an exception is thrown.
      *
      * @param chain the interceptor chain; cast to {@code RealInterceptorChain}
      * @return the final response, with the prior response (body removed) attached if any
      * @throws IOException if the call is canceled, if a failure is not recoverable, or
      *     (as {@code ProtocolException}) if more than {@code MAX_FOLLOW_UPS} follow-ups occur
      */
     @Override public Response intercept(Chain chain) throws IOException {
        Request request = chain.request();
        RealInterceptorChain realChain = (RealInterceptorChain) chain;
        Transmitter transmitter = realChain.transmitter();
    
        int followUpCount = 0;
        Response priorResponse = null;
        while (true) {
          transmitter.prepareToConnect(request);// Author's note (not verifiable from this excerpt): returns right away when host, port, etc. match the previous request.
    
          if (transmitter.isCanceled()) {
            throw new IOException("Canceled");
          }
    
          Response response;
          boolean success = false;
          try {
            response = realChain.proceed(request, transmitter, null);// Hand the request to the next interceptor in the chain.
            success = true;
          } catch (RouteException e) {
            // The attempt to connect via a route failed. The request will not have been sent.
            if (!recover(e.getLastConnectException(), transmitter, false, request)) {
              throw e.getFirstConnectException();
            }
            continue;
          } catch (IOException e) {
            // An attempt to communicate with a server failed. The request may have been sent.
            boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
            if (!recover(e, transmitter, requestSendStarted, request)) throw e;
            continue;
          } finally {
            // The network call threw an exception. Release any resources.
            // This also runs before each `continue` above, since success is still false there.
            if (!success) {
              transmitter.exchangeDoneDueToException();
            }
          }
    
          // Attach the prior response if it exists. Such responses never have a body.
          if (priorResponse != null) {
            response = response.newBuilder()
                .priorResponse(priorResponse.newBuilder()
                        .body(null)
                        .build())
                .build();
          }
    
          Exchange exchange = Internal.instance.exchange(response);
          Route route = exchange != null ? exchange.connection().route() : null;
          Request followUp = followUpRequest(response, route);
    
          // No follow-up needed: this response is the final result.
          if (followUp == null) {
            if (exchange != null && exchange.isDuplex()) {
              transmitter.timeoutEarlyExit();
            }
            return response;
          }
    
          // A one-shot follow-up body is not sent again; return the current response instead.
          RequestBody followUpBody = followUp.body();
          if (followUpBody != null && followUpBody.isOneShot()) {
            return response;
          }
    
          closeQuietly(response.body());
          if (transmitter.hasExchange()) {
            exchange.detachWithViolence();
          }
    
          // Bound the number of follow-up requests.
          if (++followUpCount > MAX_FOLLOW_UPS) {
            throw new ProtocolException("Too many follow-up requests: " + followUpCount);
          }
    
          // Loop again with the follow-up request, remembering the response that caused it.
          request = followUp;
          priorResponse = response;
        }
      }
    
    

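    BridgeInterceptor:为请求补全头部信息(Content-Type、Content-Length 或 Transfer-Encoding、Host、Connection、Accept-Encoding、Cookie、User-Agent),并在自己添加了 gzip 的情况下对 gzip 响应做透明解压。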

      /**
       * Fills in request headers the caller left out, proceeds down the chain, and then adapts
       * the network response for the caller.
       *
       * <p>Request side: sets Content-Type and either Content-Length or chunked
       * Transfer-Encoding from the body, and adds Host, Connection, Accept-Encoding (gzip),
       * Cookie and User-Agent where applicable. Response side: passes response headers to the
       * cookie jar and, if this method added the gzip Accept-Encoding itself and the response
       * is gzip-encoded with a body, wraps the body in a {@code GzipSource} and strips the
       * Content-Encoding and Content-Length headers.
       *
       * @param chain the interceptor chain supplying the user request
       * @return the response, associated with the original user request
       * @throws IOException if proceeding down the chain fails
       */
      @Override public Response intercept(Chain chain) throws IOException {
        Request originalRequest = chain.request();
        Request.Builder builder = originalRequest.newBuilder();

        RequestBody requestBody = originalRequest.body();
        if (requestBody != null) {
          MediaType mediaType = requestBody.contentType();
          if (mediaType != null) {
            builder.header("Content-Type", mediaType.toString());
          }

          // A length of -1 means the size is unknown, so the body is sent chunked.
          long length = requestBody.contentLength();
          if (length == -1) {
            builder.header("Transfer-Encoding", "chunked");
            builder.removeHeader("Content-Length");
          } else {
            builder.header("Content-Length", Long.toString(length));
            builder.removeHeader("Transfer-Encoding");
          }
        }

        if (originalRequest.header("Host") == null) {
          builder.header("Host", hostHeader(originalRequest.url(), false));
        }

        if (originalRequest.header("Connection") == null) {
          builder.header("Connection", "Keep-Alive");
        }

        // If we add an "Accept-Encoding: gzip" header field we're responsible for also
        // decompressing the transfer stream.
        boolean transparentGzip = originalRequest.header("Accept-Encoding") == null
            && originalRequest.header("Range") == null;
        if (transparentGzip) {
          builder.header("Accept-Encoding", "gzip");
        }

        List<Cookie> requestCookies = cookieJar.loadForRequest(originalRequest.url());
        if (!requestCookies.isEmpty()) {
          builder.header("Cookie", cookieHeader(requestCookies));
        }

        if (originalRequest.header("User-Agent") == null) {
          builder.header("User-Agent", Version.userAgent());
        }

        // All request headers are in place; hand off to the next interceptor.
        Response networkResponse = chain.proceed(builder.build());

        HttpHeaders.receiveHeaders(cookieJar, originalRequest.url(), networkResponse.headers());

        Response.Builder responseBuilder = networkResponse.newBuilder();
        responseBuilder.request(originalRequest);

        boolean mustGunzip = transparentGzip
            && "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
            && HttpHeaders.hasBody(networkResponse);
        if (!mustGunzip) {
          return responseBuilder.build();
        }

        // Decompress transparently and drop the headers that described the compressed body.
        GzipSource gzipSource = new GzipSource(networkResponse.body().source());
        Headers headersWithoutEncoding = networkResponse.headers().newBuilder()
            .removeAll("Content-Encoding")
            .removeAll("Content-Length")
            .build();
        responseBuilder.headers(headersWithoutEncoding);
        String contentType = networkResponse.header("Content-Type");
        responseBuilder.body(new RealResponseBody(contentType, -1L, Okio.buffer(gzipSource)));

        return responseBuilder.build();
      }
    
    

    就这么一直下一个下一个,来到了ConnectInterceptor
    ConnectInterceptor

    /**
     * Obtains an {@code Exchange} from the transmitter and proceeds down the chain with it.
     *
     * <p>Extensive health checks are requested for every request whose method is not GET.
     *
     * @param chain the interceptor chain; cast to {@code RealInterceptorChain}
     * @return the response produced by the rest of the chain
     * @throws IOException if proceeding down the chain fails
     */
    @Override public Response intercept(Chain chain) throws IOException {
        RealInterceptorChain realChain = (RealInterceptorChain) chain;
        Request request = realChain.request();
        Transmitter transmitter = realChain.transmitter();

        // We need the network to satisfy this request. Possibly for validating a conditional GET.
        boolean isGet = request.method().equals("GET");
        boolean doExtensiveHealthChecks = !isGet;
        Exchange exchange = transmitter.newExchange(chain, doExtensiveHealthChecks);

        return realChain.proceed(request, transmitter, exchange);
    }
    

    代码特别少,transmitter.newExchange-->Transmitter#newExchange-->ExchangeFinder#find--->ExchangeFinder#findHealthyConnection-->ExchangeFinder#findConnection-->

    
        // If we found a pooled connection on the 2nd time around, we're done.
        if (foundPooledConnection) {
          eventListener.connectionAcquired(call, result);
          return result;
        }
    
        // Do TCP + TLS handshakes. This is a blocking operation.
        result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
            connectionRetryEnabled, call, eventListener);
        connectionPool.routeDatabase.connected(result.route());
    

    如果在连接池中找到了可用的连接就直接返回;没找到就新建一个连接并调用 connect(完成 TCP + TLS 握手)。
    RealConnection#connect --->connectSocket看到

      rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
            ? address.socketFactory().createSocket()
            : new Socket(proxy);
    

    一连串眼花缭乱的操作,来到最后一个责任链 CallServerInterceptor。

    Exchange exchange = realChain.exchange();
    又是一顿解析什么的。通过exchange来做发送。拼接返回的response。

    相关文章

      网友评论

          本文标题:Okhttp 笔记

          本文链接:https://www.haomeiwen.com/subject/vxfukdtx.html