Okhttp连接池

作者: gczxbb | 来源:发表于2020-05-12 17:03 被阅读0次

    一、ConnectInterceptor拦截器

    okhttp连接池与链路

    ConnectInterceptor拦截器,intercept()方法,创建Network链路。
    每一次RealCall请求类,分配一个StreamAllocation对象,负责RealConnection管理,连接池操作,HttpCodec,网络链路的分配、取消、释放,RealConnection可复用。

    @Override
    public Response intercept(Chain chain) throws IOException {
        RealInterceptorChain realChain = (RealInterceptorChain) chain;
        Request request = realChain.request();//从Chain中获取Request
        StreamAllocation streamAllocation = realChain.streamAllocation();
    
        boolean doExtensiveHealthChecks = !request.method().equals("GET");
        HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);
        RealConnection connection = streamAllocation.connection();
        //处理返回Response
        return realChain.proceed(request, streamAllocation, httpCodec, connection);
    }
    

    RetryAndFollowUpInterceptor拦截器(位于拦截器列表第二项目),intercept()方法,创建StreamAllocation对象,每次递归将引用传给下一个新建Chain。

    streamAllocation = new StreamAllocation(
            client.connectionPool(), createAddress(request.url()), callStackTrace);
    

    到达ConnectInterceptor拦截器时,从Chain中取出,StreamAllocation类的newStream()方法,创建/查找链路。

    public HttpCodec newStream(OkHttpClient client, boolean doExtensiveHealthChecks) {
        ...
        try {
            RealConnection resultConnection = findHealthyConnection(connectTimeout, 
                    readTimeout,writeTimeout, connectionRetryEnabled, 
                    doExtensiveHealthChecks);
            HttpCodec resultCodec = resultConnection.newCodec(client, this);
            synchronized (connectionPool) {
                codec = resultCodec;
                return resultCodec;
          }
        } catch (IOException e) {
        }
    }
    

    findConnection()方法,通过策略寻找一个真正的RealConnection连接,该连接和Server建立个Socket连接,RealConnection类初始化HttpCodec。

    public HttpCodec newCodec(OkHttpClient client, StreamAllocation streamAllocation) 
                throws SocketException {
        if (http2Connection != null) {
            return new Http2Codec(client, streamAllocation, http2Connection);
        } else {
            socket.setSoTimeout(client.readTimeoutMillis());
            source.timeout().timeout(client.readTimeoutMillis(), MILLISECONDS);
            sink.timeout().timeout(client.writeTimeoutMillis(), MILLISECONDS);
            return new Http1Codec(client, streamAllocation, source, sink);
        }
    }
    

    若采用Protocol.HTTP_2协议,即Http2Connection存在,创建Http2Codec,否则,创建Http1Codec,封装BufferedSource和BufferedSink。

    这些新初始化的对象StreamAllocation、HttpCodec、RealConnection,一起传递给Chain节点,下一个拦截器使用。

    二、分配策略

    StreamAllocation类findConnection()方法。

    private RealConnection findConnection(int connectTimeout, int readTimeout, 
                int writeTimeout,boolean connectionRetryEnabled) throws IOException {
        Route selectedRoute;
        synchronized (connectionPool) {
            //先使用StreamAllocation内部保存的RealConnection
            RealConnection allocatedConnection = this.connection;
            if (allocatedConnection != null && !allocatedConnection.noNewStreams) {
                return allocatedConnection;
            }
            //连接池中寻找
            Internal.instance.get(connectionPool, address, this, null);
            if (connection != null) {
                return connection;
            }
            selectedRoute = route;
        }
        ...
        RealConnection result;
        synchronized (connectionPool) {
            Internal.instance.get(connectionPool, address, this, selectedRoute);
            if (connection != null) return connection;
            //创建RealConnection
            route = selectedRoute;
            refusedStreamCount = 0;
            result = new RealConnection(connectionPool, selectedRoute);
            acquire(result);
        }
        //socket连接
        result.connect(connectTimeout, readTimeout, writeTimeout, connectionRetryEnabled);
        routeDatabase().connected(result.route());
    
        Socket socket = null;
        synchronized (connectionPool) {
          //入池
          Internal.instance.put(connectionPool, result);
          ...
        }
        return result;
      }
    

    1,StreamAllocation内部RealConnection连接。
    2,从ConnectionPool连接池查找。
    3,创建RealConnection对象,保存在StreamAllocation内部,入池。

    OkHttpClient类静态代码段初始化一个Internal对象。

    static {
        Internal.instance = new Internal() {
            @Override public 
            RealConnection get(ConnectionPool pool, Address address,StreamAllocation streamAllocation, Route route) {
                return pool.get(address, streamAllocation, route);
            }
            @Override 
            public void put(ConnectionPool pool, RealConnection connection) {
                pool.put(connection);
            }
            ....//其他方法.
        };
    }
    

    ConnectionPool类get()方法,查询连接池中的RealConnection。

    @Nullable RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {
        for (RealConnection connection : connections) {
            if (connection.isEligible(address, route)) {
                streamAllocation.acquire(connection, true);
                return connection;
            }
        }
        return null;
    }
    

    遍历连接池每项RealConnection,当Address相同且StreamAllocationReference数量小于限制,说明是可用连接。
    RealConnection内部引用StreamAllocation类型弱引用列表,allocationLimit变量限制StreamAllocation弱引用数量。

    public void acquire(RealConnection connection) {
        ..
        this.connection = connection;
        connection.allocations.add(new StreamAllocationReference(this, callStackTrace));
    }
    

    将可用RealConnection设置成StreamAllocation内部connection,同时将StreamAllocation加入RealConnection内部弱引用列表。
    达到限制时,不能再被StreamAllocation使用,新建链路,新RealConnection同样赋值到StreamAllocation内部,将该StreamAllocation加入弱引用列表,最后put连接池。

    RealConnection代表一个真正的链路,封装BufferedSource、BufferedSink、路由、socket、协议、Handshake握手信息。

    若是新链接,进行Socket连接,connect()方法,connectSocket()方法建立连接。

    private void connectSocket(int connectTimeout, int readTimeout) throws IOException {
        Proxy proxy = route.proxy();
        Address address = route.address();
        //创建Socket
        rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
            ? address.socketFactory().createSocket()
            : new Socket(proxy);
        rawSocket.setSoTimeout(readTimeout);
        try {
          Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
        } catch (ConnectException e) {
          ConnectException ce = new ConnectException("Failed to connect to " + route.socketAddress());
          ce.initCause(e);
          throw ce;
        }
        source = Okio.buffer(Okio.source(rawSocket));
        sink = Okio.buffer(Okio.sink(rawSocket));
    }
    

    创建一个Socket,Platform#connectSocket方法,连接。

    public void connectSocket(Socket socket, InetSocketAddress address, int connectTimeout) throws IOException {
        socket.connect(address, connectTimeout);
    }
    

    成功,和Server建立一个Socket连接,失败,抛出异常。

    三、连接清理

    ConnectionPool连接池管理所有Socket连接,当有新的请求时,从池中分配一个链路。
    ArrayDeque双向队列,线性连续空间,双向开口,在头尾两端插入删除高效,同时具有队列和栈性质,缓存常用。
    默认支持5个并发keepalive,链路生命为5分钟,即链路数据传输完成,可保持5分钟的存活时间。
    自动清除线程,将查找超过5分钟的链路,关闭socket。

    任务清理流程

    ConnectionPool的get()/put()操作方法。

    void put(RealConnection connection) {
        if (!cleanupRunning) {
          cleanupRunning = true;
          executor.execute(cleanupRunnable);//执行清理任务
        }
        connections.add(connection);//加入连接池队列
    }
    

    线程池执行cleanupRunnable清理任务,设置cleanupRunning标志位。实质上是一个阻塞的清理任务。若while一直运行,下次put()将不会触发。

    while (true) {
        long waitNanos = cleanup(System.nanoTime());
        if (waitNanos == -1) return;
        if (waitNanos > 0) {
            long waitMillis = waitNanos / 1000000L;
            waitNanos -= (waitMillis * 1000000L);
            synchronized (ConnectionPool.this) {
                try {
                    ConnectionPool.this.wait(waitMillis, (int) waitNanos);
                } catch (InterruptedException ignored) {
                }
            }
        }
    }
    

    waitNanos等待下次清理的间隔时间,-1表示不需要再次清理,退出循环,0表示立即再次清理,wait()方法等待,释放锁与时间片

    long cleanup(long now) {
        int inUseConnectionCount = 0;
        int idleConnectionCount = 0;
        RealConnection longestIdleConnection = null;
        long longestIdleDurationNs = Long.MIN_VALUE;
        synchronized (this) {
            for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
                RealConnection connection = i.next();
                // 是否在使用
                if (pruneAndGetAllocationCount(connection, now) > 0) {
                    inUseConnectionCount++;
                    continue;
                }
                idleConnectionCount++;
                //找空闲最长的
                long idleDurationNs = now - connection.idleAtNanos;
                if (idleDurationNs > longestIdleDurationNs) {
                    longestIdleDurationNs = idleDurationNs;
                    longestIdleConnection = connection;
                }
            }  
            //决定是否清理
            if (longestIdleDurationNs >= this.keepAliveDurationNs
                    || idleConnectionCount > this.maxIdleConnections) {
                 //删除连接
                connections.remove(longestIdleConnection);
            } else if (idleConnectionCount > 0) {
                return keepAliveDurationNs - longestIdleDurationNs;
            } else if (inUseConnectionCount > 0) {
                return keepAliveDurationNs;
            } else {
                cleanupRunning = false;
                return -1;
            }
        }
        //关闭Socket
        closeQuietly(longestIdleConnection.socket());
        return 0;
    }
    

    遍历连接,pruneAndGetAllocationCount()方法查看连接是否在使用。inUseConnectionCount,正在使用数量,自增,idleConnectionCount,空闲数量,未使用自增。
    idleDurationNs,空闲连接,计算已空闲时间,查找到空闲时间最长的。

    清理算法

    1,longestIdleDurationNs(最长空闲时间),大于5min或空闲数量超过5个(默认),将对应longestIdleConnection连接删除,返回0,下一次立即清理。
    2,不足5min,空闲连接数量<5,返回timeout(距离5min还有多久),线程阻塞timeout后,再次清理。
    3,空闲连接=0,且正使用连接>0,返回5min,最长等待时间。
    4,空闲和正使用连接都0,返回-1,不清理,线程结束。设置cleanupRunning标志位,下次put()方法重新唤起cleanupRunnable任务。

    清理过程

    从连接池队列ArrayDeque中删除该项连接。
    closeQuietly()方法关闭Socket。

    using连接判断

    遍历RealConnection内部Reference<StreamAllocation>列表
    reference.get()不空,说明有StreamAllocation正引用RealConnection。
    reference.get()不存在,说明StreamAllocation已被Jvm清理,同时,从references列表中删除该项reference。
    若references列表是空,说明references弱引用都是空,没有StreamAllocation使用该连接。

    四、总结

    ConnectInterceptor拦截器负责Network连接。

    每一个RealCall请求对应一个StreamAllocation。

    真正的连接RealConnection复用。

    连接池采用ArrayDeque双向队列数据结构。

    连接清理任务。


    任重而道远

    相关文章

      网友评论

        本文标题:Okhttp连接池

        本文链接:https://www.haomeiwen.com/subject/jsxinhtx.html