美文网首页
OkHttp讲解(四)-链接池

OkHttp讲解(四)-链接池

作者: 涛涛123759 | 来源:发表于2021-08-22 17:21 被阅读0次

    Android知识总结

    一、 Interceptor 关联类分析

    1.1、 StreamAllocation 的成员变量

    简介
    StreamAllocation是用来协调Connections、Streams和Calls这三个实体的。

    • Connections:连接到远程服务器的物理套接字,这个套接字连接可能比较慢,所以它有一套取消机制。
    • Streams:定义了逻辑上的HTTP请求/响应对,每个连接都定义了它们可以携带的最大并* 发流,HTTP/1.x每次只可以携带一个,HTTP/2每次可以携带多个。
      Calls:定义了流的逻辑序列,这个序列通常是一个初始请求以及它的重定向请求,对于同一个连接,我们通常将所有流都放在一个调用中,以此来统一它们的行为。

    HTTP通信 执行 网络请求Call 需要在 连接Connection 上建立一个新的 流Stream,我们将 StreamAllocation 称之 流 的桥梁,它负责为一次 请求 寻找 连接 并建立 流,从而完成远程通信。

      public final Address address;//地址
      private Route route; //路由
      private final ConnectionPool connectionPool;  //连接池
      private final Object callStackTrace; //日志
    
      // State guarded by connectionPool.
      private final RouteSelector routeSelector; //路由选择器
      private int refusedStreamCount;  //拒绝的次数
      private RealConnection connection;  //连接
      private boolean released;  //是否已经被释放
      private boolean canceled  //是否被取消了
    

    1.2、 RealConnection

    • okhttp是底层实现框架,与httpURLconnection是同一级别的。OKHttp底层建立网络连接的关键就是RealConnection类。RealConnection类底层封装socket,是真正的创建连接者。分析这个类之后就明白了OKHttp与httpURLconnection的本质不同点。
    • RealConnection是Connection的实现类,代表着链接socket的链路,如果拥有了一个RealConnection就代表了我们已经跟服务器有了一条通信链路,而且通过RealConnection代表是连接socket链路,RealConnection对象意味着我们已经跟服务端有了一条通信链路了,在这个类里面实现的三次握手。
    • 在OKHttp里面,记录一次连接的是RealConnection,这个负责连接,在这个类里面用socket来连接,用HandShake来处理握手。
      //链接的线程池
      private final ConnectionPool connectionPool;
      private final Route route;
      //下面这些字段,通过connect()方法开始初始化,并且绝对不会再次赋值
      /** The low-level TCP socket. */
      private Socket rawSocket; //底层socket
      private Socket socket;  //应用层socket
      //握手
      private Handshake handshake;
       //协议
      private Protocol protocol;
       // http2的链接
      private Http2Connection http2Connection;
      //通过source和sink,大家可以猜到是与服务器交互的输入输出流
      private BufferedSource source;
      private BufferedSink sink;
      //下面这个字段是 属于表示链接状态的字段,并且有connectPool统一管理
      //如果noNewStreams被设为true,则noNewStreams一直为true,不会被改变,并且表示这个链接不会再创新的stream流
      public boolean noNewStreams;
      //成功的次数
      public int successCount;
      //此链接可以承载最大并发流的限制,如果不超过限制,可以随意增加
      public int allocationLimit = 1;
    

    RealConnection的connect方法,connect()里面进行了三次握手

    public void connect(。。。) {
         //如果协议不等于null,抛出一个异常
        if (protocol != null) throw new IllegalStateException("already connected");
    
       。。 省略部分代码。。。。
    
        while (true) {//一个while循环
             //如果是https请求并且使用了http代理服务器
            if (route.requiresTunnel()) {
              connectTunnel(...);
            } else {//
                //直接打开socket链接
              connectSocket(connectTimeout, readTimeout);
            }
            //建立协议
            establishProtocol(connectionSpecSelector);
            break;//跳出while循环
            。。省略部分代码。。。
      }
    
     //当前route的请求是https并且使用了Proxy.Type.HTTP代理
     public boolean requiresTunnel() {
        return address.sslSocketFactory != null && proxy.type() == Proxy.Type.HTTP;
      }
    

    普通连接的建立过程为建立TCP连接,建立TCP连接的过程为

     private void connectSocket(int connectTimeout, int readTimeout) throws IOException {
        Proxy proxy = route.proxy();
        Address address = route.address();
    
        //根据代理类型来选择socket类型,是代理还是直连
        rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
            ? address.socketFactory().createSocket()
            : new Socket(proxy);
    
        rawSocket.setSoTimeout(readTimeout);
        try {
        //连接socket
          Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
        } catch (ConnectException e) {
          throw new ConnectException("Failed to connect to " + route.socketAddress());
        }
        source = Okio.buffer(Okio.source(rawSocket));//从socket中获取source 对象。
        sink = Okio.buffer(Okio.sink(rawSocket));//从socket中获取sink 对象。
      }
    

    Okio.source(rawSocket)Okio.sink(rawSocket)的原码

      public static Source source(Socket socket) throws IOException {
        if (socket == null) throw new IllegalArgumentException("socket == null");
        AsyncTimeout timeout = timeout(socket);
        Source source = source(socket.getInputStream(), timeout);
        return timeout.source(source);
      }
    
      public static Sink sink(Socket socket) throws IOException {
        if (socket == null) throw new IllegalArgumentException("socket == null");
        AsyncTimeout timeout = timeout(socket);
        Sink sink = sink(socket.getOutputStream(), timeout);
        return timeout.sink(sink);
      }
    

    建立隧道连接的过程

      private void connectTunnel(int connectTimeout, int readTimeout, int writeTimeout)
          throws IOException {
          //1、创建隧道请求对象
        Request tunnelRequest = createTunnelRequest();
        HttpUrl url = tunnelRequest.url();
        int attemptedConnections = 0;
        int maxAttempts = 21;
        //一个while循环
        while (true) {
           //尝试连接词说超过最大次数
          if (++attemptedConnections > maxAttempts) {
            throw new ProtocolException("Too many tunnel connections attempted: " + maxAttempts);
          }
          //2、打开socket链接
          connectSocket(connectTimeout, readTimeout);
         //3、请求开启隧道并返回tunnelRequest 
          tunnelRequest = createTunnel(readTimeout, writeTimeout, tunnelRequest, url);
    
         //4、成功开启了隧道,跳出while循环
          if (tunnelRequest == null) break; /
    
          //隧道未开启成功,关闭相关资源,继续while循环    
          //当然,循环次数超限后抛异常,退出wiile循环
          closeQuietly(rawSocket);
          rawSocket = null;
          sink = null;
          source = null;
        }
      }
      //隧道请求是一个常规的HTTP请求,只是请求的内容有点特殊。最初创建的隧道请求如
      private Request createTunnelRequest() {
        return new Request.Builder()
            .url(route.address().url())
            .header("Host", Util.hostHeader(route.address().url(), true))
            .header("Proxy-Connection", "Keep-Alive") // For HTTP/1.0 proxies like Squid.
            .header("User-Agent", Version.userAgent())
            .build();
      }
    

    二、创建链接流程

    ConnectInterceptor里面创建链接,并把创建的链接放如链接池中。具体过程如下:

    @Override 
    public Response intercept(Chain chain) throws IOException {
       RealInterceptorChain realChain = (RealInterceptorChain) chain;
       Request request = realChain.request();
        //获取可复用流
       StreamAllocation streamAllocation = realChain.streamAllocation();
       boolean doExtensiveHealthChecks = !request.method().equals("GET");
        //创建输出流
       HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);
       //根据HTTP/1.x(keep-alive)和HTTP/2(流复用)的复用机制,发起连接
       RealConnection connection = streamAllocation.connection();
       return realChain.proceed(request, streamAllocation, httpCodec, connection);
     }
    

    通过ConnectInterceptor 中的HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);方法调用StreamAllocation#newStream方法

      public HttpCodec newStream(OkHttpClient client, boolean doExtensiveHealthChecks) {
        int connectTimeout = client.connectTimeoutMillis();
        int readTimeout = client.readTimeoutMillis();
        int writeTimeout = client.writeTimeoutMillis();
        boolean connectionRetryEnabled = client.retryOnConnectionFailure();
    
        try {
          //获取一个健康的连接
          RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
              writeTimeout, connectionRetryEnabled, doExtensiveHealthChecks);
          //实例化HttpCodec,如果是HTTP/2则是Http2Codec否则是Http1Codec
          HttpCodec resultCodec = resultConnection.newCodec(client, this);
    
          synchronized (connectionPool) {
            codec = resultCodec;
            return resultCodec;
          }
        } catch (IOException e) {
          throw new RouteException(e);
        }
      }
    

    然后调用StreamAllocation#findHealthyConnection方法

        private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
                                                     int writeTimeout, int pingIntervalMillis,
                                                     boolean connectionRetryEnabled,
                                                     boolean doExtensiveHealthChecks) throws IOException {
            while (true) {
                //todo 找到一个连接
                RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
                        pingIntervalMillis, connectionRetryEnabled);
    
                //todo 如果这个连接是新建立的,那肯定是健康的,直接返回
                synchronized (connectionPool) {
                    if (candidate.successCount == 0) {
                        return candidate;
                    }
                }
    
                //todo 如果不是新创建的,需要检查是否健康
                if (!candidate.isHealthy(doExtensiveHealthChecks)) {
                    //todo 不健康 关闭连接,释放Socket,从连接池移除
                    // 继续下次寻找连接操作
                    noNewStreams();
                    continue;
                }
                return candidate;
            }
        }
    

    然后调用StreamAllocation#findConnection找一个连接

        private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
                                              int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {
            boolean foundPooledConnection = false;
            RealConnection result = null;
            Route selectedRoute = null;
            Connection releasedConnection;
            Socket toClose;
            synchronized (connectionPool) {
                if (released) throw new IllegalStateException("released");
                if (codec != null) throw new IllegalStateException("codec != null");
                if (canceled) throw new IOException("Canceled");
                releasedConnection = this.connection;
                toClose = releaseIfNoNewStreams();
                if (this.connection != null) {
                    // We had an already-allocated connection and it's good.
                    result = this.connection;
                    releasedConnection = null;
                }
                if (!reportedAcquired) {
                    // If the connection was never reported acquired, don't report it as released!
                    releasedConnection = null;
                }
    
                if (result == null) {
                    //todo 尝试从连接池获取连接,如果有可复用的连接,会给第三个参数 this的connection赋值
                    //Attempt to get a connection from the pool.
                    Internal.instance.get(connectionPool, address, this, null);
                    if (connection != null) {
                        foundPooledConnection = true;
                        result = connection;
                    } else {
                        selectedRoute = route;
                    }
                }
            }
            closeQuietly(toClose);
    
            if (releasedConnection != null) {
                eventListener.connectionReleased(call, releasedConnection);
            }
            if (foundPooledConnection) {
                eventListener.connectionAcquired(call, result);
            }
            if (result != null) {
                // If we found an already-allocated or pooled connection, we're done.
                return result;
            }
    
            // If we need a route selection, make one. This is a blocking operation.
            //todo 创建一个路由 (dns解析的所有ip与代理的组合)
            boolean newRouteSelection = false;
            if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {
                newRouteSelection = true;
                routeSelection = routeSelector.next();
            }
    
            synchronized (connectionPool) {
                if (canceled) throw new IOException("Canceled");
    
                if (newRouteSelection) {
                    //todo 根据代理和不同的ip从连接池中找可复用的连接
                    List<Route> routes = routeSelection.getAll();
                    for (int i = 0, size = routes.size(); i < size; i++) {
                        Route route = routes.get(i);
                        Internal.instance.get(connectionPool, address, this, route);
                        if (connection != null) {
                            foundPooledConnection = true;
                            result = connection;
                            this.route = route;
                            break;
                        }
                    }
                }
                //todo 还是没找到,必须新建一个连接了
                if (!foundPooledConnection) {
                    if (selectedRoute == null) {
                        selectedRoute = routeSelection.next();
                    }
                    route = selectedRoute;
                    refusedStreamCount = 0;
                    result = new RealConnection(connectionPool, selectedRoute);
                    acquire(result, false);
                }
            }
            if (foundPooledConnection) {
                eventListener.connectionAcquired(call, result);
                return result;
            }
            // Do TCP + TLS handshakes. This is a blocking operation.
            //todo 实际上就是创建socket连接,但是要注意的是如果存在http代理的情况
            result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
                    connectionRetryEnabled, call, eventListener);
            routeDatabase().connected(result.route());
    
            Socket socket = null;
            synchronized (connectionPool) {
                reportedAcquired = true;
    
                // Pool the connection.
                //todo 将新创建的连接放到连接池中
                Internal.instance.put(connectionPool, result);
                if (result.isMultiplexed()) {
                    socket = Internal.instance.deduplicate(connectionPool, address, this);
                    result = connection;
                }
            }
            closeQuietly(socket);
    
            eventListener.connectionAcquired(call, result);
            return result;
        }
    

    然后调用StreamAllocation#isHealthy判断是否健康链接

        public boolean isHealthy(boolean doExtensiveChecks) {
            //todo Socket关闭,肯定不健康
            if (socket.isClosed() || socket.isInputShutdown() || socket.isOutputShutdown()) {
                return false;
            }
    
            if (http2Connection != null) {
                return !http2Connection.isShutdown();
            }
    
            if (doExtensiveChecks) {
                try {
                    int readTimeout = socket.getSoTimeout();
                    try {
                        socket.setSoTimeout(1);
                        if (source.exhausted()) {
                            return false; // Stream is exhausted; socket is closed.
                        }
                        return true;
                    } finally {
                        socket.setSoTimeout(readTimeout);
                    }
                } catch (SocketTimeoutException ignored) {
                    // Read timed out; socket is good.
                } catch (IOException e) {
                    return false; // Couldn't read; socket is closed.
                }
            }
            return true;
        }
    

    三、链接池的存取数据

    根据图我们看ConnectionPool里面代码:

    • 1、创建
    //这是一个用于清楚过期链接的线程池,每个线程池最多只能运行一个线程,并且这个线程池允许被垃圾回收
      private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,
          Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp ConnectionPool", true));
    
      /** The maximum number of idle connections for each address. */
      //每个address的最大空闲连接数。
      private final int maxIdleConnections;
      private final long keepAliveDurationNs;
      //清理任务
      private final Runnable cleanupRunnable = new Runnable() {
        @Override public void run() {
          while (true) {
            long waitNanos = cleanup(System.nanoTime());
            if (waitNanos == -1) return;
            if (waitNanos > 0) {
              long waitMillis = waitNanos / 1000000L;
              waitNanos -= (waitMillis * 1000000L);
              synchronized (ConnectionPool.this) {
                try {
                  ConnectionPool.this.wait(waitMillis, (int) waitNanos);
                } catch (InterruptedException ignored) {
                }
              }
            }
          }
        }
      };
      //链接的双向队列
      private final Deque<RealConnection> connections = new ArrayDeque<>();
      //路由的数据库
      final RouteDatabase routeDatabase = new RouteDatabase();
       //清理任务正在执行的标志
      boolean cleanupRunning;
    //创建一个适用于单个应用程序的新连接池。
     //该连接池的参数将在未来的okhttp中发生改变
     //目前最多可容乃5个空闲的连接,存活期是5分钟
      public ConnectionPool() {
        this(5, 5, TimeUnit.MINUTES);
      }
    
      public ConnectionPool(int maxIdleConnections, long keepAliveDuration, TimeUnit timeUnit) {
        this.maxIdleConnections = maxIdleConnections;
        this.keepAliveDurationNs = timeUnit.toNanos(keepAliveDuration);
    
        // Put a floor on the keep alive duration, otherwise cleanup will spin loop.
        //保持活着的时间,否则清理将旋转循环
        if (keepAliveDuration <= 0) {
          throw new IllegalArgumentException("keepAliveDuration <= 0: " + keepAliveDuration);
        }
      }
    

    管理http和http/2的链接,以便减少网络请求延迟。同一个address将共享同一个connection。该类实现了复用连接的目标。

    • 1、主要就是connections,可见ConnectionPool内部以队列方式存储连接;
    • 2、routDatabase是一个黑名单,用来记录不可用的route,但是看代码貌似ConnectionPool并没有使用它。所以此处不做分析。
    • 3、剩下的就是和清理有关了,所以executor是清理任务的线程池,cleanupRunning是清理任务的标志,cleanupRunnable是清理任务。
    • 2、put 链接
        /**
         * todo 保存连接以复用。
         * 本方法没上锁,只加了断言: 当前线程拥有this(pool)对象的锁。
         * 表示使用这个方法必须要上锁,而且是上pool的对象锁。
         * okhttp中使用到这个函数的地方确实也是这么做的
         */
        void put(RealConnection connection) {
            assert (Thread.holdsLock(this));
            //todo 如果清理任务未执行就启动它,再把新连接加入队列
            if (!cleanupRunning) {
                cleanupRunning = true;
                executor.execute(cleanupRunnable);
            }
            connections.add(connection);
        }
    
    • 2、get 链接
        /**
         * todo 获取可复用的连接
         */
        @Nullable
        RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {
            assert (Thread.holdsLock(this));
            for (RealConnection connection : connections) {
                //todo 要拿到的连接与连接池中的连接  连接的配置(dns/代理/域名等等)一致 就可以复用
                // 在使用了,所以 acquire 会创建弱引用放入集合记录
                if (connection.isEligible(address, route)) {
                    streamAllocation.acquire(connection, true);
                    return connection;
                }
            }
            return null;
        }
    

    执行RealConnection#isEligible检查是否可以复用

        public boolean isEligible(Address address, @Nullable Route route) {
            // If this connection is not accepting new streams, we're done.
            //todo 实际上就是在使用(对于http1.1)就不能复用
            if (allocations.size() >= allocationLimit || noNewStreams) return false;
            // If the non-host fields of the address don't overlap, we're done.
            //todo 如果地址不同,不能复用。包括了配置的dns、代理、证书以及端口等等 (域名还没判断,所有下面马上判断域名)
            if (!Internal.instance.equalsNonHost(this.route.address(), address)) return false;
    
            // If the host exactly matches, we're done: this connection can carry the address.
            //todo 都相同,那就可以复用了
            if (address.url().host().equals(this.route().address().url().host())) {
                return true; // This connection is a perfect match.
            }
    
            // At this point we don't have a hostname match. But we still be able to carry the
          // request if
            // our connection coalescing requirements are met. See also:
            // https://hpbn.co/optimizing-application-delivery/#eliminate-domain-sharding
            // https://daniel.haxx.se/blog/2016/08/18/http2-connection-coalescing/
    
            // 1. This connection must be HTTP/2.
            if (http2Connection == null) return false;
    
            // 2. The routes must share an IP address. This requires us to have a DNS address for both
            // hosts, which only happens after route planning. We can't coalesce connections that use a
            // proxy, since proxies don't tell us the origin server's IP address.
            if (route == null) return false;
            if (route.proxy().type() != Proxy.Type.DIRECT) return false;
            if (this.route.proxy().type() != Proxy.Type.DIRECT) return false;
            if (!this.route.socketAddress().equals(route.socketAddress())) return false;
    
            // 3. This connection's server certificate's must cover the new host.
            if (route.address().hostnameVerifier() != OkHostnameVerifier.INSTANCE) return false;
            if (!supportsUrl(address.url())) return false;
    
            // 4. Certificate pinning must match the host.
            try {
                address.certificatePinner().check(address.url().host(), handshake().peerCertificates());
            } catch (SSLPeerUnverifiedException e) {
                return false;
            }
            return true; // The caller's address can be carried by this connection.
        }
    
    • 2、清理 链接池
        long cleanup(long now) {
            int inUseConnectionCount = 0;
            int idleConnectionCount = 0;
            RealConnection longestIdleConnection = null;
            long longestIdleDurationNs = Long.MIN_VALUE;
    
            // Find either a connection to evict, or the time that the next eviction is due.
            synchronized (this) {
                for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
                    RealConnection connection = i.next();
    
                    //todo 检查连接是否正在被使用
                    //If the connection is in use, keep searching.
                    if (pruneAndGetAllocationCount(connection, now) > 0) {
                        inUseConnectionCount++;
                        continue;
                    }
                    //todo 否则记录闲置连接数
                    idleConnectionCount++;
    
                    // If the connection is ready to be evicted, we're done.
                    //TODO 获得这个连接已经闲置多久
                    // 执行完遍历,获得闲置了最久的连接
                    long idleDurationNs = now - connection.idleAtNanos;
                    if (idleDurationNs > longestIdleDurationNs) {
                        longestIdleDurationNs = idleDurationNs;
                        longestIdleConnection = connection;
                    }
                }
                //todo 超过了保活时间(5分钟) 或者池内数量超过了(5个) 马上移除,然后返回0,表示不等待,马上再次检查清理
                if (longestIdleDurationNs >= this.keepAliveDurationNs
                        || idleConnectionCount > this.maxIdleConnections) {
                    // We've found a connection to evict. Remove it from the list, then close it
                    // below (outside
                    // of the synchronized block).
                    connections.remove(longestIdleConnection);
                } else if (idleConnectionCount > 0) {
                    // A connection will be ready to evict soon.
                    //todo 池内存在闲置连接,就等待 保活时间(5分钟)-最长闲置时间 =还能闲置多久 再检查
                    return keepAliveDurationNs - longestIdleDurationNs;
                } else if (inUseConnectionCount > 0) {
                    // All connections are in use. It'll be at least the keep alive duration 'til we
                    // run again.
                    //todo 有使用中的连接,就等 5分钟 再检查
                    return keepAliveDurationNs;
                } else {
                    // No connections, idle or in use.
                    //todo 都不满足,可能池内没任何连接,直接停止清理(put后会再次启动)
                    cleanupRunning = false;
                    return -1;
                }
            }
            closeQuietly(longestIdleConnection.socket());
            // Cleanup again immediately.
            return 0;
        }
    

    ConnectionPool#pruneAndGetAllocationCount检查连接是否正在被使用

        private int pruneAndGetAllocationCount(RealConnection connection, long now) {
            //todo 这个连接被使用就会创建一个弱引用放入集合,这个集合不为空就表示这个连接正在被使用
            // 实际上 http1.x 上也只能有一个正在使用的。
            List<Reference<StreamAllocation>> references = connection.allocations;
            for (int i = 0; i < references.size(); ) {
                Reference<StreamAllocation> reference = references.get(i);
                if (reference.get() != null) {
                    i++;
                    continue;
                }
    
                // We've discovered a leaked allocation. This is an application bug.
                StreamAllocation.StreamAllocationReference streamAllocRef =
                        (StreamAllocation.StreamAllocationReference) reference;
                String message = "A connection to " + connection.route().address().url()
                        + " was leaked. Did you forget to close a response body?";
                Platform.get().logCloseableLeak(message, streamAllocRef.callStackTrace);
    
                references.remove(i);
                connection.noNewStreams = true;
    
    
                // If this was the last allocation, the connection is eligible for immediate eviction.
                if (references.isEmpty()) {
                    connection.idleAtNanos = now - keepAliveDurationNs;
                    return 0;
                }
            }
            return references.size();
        }
    

    相关文章

      网友评论

          本文标题:OkHttp讲解(四)-链接池

          本文链接:https://www.haomeiwen.com/subject/qlfhiltx.html