美文网首页
OKHttp源码分析与手写实现

OKHttp源码分析与手写实现

作者: Coder_Sven | 来源:发表于2020-06-09 20:02 被阅读0次

    1,使用

    implementation 'com.squareup.okhttp3:okhttp:3.14.2'
    

    1.1异步GET请求

    String url = "http://wwww.baidu.com";
    OkHttpClient okHttpClient = new OkHttpClient();
    final Request request = new Request.Builder()
            .url(url)
            .get()//默认就是GET请求,可以不写
            .build();
    Call call = okHttpClient.newCall(request);
    call.enqueue(new Callback() {
        @Override
        public void onFailure(Call call, IOException e) {
            Log.d(TAG, "onFailure: ");
        }
    
        @Override
        public void onResponse(Call call, Response response) throws IOException {
            Log.d(TAG, "onResponse: " + response.body().string());
        }
    });
    

    1.2同步GET请求

    String url = "http://wwww.baidu.com";
    OkHttpClient okHttpClient = new OkHttpClient();
    final Request request = new Request.Builder()
            .url(url)
            .build();
    final Call call = okHttpClient.newCall(request);
    new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                Response response = call.execute();
                Log.d(TAG, "run: " + response.body().string());
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }).start();
    

    2,分析源码

    public OkHttpClient() {
      this(new Builder());
    }
    
      OkHttpClient(Builder builder) {
        this.dispatcher = builder.dispatcher;
        this.proxy = builder.proxy;
        this.protocols = builder.protocols;
        this.connectionSpecs = builder.connectionSpecs;
        this.interceptors = Util.immutableList(builder.interceptors);
        this.networkInterceptors = Util.immutableList(builder.networkInterceptors);
        this.eventListenerFactory = builder.eventListenerFactory;
        this.proxySelector = builder.proxySelector;
        this.cookieJar = builder.cookieJar;
        this.cache = builder.cache;
        this.internalCache = builder.internalCache;
        this.socketFactory = builder.socketFactory;
    
        boolean isTLS = false;
        for (ConnectionSpec spec : connectionSpecs) {
          isTLS = isTLS || spec.isTls();
        }
    
        if (builder.sslSocketFactory != null || !isTLS) {
          this.sslSocketFactory = builder.sslSocketFactory;
          this.certificateChainCleaner = builder.certificateChainCleaner;
        } else {
          X509TrustManager trustManager = Util.platformTrustManager();
          this.sslSocketFactory = newSslSocketFactory(trustManager);
          this.certificateChainCleaner = CertificateChainCleaner.get(trustManager);
        }
    
        if (sslSocketFactory != null) {
          Platform.get().configureSslSocketFactory(sslSocketFactory);
        }
    
        this.hostnameVerifier = builder.hostnameVerifier;
        this.certificatePinner = builder.certificatePinner.withCertificateChainCleaner(
            certificateChainCleaner);
        this.proxyAuthenticator = builder.proxyAuthenticator;
        this.authenticator = builder.authenticator;
        this.connectionPool = builder.connectionPool;
        this.dns = builder.dns;
        this.followSslRedirects = builder.followSslRedirects;
        this.followRedirects = builder.followRedirects;
        this.retryOnConnectionFailure = builder.retryOnConnectionFailure;
        this.callTimeout = builder.callTimeout;
        this.connectTimeout = builder.connectTimeout;
        this.readTimeout = builder.readTimeout;
        this.writeTimeout = builder.writeTimeout;
        this.pingInterval = builder.pingInterval;
    
        if (interceptors.contains(null)) {
          throw new IllegalStateException("Null interceptor: " + interceptors);
        }
        if (networkInterceptors.contains(null)) {
          throw new IllegalStateException("Null network interceptor: " + networkInterceptors);
        }
      }
    

    new Request.Builder().url(url).build()

    [---------->Request.java]

    public Builder() {
      this.method = "GET";
      this.headers = new Headers.Builder();
    }
    
    ...
    public Request build() {
        if (url == null) throw new IllegalStateException("url == null");
        return new Request(this);
    }
    

    okHttpClient.newCall

    [------->OkHttpClient.java]

    @Override public Call newCall(Request request) {
      return RealCall.newRealCall(this, request, false /* for web socket */);
    }
    
    
    static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
      // Safely publish the Call instance to the EventListener.
      RealCall call = new RealCall(client, originalRequest, forWebSocket);
      call.transmitter = new Transmitter(client, call);
      return call;
    }
    

    call.enqueue

    [--------->RealCall.java]

    @Override public void enqueue(Callback responseCallback) {
      synchronized (this) {
        if (executed) throw new IllegalStateException("Already Executed");
        executed = true;
      }
      transmitter.callStart();
      client.dispatcher().enqueue(new AsyncCall(responseCallback));
    }
    

    client.dispatcher()

    [---------->Dispatcher.java]

    void enqueue(AsyncCall call) {
      synchronized (this) {
        readyAsyncCalls.add(call);
    
        // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
        // the same host.
        if (!call.get().forWebSocket) {
          AsyncCall existingCall = findExistingCallWithHost(call.host());
          if (existingCall != null) call.reuseCallsPerHostFrom(existingCall);
        }
      }
      promoteAndExecute();
    }
    
    private boolean promoteAndExecute() {
      assert (!Thread.holdsLock(this));
    
      List<AsyncCall> executableCalls = new ArrayList<>();
      boolean isRunning;
      synchronized (this) {
        for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
          AsyncCall asyncCall = i.next();
    
          if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
          if (asyncCall.callsPerHost().get() >= maxRequestsPerHost) continue; // Host max capacity.
    
          i.remove();
          asyncCall.callsPerHost().incrementAndGet();
          executableCalls.add(asyncCall);
          runningAsyncCalls.add(asyncCall);
        }
        isRunning = runningCallsCount() > 0;
      }
    
      for (int i = 0, size = executableCalls.size(); i < size; i++) {
        AsyncCall asyncCall = executableCalls.get(i);
        asyncCall.executeOn(executorService());
      }
    
      return isRunning;
    }
    

    executorService()

    public synchronized ExecutorService executorService() {
      if (executorService == null) {
      //开线程池
        executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
            new SynchronousQueue<>(), Util.threadFactory("OkHttp Dispatcher", false));
      }
      return executorService;
    }
    

    asyncCall.executeOn

    [------->RealCall#AsyncCall.java]

    void executeOn(ExecutorService executorService) {
      assert (!Thread.holdsLock(client.dispatcher()));
      boolean success = false;
      try {
        executorService.execute(this);
        success = true;
      } catch (RejectedExecutionException e) {
        InterruptedIOException ioException = new InterruptedIOException("executor rejected");
        ioException.initCause(e);
        transmitter.noMoreExchanges(ioException);
        responseCallback.onFailure(RealCall.this, ioException);
      } finally {
        if (!success) {
          client.dispatcher().finished(this); // This call is no longer running!
        }
      }
    }
    

    executorService.execute(this)

    [---------->NamedRunnable.java]

    @Override public final void run() {
      String oldName = Thread.currentThread().getName();
      Thread.currentThread().setName(name);
      try {
        execute();
      } finally {
        Thread.currentThread().setName(oldName);
      }
    }
    

    execute()

    [------->RealCall#AsyncCall.java]

    @Override protected void execute() {
      boolean signalledCallback = false;
      transmitter.timeoutEnter();
      try {
        Response response = getResponseWithInterceptorChain();
        signalledCallback = true;
        responseCallback.onResponse(RealCall.this, response);
      } catch (IOException e) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          responseCallback.onFailure(RealCall.this, e);
        }
      } finally {
        client.dispatcher().finished(this);
      }
    }
    
        Response getResponseWithInterceptorChain() throws IOException {
            // Build a full stack of interceptors.
            List<Interceptor> interceptors = new ArrayList<>();
            //①用户添加的全局拦截器
            interceptors.addAll(client.interceptors());
            //错误,重定向拦截器
            interceptors.add(new RetryAndFollowUpInterceptor(client));
            //桥接拦截器,桥接应用层与网络层,添加必要的头
            interceptors.add(new BridgeInterceptor(client.cookieJar()));
            //缓存处理
            interceptors.add(new CacheInterceptor(client.internalCache()));
            //连接拦截器
            interceptors.add(new ConnectInterceptor(client));
            if (!forWebSocket) {
                //②通过okHttpClient.Builder#addNetworkIntercerptor()传进来的拦截器只对非网页的请求生效
                interceptors.addAll(client.networkInterceptors());
            }
            //访问服务器的拦截器
            interceptors.add(new CallServerInterceptor(forWebSocket));
    
            Interceptor.Chain chain = new RealInterceptorChain(interceptors, transmitter, null, 0,
                    originalRequest, this, client.connectTimeoutMillis(),
                    client.readTimeoutMillis(), client.writeTimeoutMillis());
    
            boolean calledNoMoreExchanges = false;
            try {
                Response response = chain.proceed(originalRequest);
                if (transmitter.isCanceled()) {
                    closeQuietly(response);
                    throw new IOException("Canceled");
                }
                return response;
            } catch (IOException e) {
                calledNoMoreExchanges = true;
                throw transmitter.noMoreExchanges(e);
            } finally {
                if (!calledNoMoreExchanges) {
                    transmitter.noMoreExchanges(null);
                }
            }
        }
    

    走到这里,就到了通过拦截器完成发起请求和返回Responce。

    总结一下上面部分的时序图

    okhttp 时序图_wps图片.jpg

    拦截器-interceptor

    用户可以传入的interceptor分为两类:

    ①一类是全局的 interceptor,该类 interceptor 在整个拦截器链中最早被调用,通过 OkHttpClient.Builder#addInterceptor(Interceptor) 传入;

    ②另外一类是非网页请求的 interceptor ,这类拦截器只会在非网页请求中被调用,并且是在组装完请求之后,真正发起网络请求前被调用,所有的 interceptor 被保存在 List interceptors 集合中,按照添加顺序来逐个调用,具体可参考 RealCall#getResponseWithInterceptorChain() 方法。通过 OkHttpClient.Builder#addNetworkInterceptor(Interceptor) 传入;

    这里举一个简单的例子,例如有这样一个需求,我要监控App通过 OkHttp 发出的所有原始请求,以及整个请求所耗费的时间,针对这样的需求就可以使用第一类全局的 interceptor 在拦截器链头去做。

    OkHttpClient okHttpClient = new OkHttpClient.Builder()
            .addInterceptor(new LoggingInterceptor())
            .build();
    Request request = new Request.Builder()
            .url("http://www.publicobject.com/helloworld.txt")
            .header("User-Agent", "OkHttp Example")
            .build();
    okHttpClient.newCall(request).enqueue(new Callback() {
        @Override
        public void onFailure(Call call, IOException e) {
            Log.d(TAG, "onFailure: " + e.getMessage());
        }
    
        @Override
        public void onResponse(Call call, Response response) throws IOException {
            ResponseBody body = response.body();
            if (body != null) {
                Log.d(TAG, "onResponse: " + response.body().string());
                body.close();
            }
        }
    });
    
    public class LoggingInterceptor implements Interceptor {
        private static final String TAG = "LoggingInterceptor";
        
        @Override
        public Response intercept(Chain chain) throws IOException {
            Request request = chain.request();
    
            long startTime = System.nanoTime();
            Log.d(TAG, String.format("Sending request %s on %s%n%s",
                    request.url(), chain.connection(), request.headers()));
    
            Response response =  chain.proceed(request);
    
            long endTime = System.nanoTime();
            Log.d(TAG, String.format("Received response for %s in %.1fms%n%s",
                    response.request().url(), (endTime - startTime) / 1e6d, response.headers()));
    
            return response;
        }
    }
    

    针对这个请求,打印出来的结果

    Sending request http://www.publicobject.com/helloworld.txt on null
    User-Agent: OkHttp Example
            
    Received response for https://publicobject.com/helloworld.txt in 1265.9ms
    Server: nginx/1.10.0 (Ubuntu)
    Date: Wed, 28 Mar 2018 08:19:48 GMT
    Content-Type: text/plain
    Content-Length: 1759
    Last-Modified: Tue, 27 May 2014 02:35:47 GMT
    Connection: keep-alive
    ETag: "5383fa03-6df"
    Accept-Ranges: bytes
    

    分析一下拦截器源码

    Interceptor.Chain chain = new RealInterceptorChain

    public RealInterceptorChain(List<Interceptor> interceptors, Transmitter transmitter,
        @Nullable Exchange exchange, int index, Request request, Call call,
        int connectTimeout, int readTimeout, int writeTimeout) {
      this.interceptors = interceptors;
      this.transmitter = transmitter;
      this.exchange = exchange;
      this.index = index;
      this.request = request;
      this.call = call;
      this.connectTimeout = connectTimeout;
      this.readTimeout = readTimeout;
      this.writeTimeout = writeTimeout;
    }
    

    Response response = chain.proceed(originalRequest);

    @Override public Response proceed(Request request) throws IOException {
      return proceed(request, transmitter, exchange);
    }
    
    public Response proceed(Request request, Transmitter transmitter, @Nullable Exchange exchange)
        throws IOException {
      if (index >= interceptors.size()) throw new AssertionError();
    
      calls++;
    
      // If we already have a stream, confirm that the incoming request will use it.
      if (this.exchange != null && !this.exchange.connection().supportsUrl(request.url())) {
        throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
            + " must retain the same host and port");
      }
    
      // If we already have a stream, confirm that this is the only call to chain.proceed().
      if (this.exchange != null && calls > 1) {
        throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
            + " must call proceed() exactly once");
      }
    
      // Call the next interceptor in the chain.
      //重点在index+1,,通过不断的index+1,可以把interceptors集合里面的所有拦截器的proceed都执行一遍,得到最终的Responce
      RealInterceptorChain next = new RealInterceptorChain(interceptors, transmitter, exchange,
          index + 1, request, call, connectTimeout, readTimeout, writeTimeout);
      Interceptor interceptor = interceptors.get(index);
      Response response = interceptor.intercept(next);
    
      // Confirm that the next interceptor made its required call to chain.proceed().
      if (exchange != null && index + 1 < interceptors.size() && next.calls != 1) {
        throw new IllegalStateException("network interceptor " + interceptor
            + " must call proceed() exactly once");
      }
    
      // Confirm that the intercepted response isn't null.
      if (response == null) {
        throw new NullPointerException("interceptor " + interceptor + " returned null");
      }
    
      if (response.body() == null) {
        throw new IllegalStateException(
            "interceptor " + interceptor + " returned a response with no body");
      }
    
      return response;
    }
    

    拦截器是从数组的0下标开始一个个调用的。所以我们来看看ConnectInterceptor的源码

    @Override public Response intercept(Chain chain) throws IOException {
      RealInterceptorChain realChain = (RealInterceptorChain) chain;
      Request request = realChain.request();
      Transmitter transmitter = realChain.transmitter();
    
      // We need the network to satisfy this request. Possibly for validating a conditional GET.
      boolean doExtensiveHealthChecks = !request.method().equals("GET");
      Exchange exchange = transmitter.newExchange(chain, doExtensiveHealthChecks);
    
      return realChain.proceed(request, transmitter, exchange);
    }
    

    transmitter.newExchange

    Exchange newExchange(Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
      synchronized (connectionPool) {
        if (noMoreExchanges) {
          throw new IllegalStateException("released");
        }
        if (exchange != null) {
          throw new IllegalStateException("cannot make a new request because the previous response "
              + "is still open: please call response.close()");
        }
      }
    
      ExchangeCodec codec = exchangeFinder.find(client, chain, doExtensiveHealthChecks);
      Exchange result = new Exchange(this, call, eventListener, exchangeFinder, codec);
    
      synchronized (connectionPool) {
        this.exchange = result;
        this.exchangeRequestDone = false;
        this.exchangeResponseDone = false;
        return result;
      }
    }
    

    exchangeFinder.find

    [------->ExchangeFinder.java]

    public ExchangeCodec find(
        OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
      int connectTimeout = chain.connectTimeoutMillis();
      int readTimeout = chain.readTimeoutMillis();
      int writeTimeout = chain.writeTimeoutMillis();
      int pingIntervalMillis = client.pingIntervalMillis();
      boolean connectionRetryEnabled = client.retryOnConnectionFailure();
    
      try {
        RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
            writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks);
        return resultConnection.newCodec(client, chain);
      } catch (RouteException e) {
        trackFailure();
        throw e;
      } catch (IOException e) {
        trackFailure();
        throw new RouteException(e);
      }
    }
    
    private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
        int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled,
        boolean doExtensiveHealthChecks) throws IOException {
      while (true) {
        RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
            pingIntervalMillis, connectionRetryEnabled);
    
        // If this is a brand new connection, we can skip the extensive health checks.
        synchronized (connectionPool) {
          if (candidate.successCount == 0) {
            return candidate;
          }
        }
    
        // Do a (potentially slow) check to confirm that the pooled connection is still good. If it
        // isn't, take it out of the pool and start again.
        if (!candidate.isHealthy(doExtensiveHealthChecks)) {
          candidate.noNewExchanges();
          continue;
        }
    
        return candidate;
      }
    }
    
    private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
        int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {
      boolean foundPooledConnection = false;
      RealConnection result = null;
      Route selectedRoute = null;
      RealConnection releasedConnection;
      Socket toClose;
      synchronized (connectionPool) {
        if (transmitter.isCanceled()) throw new IOException("Canceled");
        hasStreamFailure = false; // This is a fresh attempt.
    
        // Attempt to use an already-allocated connection. We need to be careful here because our
        // already-allocated connection may have been restricted from creating new exchanges.
        releasedConnection = transmitter.connection;
        toClose = transmitter.connection != null && transmitter.connection.noNewExchanges
            ? transmitter.releaseConnectionNoEvents()
            : null;
    
        if (transmitter.connection != null) {
          // We had an already-allocated connection and it's good.
          result = transmitter.connection;
          releasedConnection = null;
        }
    
        if (result == null) {
          // Attempt to get a connection from the pool.
          if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, null, false)) {
            foundPooledConnection = true;
            result = transmitter.connection;
          } else if (nextRouteToTry != null) {
            selectedRoute = nextRouteToTry;
            nextRouteToTry = null;
          } else if (retryCurrentRoute()) {
            selectedRoute = transmitter.connection.route();
          }
        }
      }
      closeQuietly(toClose);
    
      if (releasedConnection != null) {
        eventListener.connectionReleased(call, releasedConnection);
      }
      if (foundPooledConnection) {
        eventListener.connectionAcquired(call, result);
      }
      if (result != null) {
        // If we found an already-allocated or pooled connection, we're done.
        return result;
      }
    
      // If we need a route selection, make one. This is a blocking operation.
      boolean newRouteSelection = false;
      if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {
        newRouteSelection = true;
        routeSelection = routeSelector.next();
      }
    
      List<Route> routes = null;
      synchronized (connectionPool) {
        if (transmitter.isCanceled()) throw new IOException("Canceled");
    
        if (newRouteSelection) {
          // Now that we have a set of IP addresses, make another attempt at getting a connection from
          // the pool. This could match due to connection coalescing.
          routes = routeSelection.getAll();
          if (connectionPool.transmitterAcquirePooledConnection(
              address, transmitter, routes, false)) {
            foundPooledConnection = true;
            result = transmitter.connection;
          }
        }
    
        if (!foundPooledConnection) {
          if (selectedRoute == null) {
            selectedRoute = routeSelection.next();
          }
    
          // Create a connection and assign it to this allocation immediately. This makes it possible
          // for an asynchronous cancel() to interrupt the handshake we're about to do.
          result = new RealConnection(connectionPool, selectedRoute);
          connectingConnection = result;
        }
      }
    
      // If we found a pooled connection on the 2nd time around, we're done.
      if (foundPooledConnection) {
        eventListener.connectionAcquired(call, result);
        return result;
      }
    
      // Do TCP + TLS handshakes. This is a blocking operation.
      result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
          connectionRetryEnabled, call, eventListener);
      connectionPool.routeDatabase.connected(result.route());
    
      Socket socket = null;
      synchronized (connectionPool) {
        connectingConnection = null;
        // Last attempt at connection coalescing, which only occurs if we attempted multiple
        // concurrent connections to the same host.
        if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, routes, true)) {
          // We lost the race! Close the connection we created and return the pooled connection.
          result.noNewExchanges = true;
          socket = result.socket();
          result = transmitter.connection;
        } else {
          connectionPool.put(result);
          transmitter.acquireConnectionNoEvents(result);
        }
      }
      closeQuietly(socket);
    
      eventListener.connectionAcquired(call, result);
      return result;
    }
    

    result.connect

    [--------------->RealConnection.java]

    public void connect(int connectTimeout, int readTimeout, int writeTimeout,
        int pingIntervalMillis, boolean connectionRetryEnabled, Call call,
        EventListener eventListener) {
      if (protocol != null) throw new IllegalStateException("already connected");
    
      RouteException routeException = null;
      List<ConnectionSpec> connectionSpecs = route.address().connectionSpecs();
      ConnectionSpecSelector connectionSpecSelector = new ConnectionSpecSelector(connectionSpecs);
    
      if (route.address().sslSocketFactory() == null) {
        if (!connectionSpecs.contains(ConnectionSpec.CLEARTEXT)) {
          throw new RouteException(new UnknownServiceException(
              "CLEARTEXT communication not enabled for client"));
        }
        String host = route.address().url().host();
        if (!Platform.get().isCleartextTrafficPermitted(host)) {
          throw new RouteException(new UnknownServiceException(
              "CLEARTEXT communication to " + host + " not permitted by network security policy"));
        }
      } else {
        if (route.address().protocols().contains(Protocol.H2_PRIOR_KNOWLEDGE)) {
          throw new RouteException(new UnknownServiceException(
              "H2_PRIOR_KNOWLEDGE cannot be used with HTTPS"));
        }
      }
    
      while (true) {
        try {
          if (route.requiresTunnel()) {
            connectTunnel(connectTimeout, readTimeout, writeTimeout, call, eventListener);
            if (rawSocket == null) {
              // We were unable to connect the tunnel but properly closed down our resources.
              break;
            }
          } else {
            connectSocket(connectTimeout, readTimeout, call, eventListener);
          }
          establishProtocol(connectionSpecSelector, pingIntervalMillis, call, eventListener);
          eventListener.connectEnd(call, route.socketAddress(), route.proxy(), protocol);
          break;
        } catch (IOException e) {
          closeQuietly(socket);
          closeQuietly(rawSocket);
          socket = null;
          rawSocket = null;
          source = null;
          sink = null;
          handshake = null;
          protocol = null;
          http2Connection = null;
    
          eventListener.connectFailed(call, route.socketAddress(), route.proxy(), null, e);
    
          if (routeException == null) {
            routeException = new RouteException(e);
          } else {
            routeException.addConnectException(e);
          }
    
          if (!connectionRetryEnabled || !connectionSpecSelector.connectionFailed(e)) {
            throw routeException;
          }
        }
      }
    
      if (route.requiresTunnel() && rawSocket == null) {
        ProtocolException exception = new ProtocolException("Too many tunnel connections attempted: "
            + MAX_TUNNEL_ATTEMPTS);
        throw new RouteException(exception);
      }
    
      if (http2Connection != null) {
        synchronized (connectionPool) {
          allocationLimit = http2Connection.maxConcurrentStreams();
        }
      }
    }
    

    connectSocket

    private void connectSocket(int connectTimeout, int readTimeout, Call call,
        EventListener eventListener) throws IOException {
      Proxy proxy = route.proxy();
      Address address = route.address();
    
      rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
          ? address.socketFactory().createSocket()
          : new Socket(proxy);
    
      eventListener.connectStart(call, route.socketAddress(), proxy);
      rawSocket.setSoTimeout(readTimeout);
      try {
        Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
      } catch (ConnectException e) {
        ConnectException ce = new ConnectException("Failed to connect to " + route.socketAddress());
        ce.initCause(e);
        throw ce;
      }
    
      // The following try/catch block is a pseudo hacky way to get around a crash on Android 7.0
      // More details:
      // https://github.com/square/okhttp/issues/3245
      // https://android-review.googlesource.com/#/c/271775/
      try {
        source = Okio.buffer(Okio.source(rawSocket));
        sink = Okio.buffer(Okio.sink(rawSocket));
      } catch (NullPointerException npe) {
        if (NPE_THROW_WITH_NULL.equals(npe.getMessage())) {
          throw new IOException(npe);
        }
      }
    }
    

    Platform.get().connectSocket

    [--------->Platform.java]

    public void connectSocket(Socket socket, InetSocketAddress address, int connectTimeout)
        throws IOException {
      socket.connect(address, connectTimeout);
    }
    

    连接拦截器的时序图:

    连接拦截器_wps图片.jpg

    连接池(connectionPool)

    在上面exchangeFinder.find的findConnection我们发现引入了connectionPool,顾名思义,这是一个连接池。


    okhttp连接池复用_wps图片.jpg

    每一个创建的socket对象都会放入到连接池中进行准备复用,并且给每个socket对象设置超时时间,在未超过时间时,同一个IP,端口的链接可以进行socket对象的复用,这样就可以避免重复打开链接所带来的性能下降。

    我们看看connectionPool是怎么实现的

    [-------->ExchangeFinder.java]

    ExchangeFinder(Transmitter transmitter, RealConnectionPool connectionPool,
        Address address, Call call, EventListener eventListener) {
      this.transmitter = transmitter;
      this.connectionPool = connectionPool;
      this.address = address;
      this.call = call;
      this.eventListener = eventListener;
      this.routeSelector = new RouteSelector(
          address, connectionPool.routeDatabase, call, eventListener);
    }
    
    

    [---------->Transmitter.java]

      public Transmitter(OkHttpClient client, Call call) {
        this.client = client;
        this.connectionPool = Internal.instance.realConnectionPool(client.connectionPool());
        this.call = call;
        this.eventListener = client.eventListenerFactory().create(call);
        this.timeout.timeout(client.callTimeoutMillis(), MILLISECONDS);
      }
    
    this.exchangeFinder = new ExchangeFinder(this, connectionPool, createAddress(request.url()),
        call, eventListener);
    

    [-------->OkHttpClient.java]

    public ConnectionPool connectionPool() {
      return connectionPool;
    }
    
    public Builder() {
        ....
        connectionPool = new ConnectionPool();
    }
    

    [------------>ConnectionPool.java]

    public final class ConnectionPool {
      final RealConnectionPool delegate;
      ....
    }
    

    [---------->RealConnectionPool.java]

    public final class RealConnectionPool {
    
      private static final Executor executor = new ThreadPoolExecutor(0 /*              corePoolSize */,Integer.MAX_VALUE /* maximumPoolSize */, 
         60L /* keepAliveTime */, TimeUnit.SECONDS,
          new SynchronousQueue<>(), Util.threadFactory("OkHttp ConnectionPool",     true));
          
      private final Deque<RealConnection> connections = new ArrayDeque<>();
         
      void put(RealConnection connection) {
        assert (Thread.holdsLock(this));
        if (!cleanupRunning) {
          cleanupRunning = true;
          executor.execute(cleanupRunnable);
        }
        connections.add(connection);
      }
      
        boolean transmitterAcquirePooledConnection(Address address, Transmitter transmitter,
          @Nullable List<Route> routes, boolean requireMultiplexed) {
        assert (Thread.holdsLock(this));
        for (RealConnection connection : connections) {
          if (requireMultiplexed && !connection.isMultiplexed()) continue;
          if (!connection.isEligible(address, routes)) continue;
          //从链接池里找到可以复用的链接
          transmitter.acquireConnectionNoEvents(connection);
          return true;
        }
        return false;
      }
     
    }
    

    手写okhttp架构

    https://github.com/games2sven/okhttp

    相关文章

      网友评论

          本文标题:OKHttp源码分析与手写实现

          本文链接:https://www.haomeiwen.com/subject/pwvetktx.html