美文网首页
学习笔记OkHttp

学习笔记OkHttp

作者: 回眸婉约 | 来源:发表于2021-01-21 16:50 被阅读0次

    学习一下OkHttp原理,探究他发送请求的过程
    合理的跳过了一些内容,比如DNS、Cookie、Protocol、okio
    OkHttp版本3.14.9

    btn.setOnClickListener(v -> {
        new Thread(() -> {
            OkHttpClient client = new OkHttpClient.Builder().build();
            Request request = new Request.Builder()
                    .url("这里是一个能访问的url")  
                    .build();
            Call call = client.newCall(request);
            try{
                //同步请求
                Response response = call.execute();
                System.out.println(response.body().string());
            }catch(IOException e){
                e.printStackTrace();
            }
        }).start();
    
    
        OkHttpClient client = new OkHttpClient.Builder().build();
                    Request request = new Request.Builder()
                            .url("这里是一个能访问的url")
                            .build();
                    Call call = client.newCall(request);
    
                    call.enqueue(new Callback() {
                            @Override
                            public void onFailure(Call call, IOException e) {
    
                            }
    
                            @Override
                            public void onResponse(Call call, Response response) throws IOException {
                                System.out.println(response.body().string());
                            }
                        });
    
    
    
    });
    

    随便写了一个按钮,定义 OkHttpClientRequestCall, 然后同步请求,写了一个最少的http代码请求,返回 Response 输出body
    2块内容一个是同步请求,一个是异步请求,同步请求不加线程会抛出异常

    开始
    OkHttpClient.Builder()

    看到 OkHttpClient.Builder() 我已经不想开了,肯定是初始各种值
    build()各种赋值,然后返回一个 OkHttpClient 对象

    Builder(OkHttpClient okHttpClient) {
      this.dispatcher = okHttpClient.dispatcher;//调度器
      //..
      this.connectionPool = okHttpClient.connectionPool;
      //..
      this.callTimeout = okHttpClient.callTimeout;
      this.connectTimeout = okHttpClient.connectTimeout;
      this.readTimeout = okHttpClient.readTimeout;
      this.writeTimeout = okHttpClient.writeTimeout;
      this.pingInterval = okHttpClient.pingInterval;
    }
    

    只留关键的,看名字大概都能知道是什么了
    调度器,连接池,call超时,连接超时等等,其他的基本也能根据名字看出来是干什么的

    Request.Builder()
    public static class Builder {
      @Nullable HttpUrl url;
      String method;
      Headers.Builder headers;
      @Nullable RequestBody body;
    
      Map<Class<?>, Object> tags = Collections.emptyMap();
    
      public Builder() {
        this.method = "GET";
        this.headers = new Headers.Builder();
      }
    }
    

    一个请求包含urlmethodheadersbody,默认GET

    client.newCall(request)
    @Override public Call newCall(Request request) {
      return RealCall.newRealCall(this, request, false);
    }
    
    static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
      RealCall call = new RealCall(client, originalRequest, forWebSocket);
      call.transmitter = new Transmitter(client, call);
      return call;
    }
    

    根据 newCall() 返回了一个 RealCall 对象,还new了一个Transmitter发射器,对象交给了call
    看一下RealCall和Transmitter构造函数

    private RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
      this.client = client;
      this.originalRequest = originalRequest;
      this.forWebSocket = forWebSocket;
    }
    public Transmitter(OkHttpClient client, Call call) {
      this.client = client;
      this.connectionPool = Internal.instance.realConnectionPool(client.connectionPool());
      this.call = call;
      this.eventListener = client.eventListenerFactory().create(call);
      this.timeout.timeout(client.callTimeoutMillis(), MILLISECONDS);
    }
    

    也是简单的初始化,不过能看出这个RealCall已经有了发射器,并且里面有连接池了

    call.execute() 同步请求

    关键部分,先看call.execute(),同步执行,根据安卓机制必须在子线程

    @Override public Response execute() throws IOException {
      //保证只有一个执行
      synchronized (this) {
        if (executed) throw new IllegalStateException("Already Executed");
        executed = true;
      }
      transmitter.timeoutEnter();
      transmitter.callStart();
      try {
        client.dispatcher().executed(this);
        return getResponseWithInterceptorChain();
      } finally {
        client.dispatcher().finished(this);
      }
    }
    
    //dispatcher的executed, 这里把RealCall放到了同步队列里
    synchronized void executed(RealCall call) {
      runningSyncCalls.add(call);
    }
    

    getResponseWithInterceptorChain() 是库的最关键代码,直接贴代码
    interceptor 是拦截器的意思

    Response getResponseWithInterceptorChain() throws IOException {
      //创建了一个拦截器List,然后添加了写拦截器对象
      List<Interceptor> interceptors = new ArrayList<>();
      //这是自定义的 先不管
      interceptors.addAll(client.interceptors());
      //重定向拦截器
      interceptors.add(new RetryAndFollowUpInterceptor(client));
      //桥拦截器
      interceptors.add(new BridgeInterceptor(client.cookieJar()));
      //缓存拦截器
      interceptors.add(new CacheInterceptor(client.internalCache()));
      //连接拦截器
      interceptors.add(new ConnectInterceptor(client));
      if (!forWebSocket) {
        interceptors.addAll(client.networkInterceptors());
      }
      //呼叫服务拦截器。。。
      interceptors.add(new CallServerInterceptor(forWebSocket));
    
      //正真串联拦截器的地方
      Interceptor.Chain chain = new RealInterceptorChain(interceptors, transmitter, null, 0,
          originalRequest, this, client.connectTimeoutMillis(),
          client.readTimeoutMillis(), client.writeTimeoutMillis());
    
      boolean calledNoMoreExchanges = false;
      try {
        //这里是正真的把拦截器串联起来的地方 ,并返回Response对象
        Response response = chain.proceed(originalRequest);
        if (transmitter.isCanceled()) {
          closeQuietly(response);
          throw new IOException("Canceled");
        }
        return response;
      } catch (IOException e) {
        calledNoMoreExchanges = true;
        throw transmitter.noMoreExchanges(e);
      } finally {
        if (!calledNoMoreExchanges) {
          transmitter.noMoreExchanges(null);
        }
      }
    }
    

    看看是怎么连起来的
    chain.proceed(originalRequest) 执行拦截器方法

    public final class RealInterceptorChain implements Interceptor.Chain {
      @Override public Response proceed(Request request) throws IOException {
        return proceed(request, transmitter, exchange);
      }
    
      public Response proceed(Request request, Transmitter transmitter, @Nullable Exchange exchange)throws IOException {
    
        //保留重要代码
    
        // Call the next interceptor in the chain.
        RealInterceptorChain next = new RealInterceptorChain(interceptors, transmitter, exchange,
            index + 1, request, call, connectTimeout, readTimeout, writeTimeout);
        Interceptor interceptor = interceptors.get(index);
        Response response = interceptor.intercept(next);
    
    
        return response;
      }
    }
    

    可以看到 RealInterceptorChain 类实现了 Interceptor.Chain 接口,完成了 proceed(Request request) 方法
    在下面的 proceed(...) 中,又new了一个 RealInterceptorChain 把拦截器List等参数又传了进去,有个关键的变量 index
    也就是说 自己new了自己,并告诉new出来的自己索引+1
    在看下面2行获取了一个拦截器,执行他的拦截方法interceptor.intercept(next)
    这里构造了一个 责任链模式,简单理解下责任链模式,有点链表结构的感觉,但不是链表,相当于把一个节点一个节点的连起来,当输出第一个节点,会依次调用下面一个节点输出内容
    按着顺序来取第一个拦截器

    1、RetryAndFollowUpInterceptor 重定向拦截器

    重定向拦截器:主要负责失败重连,但并不是所有失败都重连

    public final class RetryAndFollowUpInterceptor implements Interceptor {
      @Override public Response intercept(Chain chain) throws IOException {
          Request request = chain.request();
          RealInterceptorChain realChain = (RealInterceptorChain) chain;
          Transmitter transmitter = realChain.transmitter();
    
          int followUpCount = 0;
          Response priorResponse = null;
          while (true) {
            transmitter.prepareToConnect(request);
            //如果取消就抛异常
            if (transmitter.isCanceled()) {
              throw new IOException("Canceled");
            }
    
            Response response;
            boolean success = false;
            try {
              //使用realChain执行下一个拦截器
              //为什么要直接执行下一个拦截器?
              //因为这个是为重定向做的拦截器,肯定是等请求返回结果之后再决定是否要重定向
              response = realChain.proceed(request, transmitter, null);
              success = true;
            } catch (RouteException e) {
              // The attempt to connect via a route failed. The request will not have been sent.
              if (!recover(e.getLastConnectException(), transmitter, false, request)) {
                throw e.getFirstConnectException();
              }
              continue;
            }//这里删了catch,出了异常,catch肯定是关闭流
    
            //如果priorResponse不为null,又newBuilder()
            //把上一个Response和现在的结合在一起
            //正常来说第一次肯定是空的
            //prior 先前的,先前的Response不为null
            if (priorResponse != null) {
              response = response.newBuilder()
                  .priorResponse(priorResponse.newBuilder()
                          .body(null)
                          .build())
                  .build();
            }
    
            Exchange exchange = Internal.instance.exchange(response);
            Route route = exchange != null ? exchange.connection().route() : null;
            //检查是否符合要求
            //这个方法里是一个switch、case去找HttpCode状态码 比如200,404
            Request followUp = followUpRequest(response, route);
    
            //正常来说应该是200 followUpRequest()就会返回null
            if (followUp == null) {
              if (exchange != null && exchange.isDuplex()) {
                transmitter.timeoutEarlyExit();
              }
              return response;
            }
    
            RequestBody followUpBody = followUp.body();
            if (followUpBody != null && followUpBody.isOneShot()) {
              return response;
            }
    
            closeQuietly(response.body());
            if (transmitter.hasExchange()) {
              exchange.detachWithViolence();
            }
            //限制重定向次数
            if (++followUpCount > MAX_FOLLOW_UPS) {
              throw new ProtocolException("Too many follow-up requests: " + followUpCount);
            }
    
            request = followUp;
            priorResponse = response;
          }
        }
    }
    

    这个拦截器就是先请求,请求返回数据之后根据数据来判断,是否需要重定向

    2、BridgeInterceptor 桥拦截器

    这里很长,桥拦截器主要用来添加头部信息、编码方式

    public final class BridgeInterceptor implements Interceptor {
      @Override public Response intercept(Chain chain) throws IOException {
        Request userRequest = chain.request();
        Request.Builder requestBuilder = userRequest.newBuilder();
    
        RequestBody body = userRequest.body();
        if (body != null) {
          MediaType contentType = body.contentType();
          if (contentType != null) {
            requestBuilder.header("Content-Type", contentType.toString());
          }
    
          long contentLength = body.contentLength();
          if (contentLength != -1) {
            requestBuilder.header("Content-Length", Long.toString(contentLength));
            requestBuilder.removeHeader("Transfer-Encoding");
          } else {
            requestBuilder.header("Transfer-Encoding", "chunked");
            requestBuilder.removeHeader("Content-Length");
          }
        }
    
        if (userRequest.header("Host") == null) {
          requestBuilder.header("Host", hostHeader(userRequest.url(), false));
        }
    
        if (userRequest.header("Connection") == null) {
          requestBuilder.header("Connection", "Keep-Alive");
        }
    
        // If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
        // the transfer stream.
        boolean transparentGzip = false;
        if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
          transparentGzip = true;
          requestBuilder.header("Accept-Encoding", "gzip");
        }
    
        List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
        if (!cookies.isEmpty()) {
          requestBuilder.header("Cookie", cookieHeader(cookies));
        }
    
        if (userRequest.header("User-Agent") == null) {
          requestBuilder.header("User-Agent", Version.userAgent());
        }
    
        //这里调用下一层的拦截器
        Response networkResponse = chain.proceed(requestBuilder.build());
        //下面是处理cookie缓存和GZip的内容
        HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());
    
        Response.Builder responseBuilder = networkResponse.newBuilder()
            .request(userRequest);
    
        //判断服务器是否支持GZip,支持交给okio处理
        if (transparentGzip
            && "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
            && HttpHeaders.hasBody(networkResponse)) {
          GzipSource responseBody = new GzipSource(networkResponse.body().source());
          Headers strippedHeaders = networkResponse.headers().newBuilder()
              .removeAll("Content-Encoding")
              .removeAll("Content-Length")
              .build();
          responseBuilder.headers(strippedHeaders);
          String contentType = networkResponse.header("Content-Type");
          //处理之后build 一个新的Response
          responseBuilder.body(new RealResponseBody(contentType, -1L, Okio.buffer(responseBody)));
        }
    
        return responseBuilder.build();
      }
    }
    

    这里就是3段内容,增加头信息、调用下一层拦截器、处理缓存和GZip

    3、CacheInterceptor 缓存拦截器

    缓存拦截器,顾名思义处理缓存

    public final class CacheInterceptor implements Interceptor {
      @Override public Response intercept(Chain chain) throws IOException {
         Response cacheCandidate = cache != null
             ? cache.get(chain.request())
             : null;
    
         long now = System.currentTimeMillis();
    
         CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
         Request networkRequest = strategy.networkRequest;
         Response cacheResponse = strategy.cacheResponse;
    
         if (cache != null) {
           cache.trackResponse(strategy);
         }
    
         if (cacheCandidate != null && cacheResponse == null) {
           closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
         }
    
         //注释写明了,如果没网和没缓存 直接返回失败
         // If we're forbidden from using the network and the cache is insufficient, fail.
         if (networkRequest == null && cacheResponse == null) {
           return new Response.Builder()
               .request(chain.request())
               .protocol(Protocol.HTTP_1_1)
               .code(504)
               .message("Unsatisfiable Request (only-if-cached)")
               .body(Util.EMPTY_RESPONSE)
               .sentRequestAtMillis(-1L)
               .receivedResponseAtMillis(System.currentTimeMillis())
               .build();
         }
         //这里没网
         // If we don't need the network, we're done.
         if (networkRequest == null) {
           return cacheResponse.newBuilder()
               .cacheResponse(stripBody(cacheResponse))
               .build();
         }
    
         Response networkResponse = null;
         try {
           //调用下一层拦截器
           networkResponse = chain.proceed(networkRequest);
         } finally {
           // If we're crashing on I/O or otherwise, don't leak the cache body.
           if (networkResponse == null && cacheCandidate != null) {
             closeQuietly(cacheCandidate.body());
           }
         }
    
         // If we have a cache response too, then we're doing a conditional get.
         //如果有缓存并且code为304,使用缓存内容
         if (cacheResponse != null) {
           if (networkResponse.code() == HTTP_NOT_MODIFIED) {
             Response response = cacheResponse.newBuilder()
                 .headers(combine(cacheResponse.headers(), networkResponse.headers()))
                 .sentRequestAtMillis(networkResponse.sentRequestAtMillis())
                 .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
                 .cacheResponse(stripBody(cacheResponse))
                 .networkResponse(stripBody(networkResponse))
                 .build();
             networkResponse.body().close();
    
             // Update the cache after combining headers but before stripping the
             // Content-Encoding header (as performed by initContentStream()).
             cache.trackConditionalCacheHit();
             cache.update(cacheResponse, response);
             return response;
           } else {
             closeQuietly(cacheResponse.body());
           }
         }
    
         Response response = networkResponse.newBuilder()
             .cacheResponse(stripBody(cacheResponse))
             .networkResponse(stripBody(networkResponse))
             .build();
    
         if (cache != null) {
           if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) {
             // Offer this request to the cache.
             CacheRequest cacheRequest = cache.put(response);
             return cacheWritingResponse(cacheRequest, response);
           }
    
           if (HttpMethod.invalidatesCache(networkRequest.method())) {
             try {
               cache.remove(networkRequest);
             } catch (IOException ignored) {
               // The cache cannot be written.
             }
           }
         }
    
         return response;
       }
    }
    

    这一层也很明确,专门处理缓存,有个关键cache对象,如果最开始okhttp初始化,赋值了,则会使用这个缓存,如果没赋值则不会使用。

    4、ConnectInterceptor 连接拦截器

    连接拦截器,这层是关键内容,也最复杂,负责与服务器建立连接

    public final class ConnectInterceptor implements Interceptor {
      public final OkHttpClient client;
    
      @Override public Response intercept(Chain chain) throws IOException {
        RealInterceptorChain realChain = (RealInterceptorChain) chain;
        Request request = realChain.request();
        Transmitter transmitter = realChain.transmitter();
    
        // We need the network to satisfy this request. Possibly for validating a conditional GET.
        boolean doExtensiveHealthChecks = !request.method().equals("GET");
        Exchange exchange = transmitter.newExchange(chain, doExtensiveHealthChecks);
    
        return realChain.proceed(request, transmitter, exchange);
      }
    }
    //前面只是获取对象
    //关键 发射机 transmitter.newExchange(chain, doExtensiveHealthChecks)
    
    public final class Transmitter {
      //这个exchangeFinder也是new出来的
      private ExchangeFinder exchangeFinder;
    
      //这里看方法是返回一个Exchange对象
      //我也不知道为什么要叫这个名字,历史版本我也没看过
      Exchange newExchange(Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
        synchronized (connectionPool) {
          if (noMoreExchanges) {
            throw new IllegalStateException("released");
          }
          if (exchange != null) {
            throw new IllegalStateException("cannot make a new request because the previous response "
                + "is still open: please call response.close()");
          }
        }
        //exchangeFinder,exchange查找器
        ExchangeCodec codec = exchangeFinder.find(client, chain, doExtensiveHealthChecks);
        Exchange result = new Exchange(this, call, eventListener, exchangeFinder, codec);
    
        synchronized (connectionPool) {
          this.exchange = result;
          this.exchangeRequestDone = false;
          this.exchangeResponseDone = false;
          return result;
        }
      }
    }
    //Exchange查找器
    final class ExchangeFinder {
    
      public ExchangeCodec find(
            OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
          int connectTimeout = chain.connectTimeoutMillis();
          int readTimeout = chain.readTimeoutMillis();
          int writeTimeout = chain.writeTimeoutMillis();
          int pingIntervalMillis = client.pingIntervalMillis();
          boolean connectionRetryEnabled = client.retryOnConnectionFailure();
          //上面只是赋值 连接时间 读取时间超时等等
          try {
            //获取一个健康的连接
            RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
                writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks);
            return resultConnection.newCodec(client, chain);
          } catch (RouteException e) {
            trackFailure();
            throw e;
          } catch (IOException e) {
            trackFailure();
            throw new RouteException(e);
          }
        }
        //寻找一个健康的连接
        private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled,boolean doExtensiveHealthChecks) throws IOException {
          while (true) {
            //while ture 去寻找一个 连接
            //这里死循环去找链接的连接 可能是有问题的连接
            RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
                pingIntervalMillis, connectionRetryEnabled);
    
            synchronized (connectionPool) {
              //这里锁 一下 判断 ,如果是全新的连接就直接返回
              if (candidate.successCount == 0 && !candidate.isMultiplexed()) {
                return candidate;
              }
            }
    
            //这里判断连接是否健康,无非是socket没有关闭,等等
            if (!candidate.isHealthy(doExtensiveHealthChecks)) {
              candidate.noNewExchanges();
              continue;
            }
    
            return candidate;
          }
        }
    
      //最长代码,一点点看
    vate RealConnection findConnection(int connectTimeout, int  dTimeout, int writeTimeout,
    nt pingIntervalMillis, boolean connectionRetryEnabled) throws OException {
      boolean foundPooledConnection = false;
      RealConnection result = null;
      Route selectedRoute = null;
      RealConnection releasedConnection;
      Socket toClose;
      synchronized (connectionPool) {
        if (transmitter.isCanceled()) throw new IOException("Canceled");
        hasStreamFailure = false; // This is a fresh attempt.
    
        //这里锁池子,看看当前发射机transmitter里面的连接能不能用
        releasedConnection = transmitter.connection;
        toClose = transmitter.connection != null &&   transmitter.connection.noNewExchanges
        ? transmitter.releaseConnectionNoEvents()
        : null;
    
        if (transmitter.connection != null) {
          //这里判断有点特别
          //发射机连接不为空 能用 就赋值给result对象
          result = transmitter.connection;
          releasedConnection = null;
        }
        //当然result为空就要从连接池取
        if (result == null) {
          //里面就是循环连接池,循环取,当然 也有可能取不到
          //取不到下面会new的,第一次肯定取不到
          if(connectionPool.transmitterAcquirePooledConnection(address,  transmitter, null, false)) {
            foundPooledConnection = true;
            result = transmitter.connection;
            } else if (nextRouteToTry != null) {
              selectedRoute = nextRouteToTry;
              nextRouteToTry = null;
              } else if (retryCurrentRoute()) {
                selectedRoute = transmitter.connection.route();
              }
            }
          }
          closeQuietly(toClose);
    
          if (releasedConnection != null) {
            eventListener.connectionReleased(call, releasedConnection);
          }
          if (foundPooledConnection) {
            eventListener.connectionAcquired(call, result);
          }
          if (result != null) {
            //到这里result不为空  则直接返回发射机里的 连接
            return result;
          }
    
          // If we need a route selection, make one. This is a blocking operation.
          boolean newRouteSelection = false;
          if (selectedRoute == null && (routeSelection == null ||   !routeSelection.hasNext())) {
            newRouteSelection = true;
            routeSelection = routeSelector.next();
          }
    
          List<Route> routes = null;
          synchronized (connectionPool) {
            if (transmitter.isCanceled()) throw new IOException("Canceled");
    
            if (newRouteSelection) {
              //获取路由表,再次尝试从连接池里取连接
              routes = routeSelection.getAll();
              if (connectionPool.transmitterAcquirePooledConnection(
                address, transmitter, routes, false)) {
                  foundPooledConnection = true;
                  //取到了就赋值给result
                  result = transmitter.connection;
                }
              }
    
              if (!foundPooledConnection) {
                if (selectedRoute == null) {
                  selectedRoute = routeSelection.next();
                }
    
                //各种都取不到 new了
                result = new RealConnection(connectionPool, selectedRoute);
                connectingConnection = result;
              }
            }
    
            //这里是回调 ,获取连接后调用的
            if (foundPooledConnection) {
              eventListener.connectionAcquired(call, result);
              return result;
            }
    
            //到这里肯定是新连接了,开始握手
            result.connect(connectTimeout, readTimeout, writeTimeout,   pingIntervalMillis,
              connectionRetryEnabled, call, eventListener);
              connectionPool.routeDatabase.connected(result.route());
    
            Socket socket = null;
            synchronized (connectionPool) {
              connectingConnection = null;
              //这里又尝试去取连接
              if(connectionPool.transmitterAcquirePooledConnection(address,  transmitter, routes, true)) {
                  result.noNewExchanges = true;
                  socket = result.socket();
                  result = transmitter.connection;
    
                  // It's possible for us to obtain a coalesced connection that is  immediately unhealthy. In
                  // that case we will retry the route we just successfully connected   with.
                  nextRouteToTry = selectedRoute;
                  } else {
                    connectionPool.put(result);
                    transmitter.acquireConnectionNoEvents(result);
                  }
                }
                closeQuietly(socket);
    
                eventListener.connectionAcquired(call, result);
                return result;
              }
        //到这里一个连接就找到了
        //返回上面 ,根据这个连接去判断他是否健康,不健康就再取
    }
    

    1、从transmitter中取,取到就返回
    2、如果取不到从ConnectionPool中取,取到就返回
    3、遍历路由地址、再从ConnectionPool中取
    4、创建新的Connection
    5、用新的Connection进行握手
    到这里 从find()->findHealthyConnection()->findConnection()很清晰
    这里我有个不明白的地方,如果大佬看到了可以为我解释:可能这里偏向网络了,不太明白这里的路由表Route有什么作用。
    再来看一下 握手

    result.connect(...) 握手
    public void connect(int connectTimeout, int readTimeout, int writeTimeout,int pingIntervalMillis, boolean connectionRetryEnabled, Call call,EventListener eventListener) {
        if (protocol != null) throw new IllegalStateException("already connected");
    
        //去掉一些赋值和一些判断,如果有问题会抛出路由异常
        //判断客户端是否启用明文通信等等
    
        while (true) {
          try {
            if (route.requiresTunnel()) {
              //如果有通道,去进行连接通道 里面还是会连接socket
              connectTunnel(connectTimeout, readTimeout, writeTimeout, call, eventListener);
              if (rawSocket == null) {
                //成功就跳出
                break;
              }
            } else {
              //关键 连接socket
              connectSocket(connectTimeout, readTimeout, call, eventListener);
            }
            //定制协议
            //这里面赋值一下协议,这时候socket已经建立了
            //已经知道服务器用的什么协议了
            establishProtocol(connectionSpecSelector, pingIntervalMillis, call, eventListener);
            //listener 回调 告诉我们 连接完成
            //这里在Okhttp创建的时候 做一些日志操作
            eventListener.connectEnd(call, route.socketAddress(), route.proxy(), protocol);
            break;
          } catch (IOException e) {
            //一些关闭流的操作
         }
    
        if (route.requiresTunnel() && rawSocket == null) {
          ProtocolException exception = new ProtocolException("Too many tunnel connections attempted: "
              + MAX_TUNNEL_ATTEMPTS);
          throw new RouteException(exception);
        }
    
        if (http2Connection != null) {
          synchronized (connectionPool) {
            allocationLimit = http2Connection.maxConcurrentStreams();
          }
        }
    }
    
    //socket连接
    connectSocket(connectTimeout, readTimeout, call, eventListener);
    private void connectSocket(int connectTimeout, int readTimeout, Call call,
        EventListener eventListener) throws IOException {
      Proxy proxy = route.proxy();
      Address address = route.address();
    
      rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
          ? address.socketFactory().createSocket()
          : new Socket(proxy);
      //连接开始
      eventListener.connectStart(call, route.socketAddress(), proxy);
      rawSocket.setSoTimeout(readTimeout);
      try {
        //连接操作
        //get判断是android平台还是java平台,再里面反射了一些SSL的底层实现类
        //connectSocket里面就是socket连接了
        Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
      } catch (ConnectException e) {
        ConnectException ce = new ConnectException("Failed to connect to " + route.socketAddress());
        ce.initCause(e);
        throw ce;
    }
    

    ConnectInterceptor 连接拦截器,这里很长,做的事情也很明确
    1、从连接池取连接
    2、如果是新的连接,socket建立连接
    其实这里的握手操作意思是建立Socket操作,Socket是一个套接字,是一个通信连接的对象,socket属于传输层,正真建立三次握手的是在这一层,不是在我们写的引用层。

    5、CallServerInterceptor 呼叫服务器拦截器

    最后一层,请求网络

    public final class CallServerInterceptor implements Interceptor {
    
      @Override public Response intercept(Chain chain) throws IOException {
        RealInterceptorChain realChain = (RealInterceptorChain) chain;
        Exchange exchange = realChain.exchange();
        Request request = realChain.request();
    
        long sentRequestMillis = System.currentTimeMillis();
        //向socket头写入信息
        exchange.writeRequestHeaders(request);
    
        boolean responseHeadersStarted = false;
        Response.Builder responseBuilder = null;
        //这里判断是否有Body
        if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
          //当socket连接服务器后,就已经有状态码了
          //100 该状态码 说明服务器已经接收到了 请求的初始部分
          if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
            exchange.flushRequest();
            responseHeadersStarted = true;
            exchange.responseHeadersStart();
            responseBuilder = exchange.readResponseHeaders(true);
          }
    
          if (responseBuilder == null) {
            //这里判断全双工  默认false
            if (request.body().isDuplex()) {
              // Prepare a duplex body so that the application can send a request body later.
              exchange.flushRequest();
              BufferedSink bufferedRequestBody = Okio.buffer(
                  exchange.createRequestBody(request, true));
              request.body().writeTo(bufferedRequestBody);
            } else {
              // Write the request body if the "Expect: 100-continue" expectation was met.
              BufferedSink bufferedRequestBody = Okio.buffer(
                  exchange.createRequestBody(request, false));
              request.body().writeTo(bufferedRequestBody);
              //
              bufferedRequestBody.close();
            }
          } else {
            exchange.noRequestBody();
            if (!exchange.connection().isMultiplexed()) {
              // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
              // from being reused. Otherwise we're still obligated to transmit the request body to
              // leave the connection in a consistent state.
              exchange.noNewExchangesOnConnection();
            }
          }
        } else {
          exchange.noRequestBody();
        }
    
        if (request.body() == null || !request.body().isDuplex()) {
          //完成网络的写入
          exchange.finishRequest();
        }
    
        if (!responseHeadersStarted) {
          //回调listener
          exchange.responseHeadersStart();
        }
    
        if (responseBuilder == null) {
          //读取Response  在这个里面正真的 调用他自己的okio 读取数据
          responseBuilder = exchange.readResponseHeaders(false);
        }
        //数据已经回来了到这里 后面就是一些小判断了
        Response response = responseBuilder
            .request(request)
            .handshake(exchange.connection().handshake())
            .sentRequestAtMillis(sentRequestMillis)
            .receivedResponseAtMillis(System.currentTimeMillis())
            .build();
    
        int code = response.code();
        if (code == 100) {
          // server sent a 100-continue even though we did not request one.
          // try again to read the actual response
          response = exchange.readResponseHeaders(false)
              .request(request)
              .handshake(exchange.connection().handshake())
              .sentRequestAtMillis(sentRequestMillis)
              .receivedResponseAtMillis(System.currentTimeMillis())
              .build();
    
          code = response.code();
        }
    
        exchange.responseHeadersEnd(response);
    
        if (forWebSocket && code == 101) {
          // Connection is upgrading, but we need to ensure interceptors see a non-null response body.
          response = response.newBuilder()
              .body(Util.EMPTY_RESPONSE)
              .build();
        } else {
          response = response.newBuilder()
              .body(exchange.openResponseBody(response))
              .build();
        }
    
        if ("close".equalsIgnoreCase(response.request().header("Connection"))
            || "close".equalsIgnoreCase(response.header("Connection"))) {
          exchange.noNewExchangesOnConnection();
        }
    
        if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
          throw new ProtocolException(
              "HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
        }
    
        return response;
      }
    }
    

    到这里 Response已经回来,数据也读取完毕。最后返回 response
    这里5个拦截器已经全部走完
    回顾一下5个拦截器
    1、RetryAndFollowUpInterceptor 重定向拦截器
    2、BridgeInterceptor 桥拦截器
    3、CacheInterceptor 缓存拦截器
    4、ConnectInterceptor 连接拦截器
    5、CallServerInterceptor 呼叫服务拦截器
    这5个拦截器依次从上往下执行,最后再返回
    以上记录的是 同步请求方法

    call.enqueue(new Callback(){..} ) 异步请求
    @Override public void enqueue(Callback responseCallback) {
      synchronized (this) {
        if (executed) throw new IllegalStateException("Already Executed");
        executed = true;
      }
      transmitter.callStart();
      //创建了一个AsynCall对象
      client.dispatcher().enqueue(new AsyncCall(responseCallback));
    }
    //调度器的dispatcher方法
    void enqueue(AsyncCall call) {
      synchronized (this) {
        //锁住对象放入 readyAsyncCalls 队列中
        readyAsyncCalls.add(call);
    
        if (!call.get().forWebSocket) {
          AsyncCall existingCall = findExistingCallWithHost(call.host());
          if (existingCall != null) call.reuseCallsPerHostFrom(existingCall);
        }
      }
      //执行
      promoteAndExecute();
    }
    
    //执行
    private boolean promoteAndExecute() {
    
      List<AsyncCall> executableCalls = new ArrayList<>();
      boolean isRunning;
      synchronized (this) {
        for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
          AsyncCall asyncCall = i.next();
    
          if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
          if (asyncCall.callsPerHost().get() >= maxRequestsPerHost) continue; // Host max capacity.
    
          i.remove();
          asyncCall.callsPerHost().incrementAndGet();
          //锁住对象时的 临时队列
          executableCalls.add(asyncCall);
          //锁住对象,把准备好的AsyncCall 放到runningAsyncCalls队列里
          //runningAsyncCalls是正在执行的队列
          runningAsyncCalls.add(asyncCall);
        }
        isRunning = runningCallsCount() > 0;
      }
    
      for (int i = 0, size = executableCalls.size(); i < size; i++) {
        AsyncCall asyncCall = executableCalls.get(i);
        //循环这个执行临时队列的
        //根据安卓的机制还是需要在子线程里请求网络
        //executorService()是个线程池
        asyncCall.executeOn(executorService());
      }
    
      return isRunning;
    }
    //看一下AsyncCall结构 是个内部类
    final class RealCall implements Call {
      final class AsyncCall extends NamedRunnable {
      }
    }
    //在看一下NamedRunnable,继承了Runnable,他的run方法里执行了execute()
    public abstract class NamedRunnable implements Runnable {
      protected final String name;
    
      public NamedRunnable(String format, Object... args) {
        this.name = Util.format(format, args);
      }
    
      @Override public final void run() {
        String oldName = Thread.currentThread().getName();
        Thread.currentThread().setName(name);
        try {
          execute();
        } finally {
          Thread.currentThread().setName(oldName);
        }
      }
    
      protected abstract void execute();
    }
    //既然执行了execute()方法 打开asyncCall.executeOn(executorService());
    
    void executeOn(ExecutorService executorService) {
    
          boolean success = false;
          try {
            //executorService线程池对象 执行this
            //这个this就是AsyncCall类
            //也就是线程池执行 run方法,上面的run方法又执行了execute()
    
            executorService.execute(this);
            success = true;
          } catch (RejectedExecutionException e) {
    
          } finally {
    
          }
    }
    //所以只要看  AsyncCall 完成的 NamedRunnable的 execute()方法
    @Override protected void execute() {
      boolean signalledCallback = false;
      transmitter.timeoutEnter();
      try {
        Response response = getResponseWithInterceptorChain();
        signalledCallback = true;
        responseCallback.onResponse(RealCall.this, response);
      } catch (IOException e) {
    
      } finally {
    
      }
    }
    

    最后又执行了 getResponseWithInterceptorChain() 方法,执行5个拦截器,返回
    这里代码很长,关键内容就在5个拦截器
    OkHttp还有一个比较关键的内容就是连接池,暂时不写了。
    贴个图加深印象,看源码不太清晰的时候,画个思维导图或者画一个重要类的 类图基本上代码就很清晰了

    OkHttp.png
    我也看到了很多不会内容,我会继续深入的学习
    如果你看到这里,觉得写的还可以,请给我赞吧~!😀

    学艺不精,如果内容有错误请及时联系我,我及时改正

    相关文章

      网友评论

          本文标题:学习笔记OkHttp

          本文链接:https://www.haomeiwen.com/subject/tieunktx.html