美文网首页Android进阶之路Android技术知识
Android架构师学好OKhttp网络框架——Socket连接

Android架构师学好OKhttp网络框架——Socket连接

作者: 码农的地中海 | 来源:发表于2022-07-08 21:56 被阅读0次
    5252f77a5fc580c32f711ab87c2cee94.jpeg

    概述

    提高网络性能优化,很重要的一点就是降低延迟和提升响应速度。 通常我们在浏览器中发起请求的时候header部分往往是这样的 keep-alive 就是浏览器和服务端之间保持长连接,这个连接是可以复用的。在HTTP1.1中是默认开启的。 连接的复用为什么会提高性能呢? 通常我们在发起http请 ...

    通常我们在浏览器中发起请求的时候header部分往往是这样的


    image.png

    keep-alive 就是浏览器和服务端之间保持长连接,这个连接是可以复用的。在HTTP1.1中是默认开启的。

    连接的复用为什么会提高性能呢?
    通常我们在发起http请求的时候首先要完成tcp的三次握手,然后传输数据,最后再释放连接。三次握手的过程可以参考

    一次响应的过程


    image.png

    在高并发的请求连接情况下或者同个客户端多次频繁的请求操作,无限制的创建会导致性能低下。

    如果使用keep-alive

    image.png

    在timeout空闲时间内,连接不会关闭,相同重复的request将复用原先的connection,减少握手的次数,大幅提高效率。

    并非keep-alive的timeout设置时间越长,就越能提升性能。长久不关闭会造成过多的僵尸连接和泄露连接出现。

    那么okttp在客户端是如果类似于客户端做到的keep-alive的机制。

    连接池原理

    多少了解点OkHttp3的同学都知道,OkHttp可以降低网络延时加快网络请求响应的速度。那么它是怎样做到的呢?在说这个之前,我们先简单回顾一下Http协议。Http协议是一个无连接的协议,客户端(请求头+请求体)发送请求给服务端,服务端收到(请求头+请求体)后响应数据(响应头和响应体)并返回。由于Http协议的底层实现是基于TCP协议的(保证数据准确到达),所以在请求+响应的过程中必然少不了Tcp的三次握手和释放资源时的四次挥手。我们假设有这么一种情况,客户端需要每隔10秒向服务端发送心跳包,如果按照无连接的状态每次客户端请求和服务端响应都需要经过Tcp的三次握手和四次挥手,这样高频率的发送重复的请求会严重影响网络的性能,就算除去头部字段在频繁三次握手和四次挥手的情况下网络性能也非常堪忧。那么有没有一种办法能够让,Http的链接保持一段时间,如果有形同请求时复用这个链接,在超时的时候把链接断掉,从而减少握手次数呢?答案是肯定的,OkHttp3已经帮我们设计好了。

    OkHttp3连接池原理:OkHttp3使用ConnectionPool连接池来复用链接,其原理是:当用户发起请求是,首先在链接池中检查是否有符合要求的链接(复用就在这里发生),如果有就用该链接发起网络请求,如果没有就创建一个链接发起请求。这种复用机制可以极大的减少网络延时并加快网络的请求和响应速度。

    三、源码分析

    我们主要看下ConnectionPool连接池的源代码,看其是怎样实现的,我们一段一段拆分着看。

    private final int maxIdleConnections;
    //每个地址最大的空闲连接数
    private final long keepAliveDurationNs;<br>private final Deque<RealConnection> connections = new ArrayDeque<>();
    //连接池,其是一个双端链表结果,支持在头尾插入元素,且是一个后进先出的队列 final RouteDatabase routeDatabase = newRouteDatabase();
    //用来记录链接失败的路由 <br>boolean cleanupRunning;
    
    private static final Executor executor = new ThreadPoolExecutor(0 /* 核心线程数 */,
        Integer.MAX_VALUE /*线程池可容纳的最大线程数量 */, 60L /* 线程池中的线程最大闲置时间 */, TimeUnit.SECONDS,/*闲置时间的单位*/
        new SynchronousQueue<Runnable>()/*线程池中的任务队列,通过线程池的execute方法提交的runnable会放入这个队列中*/, Util.threadFactory("OkHttp ConnectionPool", true)<br>/*工具类用来创建线程的,其原型是ThreadFactory*/);
    

    通过上面的代码可知,ConnectionPool中会创建一个线程池,这个线程池的作用就是为了清理掉闲置的链接(Socket)。ConnectionPool利用自身的put方法向连接池中添加链接(每一个RealConnection都是一个链接)

    void put(RealConnection connection) {<br>   //java1.4中新增的关键字,如果为true无异常,如果为false则抛出一个异常
       assert (Thread.holdsLock(this));<br>  //利用线程池清除空闲的Socket
       if (!cleanupRunning) {
         cleanupRunning = true;
         executor.execute(cleanupRunnable);
       }<br>  //向链接池中加入链接
       connections.add(connection);
     }
    

    通过以上代码我们发现向线程池中添加一个链接(RealConnection)其实是向连接池connections添加RealConnection。并且在添加之前需要调用线程池的execute方法区清理闲置的链接。

    下面我们看下清理动作是如何实现的,直接看cleanupRunnable这个匿名内部类

    private final Runnable cleanupRunnable = new Runnable() {
       @Override public void run() {<br>   //死循环,不停的执行cleanup的清理工作
         while (true) {<br>     //返回下次清理的时间间隔
           long waitNanos = cleanup(System.nanoTime());<br>    //如果返回-1就直接停止
           if (waitNanos == -1) return;<br>    //如果下次清理的时间几个大于0
           if (waitNanos > 0) {
             long waitMillis = waitNanos / 1000000L;
             waitNanos -= (waitMillis * 1000000L);
             synchronized (ConnectionPool.this) {
               try {<br>        //根据下次返回的时间间隔来释放wait锁
                 ConnectionPool.this.wait(waitMillis, (int) waitNanos);
               } catch (InterruptedException ignored) {
               }
             }
           }
         }
       }
     };
    

    在Runnable的内部会不停的执行死循环,调用cleanup来清理空闲的链接,并返回一个下次清理的时间间隔,根据这个时间间隔来释放wait锁。

    接下来看下cleanup的具体执行步骤

    long cleanup(long now) {
        int inUseConnectionCount = 0;//正在使用的链接数量
        int idleConnectionCount = 0;//闲置的链接数量<br>  //长时间闲置的链接
        RealConnection longestIdleConnection = null;
        long longestIdleDurationNs = Long.MIN_VALUE;
     
        // 用for循环来遍历连接池
        synchronized (this) {
          for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
            RealConnection connection = i.next();
            // 如果当前链接正在使用,就执行continue,进入下一次循环.
            if (pruneAndGetAllocationCount(connection, now) > 0) {
              inUseConnectionCount++;
              continue;
            }
         //否则闲置链接+1
            idleConnectionCount++;
     
            // 如果闲置时间间隔大于最大的闲置时间,那就把当前的链接赋值给最大闲置时间的链接.
            long idleDurationNs = now - connection.idleAtNanos;
            if (idleDurationNs > longestIdleDurationNs) {
              longestIdleDurationNs = idleDurationNs;
              longestIdleConnection = connection;
            }
          }
        //如果最大闲置时间间隔大于保持链接的最大时间间隔或者限制连接数大于连接池允许的最大闲置连接数,就把该链接从连接池中移除
          if (longestIdleDurationNs >= this.keepAliveDurationNs
              || idleConnectionCount > this.maxIdleConnections) {
            connections.remove(longestIdleConnection);
          } else if (idleConnectionCount > 0) {
            // 如果闲置链接数大于0,则返回允许保持链接的最大时间间隔-最长时间间隔,也就是下次返回的时间间隔
            return keepAliveDurationNs - longestIdleDurationNs;
          } else if (inUseConnectionCount > 0) {
            // 如果所有的链接都在使用则直接返回保持时间间隔的最大值
            return keepAliveDurationNs;
          } else {
            // 如果以上条件都不满足,则清除事变,返回-1
            cleanupRunning = false;
            return -1;
          }
        }
      //关闭闲置时间最长的那个socket
        closeQuietly(longestIdleConnection.socket());
     
        // Cleanup again immediately.
        return 0;
      }  
    

    cleanup(now)这个方法比较长,内容也比较多。我们把握大体逻辑就行。其核心逻辑是返回下次清理的时间间隔,其清理的核心是:链接的限制时间如果大于用户设置的最大限制时间或者闲置链接的数量已经超出了用户设置的最大数量,则就执行清除操作。其下次清理的时间间隔有四个值:

    1.如果闲置的连接数大于0就返回用户设置的允许限制的时间-闲置时间最长的那个连接的闲置时间。

    2.如果清理失败就返回-1,

    3.如果清理成功就返回0,

    4.如果没有闲置的链接就直接返回用户设置的最大清理时间间隔。

    下面看一下系统是如何判断当前循环到的链接是正在使用的链接

    private int pruneAndGetAllocationCount(RealConnection connection, long now) {<br>  //编译StreamAllocation弱引用链表
        List<Reference<StreamAllocation>> references = connection.allocations;
        for (int i = 0; i < references.size(); ) {
          Reference<StreamAllocation> reference = references.get(i);
        //如果StreamAllocation不为空则继续遍历,计数器+1;
          if (reference.get() != null) {
            i++;
            continue;
          }
     
          // We've discovered a leaked allocation. This is an application bug.
          StreamAllocation.StreamAllocationReference streamAllocRef =
              (StreamAllocation.StreamAllocationReference) reference;
         //移除链表中为空的引用
          references.remove(i);
          connection.noNewStreams = true;
     
          // If this was the last allocation, the connection is eligible for immediate eviction.<br>    //如果链表为空则返回0
          if (references.isEmpty()) {
            connection.idleAtNanos = now - keepAliveDurationNs;
            return 0;
          }
        }
     
        return references.size();
      }
    

    通过以上的代码我们可以看出,其遍历了弱引用列表,链表中为空的引用,最后返回一个链表数量。如果返回的数量>0表示RealConnection活跃,如果<=0则表示RealConnection空闲。也就是用这个来方法来判断当前的链接是不是空闲的链接。

    我们再来看一下

    closeQuietly(longestIdleConnection.socket());是如何关闭空闲时间最长的链接的。
    public static void closeQuietly(Socket socket) {
        if (socket != null) {
          try {
            socket.close();
          } catch (AssertionError e) {
            if (!isAndroidGetsocknameError(e)) throw e;
          } catch (RuntimeException rethrown) {
            throw rethrown;
          } catch (Exception ignored) {
          }
        }
      }
    

    其实就一行核心代码socket.close()。socket的使用不再介绍,大家可以看专门类的文章。

    我们已经分析了从连接池清理空闲链接,到向连接池中加入新的链接。下面看看连接的使用以及连接的复用是如何实现的

    @Nullable RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {
      assert (Thread.holdsLock(this));
      for (RealConnection connection : connections) {
        if (connection.isEligible(address, route)) {
          streamAllocation.acquire(connection, true);
          return connection;
        }
      }
      return null;
    }
    

    获取连接池中的链接的逻辑非常的简单,利用for循环循环遍历连接池查看是否有符合要求的链接,如果有则直接返回该链接使用,如果没有就发挥null,然后会在另外的地方创建一个新的RealConnection放入连接池。这里的核心代码就是判断是否有符合条件的链接:connection.isEligible(address,route)

    public boolean isEligible(Address address, @Nullable Route route) {
        //如果当前链接的技术次数大于限制的大小,或者无法在此链接上创建流,则直接返回false
        if (allocations.size() >= allocationLimit || noNewStreams) return false;
        // 如果地址主机字段不一致直接返回false
        if (!Internal.instance.equalsNonHost(this.route.address(), address)) return false;
        // 如果主机地址完全匹配我们就重用该连接
        if (address.url().host().equals(this.route().address().url().host())) {
          return true; // This connection is a perfect match.
        }
        return true;
      }
    

    OkHttp 中的复用机制

    前面提了 HTTP 中的复用机制,通过对 TCP 连接的复用,大幅提高了网络请求的效率。无论是 HTTP/1.1 中的 Keep-Alive 还是 HTTP/2 中的多路复用,都需要连接池来维护 TCP 连接,让我们看看 OkHttp 中连接池的实现。

    我们知道,在 findConnection 过程中,若无法从 transimitter 中获取到连接,则会尝试从连接池中获取连接。

    我们可以看到 RealConnectionPool.connections,它是一个 Deque,保存了所有的连接:

    private final Deque<RealConnection> connections = new ArrayDeque<>();
    

    连接清理机制

    同时会发现,在这个类中还存在着一个 executor,它的设置与 OkHttp 用于异步请求的线程池的设置几乎一样,它是用来做什么的呢?

    /**
     * Background threads are used to cleanup expired connections. There will be at most a single
     * thread running per connection pool. The thread pool executor permits the pool itself to be
     * garbage collected.
     */
    private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,
            Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS,
            new SynchronousQueue<>(), Util.threadFactory("OkHttp ConnectionPool", true));
    

    通过上面的注释可以看出,它是用来执行清理过期连接的任务的,并且最多每个连接池只会有一个线程在执行清理任务。这个清理的任务就是下面的 cleanupRunnable

    private final Runnable cleanupRunnable = () -> {
        while (true) {
            long waitNanos = cleanup(System.nanoTime());
            if (waitNanos == -1) return;
            if (waitNanos > 0) {
                long waitMillis = waitNanos / 1000000L;
                waitNanos -= (waitMillis * 1000000L);
                synchronized (RealConnectionPool.this) {
                    try {
                        RealConnectionPool.this.wait(waitMillis, (int) waitNanos);
                    } catch (InterruptedException ignored) {
                    }
                }
            }
        }
    };
    

    可以看到它是采用一个循环的方式调用 cleanup 方法进行清理,并从返回值中获取了需要 wait 的秒数,调用 wait 方法进入阻塞,也就是说每次清理的间隔由 cleanup 的返回值进行决定

    我们看到 cleanup 方法:

    /**
     * Performs maintenance on this pool, evicting the connection that has been idle the longest if
     * either it has exceeded the keep alive limit or the idle connections limit.
     *
     * <p>Returns the duration in nanos to sleep until the next scheduled call to this method. Returns
     * -1 if no further cleanups are required.
     */
    long cleanup(long now) {
        int inUseConnectionCount = 0;
        int idleConnectionCount = 0;
        RealConnection longestIdleConnection = null;
        long longestIdleDurationNs = Long.MIN_VALUE;
        synchronized (this) {
            for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
                RealConnection connection = i.next();
                // 统计连接被引用的transimitter的个数,若大于0则说明是正在使用的连接
                if (pruneAndGetAllocationCount(connection, now) > 0) {
                    inUseConnectionCount++;
                    continue;
                }
                // 否则是空闲连接
                idleConnectionCount++;
                // 找出空闲连接中空闲时间最长的连接
                long idleDurationNs = now - connection.idleAtNanos;
                if (idleDurationNs > longestIdleDurationNs) {
                    longestIdleDurationNs = idleDurationNs;
                    longestIdleConnection = connection;
                }
            }
            if (longestIdleDurationNs >= this.keepAliveDurationNs
                    || idleConnectionCount > this.maxIdleConnections) {
                // 如果发现空闲时间最久的连接所空闲时间超过了Keep-Alive设定的时间,或者是空闲连接数超过了最大空闲连接数
                // 将前面的其从队列中删除,并且在之后对其socket进行关闭
                connections.remove(longestIdleConnection);
            } else if (idleConnectionCount > 0) {
                // 返回离达到keep-alive设定的时间的距离,将在达到时执行进行清理
                return keepAliveDurationNs - longestIdleDurationNs;
            } else if (inUseConnectionCount > 0) {
                // 如果当前连接都是正在使用的,返回keep-alive所设定的时间
                return keepAliveDurationNs;
            } else {
                // 没有连接了,停止运行cleanup
                cleanupRunning = false;
                return -1;
            }
        }
        // 关闭空闲最久的连接,继续尝试清理
        closeQuietly(longestIdleConnection.socket());
        return 0;
    }
    

    可以看到,主要是下面几步:

    1. 调用 pruneAndGetAllocationCount 方法统计连接被引用的数量,大于 0 说明连接正在被使用
    2. 通过上面的方法统计空闲连接数及正在使用的连接数,并从中找出空闲最久的连接
    3. 若空闲最久的连接空闲的时间超过了所设定的 keepAliveDurationNs(这里不是指的 Keep-Alive 所设定时间),或者空闲连接数超过了所设定的 maxIdleConnections,清理该连接(移除并关闭socket),并返回 0 表示立即继续清理。
    4. 若还未超过,则返回下一次超过外部设定的 keepAliveDurationNs,表示等到下次超时的时候再进行清理
    5. 若当前连接都正处于使用中,返回所设定的 keepAliveDurationNs
    6. 若当前没有连接,则将 cleanupRunning 置为 false 停止清理

    在 OkHttp 中,将空闲连接的最长存活时间设定为了 5 分钟,并且将最大空闲连接数设置为了 5

    我们看看 pruneAndGetAllocationCount 是如何对连接被引用的数量进行统计的:

    /**
     * Prunes any leaked transmitters and then returns the number of remaining live transmitters on
     * {@code connection}. Transmitters are leaked if the connection is tracking them but the
     * application code has abandoned them. Leak detection is imprecise and relies on garbage
     * collection.
     */
    private int pruneAndGetAllocationCount(RealConnection connection, long now) {
        List<Reference<Transmitter>> references = connection.transmitters;
        for (int i = 0; i < references.size(); ) {
            Reference<Transmitter> reference = references.get(i);
            if (reference.get() != null) {
                i++;
                continue;
            }
            // We've discovered a leaked transmitter. This is an application bug.
            TransmitterReference transmitterRef = (TransmitterReference) reference;
            String message = "A connection to " + connection.route().address().url()
                    + " was leaked. Did you forget to close a response body?";
            Platform.get().logCloseableLeak(message, transmitterRef.callStackTrace);
            references.remove(i);
            connection.noNewExchanges = true;
            // If this was the last allocation, the connection is eligible for immediate eviction.
            if (references.isEmpty()) {
                connection.idleAtNanos = now - keepAliveDurationNs;
                return 0;
            }
        }
        return references.size();
    }
    

    可以看到,connection 中是有维护一个引用它的 TransmitterReference 队列的,通过遍历并判断该 Transimitter 是否为 null 即可进行统计。这里的 Reference 所存的实际是一个继承自 WeakReferenceTransimitterReference 类:

    static final class TransmitterReference extends WeakReference<Transmitter> {
       // ...
    }
    

    可以发现,这种设计有点像 JVM 中的引用计数法 + 标记清除,实际上就是 OkHttp 仿照 JVM 的垃圾回收设计了这样一种类似引用计数法的方式来统计一个连接是否是空闲连接,同时采用标记清除法对空闲且不满足设定的规则的连接进行清除

    获取连接

    我们看到 connectionPool.transmitterAcquirePooledConnection 方法,了解一下连接池获取连接的过程:

    /**
     * Attempts to acquire a recycled connection to {@code address} for {@code transmitter}. Returns
     * true if a connection was acquired.
     *
     * <p>If {@code routes} is non-null these are the resolved routes (ie. IP addresses) for the
     * connection. This is used to coalesce related domains to the same HTTP/2 connection, such as
     * {@code square.com} and {@code square.ca}.
     */
    boolean transmitterAcquirePooledConnection(Address address, Transmitter transmitter,
                                               @Nullable List<Route> routes, boolean requireMultiplexed) {
        assert (Thread.holdsLock(this));
        for (RealConnection connection : connections) {
            if (requireMultiplexed && !connection.isMultiplexed()) continue;
            if (!connection.isEligible(address, routes)) continue;
            transmitter.acquireConnectionNoEvents(connection);
            return true;
        }
        return false;
    }
    

    可以看到,首先注释中对我们传入不同的 routes 参数进行了解释,若 routes 不为 null 说明这是已解析过的路由,可以将其合并到同一个 HTTP/2 连接。

    而在 connection.isMultiplexed 的注释中说到,若该连接为 HTTP/2 连接,则会返回 true。

    connection.isEligible 注释中则说到,若该连接可以给对应的 address 分配 stream,则返回 true。

    在代码中,对 connections 进行了遍历:

    1. 当需要进行多路费用且当前的连接不是 HTTP/2 连接时,则放弃当前连接
    2. 当当前连接不能用于为 address 分配 stream,则放弃当前连接。
    3. 前两者都不满足,则获取该连接,并设置到 transimitter 中。

    三次获取连接的区别

    我们回顾一下 findConnection 中三次尝试从连接池获取连接的过程:

    • 第一次尝试:connectionPool.transmitterAcquirePooledConnection(address, transmitter, null, false)
    • 第二次尝试(需要在进行了路由选择的情况下):connectionPool.transmitterAcquirePooledConnection(address, transmitter, routes, false)
    • 第三次尝试:connectionPool.transmitterAcquirePooledConnection(address, transmitter, routes, true)

    可以发现,其传入的参数是不同的。第一次由于是尝试从已经解析过的路由的连接池中获取连接,因此 route 设置为 null。

    第二次由于是在无法找到对应的连接,在进行了路由选择的条件下进行的,因此将 route 设置为了 null。

    而最后一次尝试从连接池获取连接之所以需要将 requireMultiplexed 设置为 true,因为这次只有可能是在多个请求并行进行的情况下才有可能发生,这种情况只有 HTTP/2 的连接才有可能发生。

    加入连接

    通过 RealConnectionPool.put 方法可以向连接池中加入连接:

    void put(RealConnection connection) {
        assert (Thread.holdsLock(this));
        if (!cleanupRunning) {
            cleanupRunning = true;
            executor.execute(cleanupRunnable);
        }
        connections.add(connection);
    }
    

    由于之前判断了如果连接池中没有连接,就会暂停连接清理线程,所以这里如果放入了新的连接,就会判断连接清理线程是否正在执行,若已停止执行则将其继续执行。之后将该连接放入了 Deque 中。

    通知连接空闲

    每当外部调用了 Transimitter.releaseConnectionNoEvents 方法时,最后都会调用到 RealConnection.connectionBecameIdle 方法来通知连接池连接进入了空闲状态:

    /**
     * Notify this pool that {@code connection} has become idle. Returns true if the connection has
     * been removed from the pool and should be closed.
     */
    boolean connectionBecameIdle(RealConnection connection) {
        assert (Thread.holdsLock(this));
        if (connection.noNewExchanges || maxIdleConnections == 0) {
            connections.remove(connection);
            return true;
        } else {
            notifyAll(); // Awake the cleanup thread: we may have exceeded the idle connection limit.
            return false;
        }
    }
    

    此时如果该连接不支持用于创建新 Exchange,或不允许有空闲连接,则会直接将该连接移除,否则会通过 notifyAll 方法唤醒阻塞的清理线程,尝试对空闲连接进行清理,这样能保证每当有空闲连接时最及时地对连接池进行清理。

    连接的建立

    我们知道,在寻找连接的过程中,若从 Transimitter 及连接池中都无法获取到连接时,就会创建一个新的连接,让我们看看这个创建连接的过程是怎样的:

    在寻找连接的代码中,创建连接的核心代码如下:

    // ...
    result = new RealConnection(connectionPool, selectedRoute);
    result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
                    connectionRetryEnabled, call, eventListener);
    

    我们先看到 RealConnection 的构造函数:

    public RealConnection(RealConnectionPool connectionPool, Route route) {
        this.connectionPool = connectionPool;
        this.route = route;
    }
    

    只是进行了简单的赋值,我们接着看到 RealConnection.connect 方法:

    public void connect(int connectTimeout, int readTimeout, int writeTimeout,
                        int pingIntervalMillis, boolean connectionRetryEnabled, Call call,
                        EventListener eventListener) {
        if (protocol != null) throw new IllegalStateException("already connected");
        RouteException routeException = null;
        List<ConnectionSpec> connectionSpecs = route.address().connectionSpecs();
        ConnectionSpecSelector connectionSpecSelector = new ConnectionSpecSelector(connectionSpecs);
        // ...一些错误处理
        while (true) {
            try {
                if (route.requiresTunnel()) {
                    // 如果使用了隧道技术,调用connectTunnel方法
                    connectTunnel(connectTimeout, readTimeout, writeTimeout, call, eventListener);
                    if (rawSocket == null) {
                        // We were unable to connect the tunnel but properly closed down our resources.
                        break;
                    }
                } else {
                    // 未使用隧道技术,调用connectSocket方法
                    connectSocket(connectTimeout, readTimeout, call, eventListener);
                }
                // 建立协议
                establishProtocol(connectionSpecSelector, pingIntervalMillis, call, eventListener);
                eventListener.connectEnd(call, route.socketAddress(), route.proxy(), protocol);
                break;
            } catch (IOException e) {
                //... 异常下的资源释放
            }
        }
        // ... 一些错误处理
    }
    

    可以看到,这里是一个循环,不断尝试建立连接,其中核心步骤如下:

    1. 若使用了隧道技术,调用 connectTunnel 方法
    2. 若未使用隧道技术,调用 connectSocket 方法
    3. 调用 establishProtocol 方法建立协议

    让我们看看三个方法分别是如何实现的。

    直接连接

    我们先看看直接连接是如何实现的,我们看到 connectSocket 方法:

    /**
     * Does all the work necessary to build a full HTTP or HTTPS connection on a raw socket.
     */
    private void connectSocket(int connectTimeout, int readTimeout, Call call,
                               EventListener eventListener) throws IOException {
        Proxy proxy = route.proxy();
        Address address = route.address();
        // 初始化rawSocket,其中对SOCKS代理采用了SOCKS代理服务器
        rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
                ? address.socketFactory().createSocket()
                : new Socket(proxy);
        eventListener.connectStart(call, route.socketAddress(), proxy);
        rawSocket.setSoTimeout(readTimeout);
        try {
            // 调用connectSocket方法对Socket进行连接,这里预置了不同平台的实现
            Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
        } catch (ConnectException e) {
            ConnectException ce = new ConnectException("Failed to connect to " + route.socketAddress());
            ce.initCause(e);
            throw ce;
        }
        // 获取source及sink,用于读取及写入
        try {
            source = Okio.buffer(Okio.source(rawSocket));
            sink = Okio.buffer(Okio.sink(rawSocket));
        } catch (NullPointerException npe) {
            if (NPE_THROW_WITH_NULL.equals(npe.getMessage())) {
                throw new IOException(npe);
            }
        }
    }
    

    可以看到,这里主要是进行 Socket 的连接,首先根据代理类型创建了 Socket,之后调用了 connectSocket 方法进行连接(里面调用的其实仍然是 socket.connect 方法)。最后调用 Okio 的方法获取 sourcesink

    这个过程还是比较简单的,和正常使用 Socket 的流程大致相同:创建Socket=>连接=>获取 stream,其中在 connectSocket 时根据不同平台做了不同的处理。

    通过隧道连接

    首先我们要理解一下什么是隧道。这个其实是计网中的知识,之前在 《计算机网络——自顶向下方法》中看到过,不过书中没有详细介绍,这里刚好学习一下。

    隧道技术的出现主要是为了适配 IPv4 到 IPv6 的转变。通过这种隧道技术,可以通过一种网络协议来传输另外一种网络协议的数据,比如 A 主机与 B 主机都是采用 IPv6,而连接 A 与 B 的是 IPv4 的网络,为了实现 A 与 B 的通信,可以使用隧道技术,数据包经过 IPv4 的多协议路由时将 IPv6 的数据包放入 IPv4 的数据包中,传递给 B。当到达 B 的路由器时,数据又被剥离之后传递给 B。这样在 A 与 B 看来,它们使用的都是 IPv6 与对方通信。如下图所示:


    image.png

    那么怎么打开隧道呢?

    HTTP 提供了一个特殊的 method—— CONNECT,它是 HTTP/1.1 协议中预留的方法,可以通过它将连接改为隧道的代理服务器。客户端发送一个 CONNECT 请求给隧道网关请求打开一条 TCP 连接,当隧道打通之后,客户端通过 HTTP 隧道发送的所有数据会转发给 TCP 连接,服务器响应的所有数据会通过隧道发给客户端。

    而在 OkHttp 中,对隧道的支持主要是为了支持 SSL 隧道——SSL 隧道的初衷是为了通过防火墙来传输加密的 SSL 数据,此时隧道的作用就是将非 HTTP 的流量(SSL流量)传过防火墙到达指定的服务器(比如 HTTPS)。

    接着我们看到 connectTunnel 方法的实现:

    /**
     * Does all the work to build an HTTPS connection over a proxy tunnel. The catch here is that a
     * proxy server can issue an auth challenge and then close the connection.
     */
    private void connectTunnel(int connectTimeout, int readTimeout, int writeTimeout, Call call,
                               EventListener eventListener) throws IOException {
        // 创建隧道Request
        Request tunnelRequest = createTunnelRequest();
        HttpUrl url = tunnelRequest.url();
        for (int i = 0; i < MAX_TUNNEL_ATTEMPTS; i++) {
            // 通过connectSocket建立Socket
            connectSocket(connectTimeout, readTimeout, call, eventListener);
            // 创建隧道
            tunnelRequest = createTunnel(readTimeout, writeTimeout, tunnelRequest, url);
            // 当创建的隧道为null时,说明隧道成功建立,break
            if (tunnelRequest == null) break; 
            // 回收资源
            closeQuietly(rawSocket);
            rawSocket = null;
            sink = null;
            source = null;
            eventListener.connectEnd(call, route.socketAddress(), route.proxy(), null);
        }
    }
    

    这里首先构建了一个隧道的 tunnelRequest。之后进行了循环,不断尝试建立隧道,不过 OkHttp 限制了其最大尝试次数为 21 次。

    建立隧道的过程首先通过 connectSocket 方法建立了 Socket 连接,然后通过 createTunnel 方法建立隧道。

    我们看看 createTunnelRequest 方法做了什么:

    private Request createTunnelRequest() throws IOException {
        Request proxyConnectRequest = new Request.Builder()
                .url(route.address().url())
                .method("CONNECT", null)
                .header("Host", Util.hostHeader(route.address().url(), true))
                .header("Proxy-Connection", "Keep-Alive") // For HTTP/1.0 proxies like Squid.
                .header("User-Agent", Version.userAgent())
                .build();
        Response fakeAuthChallengeResponse = new Response.Builder()
                .request(proxyConnectRequest)
                .protocol(Protocol.HTTP_1_1)
                .code(HttpURLConnection.HTTP_PROXY_AUTH)
                .message("Preemptive Authenticate")
                .body(Util.EMPTY_RESPONSE)
                .sentRequestAtMillis(-1L)
                .receivedResponseAtMillis(-1L)
                .header("Proxy-Authenticate", "OkHttp-Preemptive")
                .build();
        Request authenticatedRequest = route.address().proxyAuthenticator()
                .authenticate(route, fakeAuthChallengeResponse);
        return authenticatedRequest != null
                ? authenticatedRequest
                : proxyConnectRequest;
    }
    

    可以看到,这里构建了一个 method 为 CONENCT 的请求。

    我们接着看看 createTunnel 方法又做了什么事情:

    /**
     * To make an HTTPS connection over an HTTP proxy, send an unencrypted CONNECT request to create
     * the proxy connection. This may need to be retried if the proxy requires authorization.
     */
    private Request createTunnel(int readTimeout, int writeTimeout, Request tunnelRequest,
                                 HttpUrl url) throws IOException {
        // 构造HTTP/1.1请求
        String requestLine = "CONNECT " + Util.hostHeader(url, true) + " HTTP/1.1";
        while (true) {
            Http1ExchangeCodec tunnelCodec = new Http1ExchangeCodec(null, null, source, sink);
            source.timeout().timeout(readTimeout, MILLISECONDS);
            sink.timeout().timeout(writeTimeout, MILLISECONDS);
            tunnelCodec.writeRequest(tunnelRequest.headers(), requestLine);
            tunnelCodec.finishRequest();
            // 发出隧道请求
            Response response = tunnelCodec.readResponseHeaders(false)
                    .request(tunnelRequest)
                    .build();
            tunnelCodec.skipConnectBody(response);
            switch (response.code()) {
                case HTTP_OK:
                    // 返回200说明成功建立隧道,返回null
                    if (!source.getBuffer().exhausted() || !sink.buffer().exhausted()) {
                        throw new IOException("TLS tunnel buffered too many bytes!");
                    }
                    return null;
                case HTTP_PROXY_AUTH:    // 表示服务端要进行代理认证
                    // 进行代理认证
                    tunnelRequest = route.address().proxyAuthenticator().authenticate(route, response);
                    // 代理认证不通过
                    if (tunnelRequest == null) throw new IOException("Failed to authenticate with proxy");
                    // 代理认证通过,但需要关闭TCP连接
                    if ("close".equalsIgnoreCase(response.header("Connection"))) {
                        return tunnelRequest;
                    }
                    break;
                default:
                    throw new IOException(
                            "Unexpected response code for CONNECT: " + response.code());
            }
        }
    }
    

    可以看到,这里主要进行如下的工作:

    1. 拼接 HTTP/1.1 请求
    2. 发出隧道请求,读取响应
    3. 若隧道请求返回 200,说明隧道建立成功,返回 null
    4. 若隧道返回 407,说明服务器需要进行代理认证,调用对应方法进行代理认证

    隧道打通之后,就可以通过隧道进行网络请求了。

    发布协议

    经过前面的步骤,我们建立了一条与服务端的 Socket 通道,我们接着看到 establishProtocol 方法:

    private void establishProtocol(ConnectionSpecSelector connectionSpecSelector,
                                   int pingIntervalMillis, Call call, EventListener eventListener) throws IOException {
        // 如果不是https地址
        if (route.address().sslSocketFactory() == null) {
            // 如果协议中包含了 http2 with prior knowledge
            if (route.address().protocols().contains(Protocol.H2_PRIOR_KNOWLEDGE)) {
                socket = rawSocket;
                protocol = Protocol.H2_PRIOR_KNOWLEDGE;
                startHttp2(pingIntervalMillis);
                return;
            }
            // 协议为 HTTP/1.1
            socket = rawSocket;
            protocol = Protocol.HTTP_1_1;
            return;
        }
        eventListener.secureConnectStart(call);
        // TLS握手
        connectTls(connectionSpecSelector);
        eventListener.secureConnectEnd(call, handshake);
        if (protocol == Protocol.HTTP_2) {
            // 如果是HTTP2协议,调用 startHttp2 方法
            startHttp2(pingIntervalMillis);
        }
    }
    

    可以看到,这个方法主要是在建立了 Socket 连接的基础上,对各个协议进行支持。

    首先判断了当前地址是否是 HTTPS 地址。

    不是 HTTPS 的情况下,若协议中包含了 H2_PRIOR_KNOWLEDGE 则采用 HTTP/2 进行请求,调用 startHttp2 方法,否则采用 HTTP/1.1。

    是 HTTPS 的情况下,首先调用了 connectTls 方法进行 TLS 握手,之后若是 HTTP/2 协议,则调用 startHttp2 方法。

    启动 HTTP/2 连接

    让我们先看看 startHttp2 方法究竟是做了什么:

    private void startHttp2(int pingIntervalMillis) throws IOException {
        socket.setSoTimeout(0); // HTTP/2 connection timeouts are set per-stream.
        http2Connection = new Http2Connection.Builder(true)
                .socket(socket, route.address().url().host(), source, sink)
                .listener(this)
                .pingIntervalMillis(pingIntervalMillis)
                .build();
        http2Connection.start();
    }
    

    这里主要是构建了一个 HTTP/2 的 Http2Connection,并且将 listener 设置为了该 RealConnection,之后通过 http2Connection.start 方法启动了 HTTP/2 连接。

    /**
     * @param sendConnectionPreface true to send connection preface frames. This should always be true
     *                              except for in tests that don't check for a connection preface.
     */
    void start(boolean sendConnectionPreface) throws
            IOException {
        if (sendConnectionPreface) {
            writer.connectionPreface();
            writer.settings(okHttpSettings);
            int windowSize = okHttpSettings.getInitialWindowSize();
            if (windowSize != Settings.DEFAULT_INITIAL_WINDOW_SIZE) {
                writer.windowUpdate(0, windowSize - Settings.DEFAULT_INITIAL_WINDOW_SIZE);
            }
        }
        new Thread(readerRunnable).start(); // Not a daemon thread.
    }
    

    这里 sendConnectionPreface 默认为 true,它首先调用了 writer.connectionPreface 方法,之后调用了 writer.settings 方法。最后,启用了一个 readerRunnable 的读取线程。

    在 HTTP/2 中,每个终端都需要发送一个连接 preface 作为在使用的协议的一个最终的确认,并为 HTTP/2 连接建立初始的设定。客户端和服务器相互发送一个不同的连接 preface。

    连接 preface 以字符串 PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n 开始,这个序列后面必须跟着一个 SETTINGS 帧。因此,在之后又调用了 writer.settings 方法,写入 SETTINGS 帧。

    我们先看到 connectionPreface 方法:

    public synchronized void connectionPreface() throws IOException {
        if (closed) throw new IOException("closed");
        if (!client) return; // Nothing to write; servers don't send connection headers!
        if (logger.isLoggable(FINE)) {
            logger.fine(format(">> CONNECTION %s", CONNECTION_PREFACE.hex()));
        }
        sink.write(CONNECTION_PREFACE.toByteArray());
        sink.flush();
    }
    

    这里实际上是向 HTTP/2 连接的 Socket 中写入了 PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n 这一字符串。之后我们看到 writer.settings 方法:

    /**
     * Write okhttp's settings to the peer.
     */
    public synchronized void settings(Settings settings) throws IOException {
        if (closed) throw new IOException("closed");
        int length = settings.size() * 6;
        byte type = TYPE_SETTINGS;
        byte flags = FLAG_NONE;
        int streamId = 0;
        frameHeader(streamId, length, type, flags);
        for (int i = 0; i < Settings.COUNT; i++) {
            if (!settings.isSet(i)) continue;
            int id = i;
            if (id == 4) {
                id = 3; // SETTINGS_MAX_CONCURRENT_STREAMS renumbered.
            } else if (id == 7) {
                id = 4; // SETTINGS_INITIAL_WINDOW_SIZE renumbered.
            }
            sink.writeShort(id);
            sink.writeInt(settings.get(i));
        }
        sink.flush();
    }
    

    这里主要是写入了一些配置的数据,其中调用了 frameHeader 写入了帧头。

    最后我们看到 readerRunnable.execute

    @Override
    protected void execute() {
        ErrorCode connectionErrorCode = ErrorCode.INTERNAL_ERROR;
        ErrorCode streamErrorCode = ErrorCode.INTERNAL_ERROR;
        IOException errorException = null;
        try {
            reader.readConnectionPreface(this);
            while (reader.nextFrame(false, this)) {
            }
            connectionErrorCode = ErrorCode.NO_ERROR;
            streamErrorCode = ErrorCode.CANCEL;
        } catch (IOException e) {
            errorException = e;
            connectionErrorCode = ErrorCode.PROTOCOL_ERROR;
            streamErrorCode = ErrorCode.PROTOCOL_ERROR;
        } finally {
            close(connectionErrorCode, streamErrorCode, errorException);
            Util.closeQuietly(reader);
        }
    }
    

    可以看到,这里主要是调用了 reader.readConnectionPreface 方法读取服务端发送来的 preface,并判断是否为对应字符串,从而完成 HTTP/2 连接的启动。

    TLS 握手

    接着我们看到 TLS 握手的过程,让我们看看 connectTls 方法:

    private void connectTls(ConnectionSpecSelector connectionSpecSelector) throws IOException {
        Address address = route.address();
        SSLSocketFactory sslSocketFactory = address.sslSocketFactory();
        boolean success = false;
        SSLSocket sslSocket = null;
        try {
            // 基于之前建立的Socket建立一个包装对象SSLSocket
            sslSocket = (SSLSocket) sslSocketFactory.createSocket(
                    rawSocket, address.url().host(), address.url().port(), true /* autoClose */);
            // 对TLS相关信息进行配置
            ConnectionSpec connectionSpec = connectionSpecSelector.configureSecureSocket(sslSocket);
            if (connectionSpec.supportsTlsExtensions()) {
                Platform.get().configureTlsExtensions(
                        sslSocket, address.url().host(), address.protocols());
            }
            // 进行握手
            sslSocket.startHandshake();
            // 获取SSLSession
            SSLSession sslSocketSession = sslSocket.getSession();
            Handshake unverifiedHandshake = Handshake.get(sslSocketSession);
            // 验证证书对该主机是否有效
            if (!address.hostnameVerifier().verify(address.url().host(), sslSocketSession)) {
                List<Certificate> peerCertificates = unverifiedHandshake.peerCertificates();
                if (!peerCertificates.isEmpty()) {
                    X509Certificate cert = (X509Certificate) peerCertificates.get(0);
                    throw new SSLPeerUnverifiedException(
                            "Hostname " + address.url().host() + " not verified:"
                                    + "\n    certificate: " + CertificatePinner.pin(cert)
                                    + "\n    DN: " + cert.getSubjectDN().getName()
                                    + "\n    subjectAltNames: " + OkHostnameVerifier.allSubjectAltNames(cert));
                } else {
                    throw new SSLPeerUnverifiedException(
                            "Hostname " + address.url().host() + " not verified (no certificates)");
                }
            }
            address.certificatePinner().check(address.url().host(),
                    unverifiedHandshake.peerCertificates());
            String maybeProtocol = connectionSpec.supportsTlsExtensions()
                    ? Platform.get().getSelectedProtocol(sslSocket)
                    : null;
            socket = sslSocket;
            // 获取source及sink
            source = Okio.buffer(Okio.source(socket));
            sink = Okio.buffer(Okio.sink(socket));
            handshake = unverifiedHandshake;
            protocol = maybeProtocol != null
                    ? Protocol.get(maybeProtocol)
                    : Protocol.HTTP_1_1;
            success = true;
        } catch (AssertionError e) {
            if (Util.isAndroidGetsocknameError(e)) throw new IOException(e);
            throw e;
        } finally {
            if (sslSocket != null) {
                Platform.get().afterHandshake(sslSocket);
            }
            if (!success) {
                closeQuietly(sslSocket);
            }
        }
    }
    

    可以看到,这里的步骤主要是下列步骤:

    1. 基于之前建立的 Socket 建立包装类 SSLSocket
    2. 对 TLS 相关信息进行配置
    3. 通过 SSLSocket 进行握手
    4. 验证一些证书相关信息
    5. 获取 sourcesink

    总结

    学习资料整理:Android架构师进阶之路

    连接池小结:

    1.创建一个连接池

    创建连接池非常简单只需要使用new关键字创建一个对象向就行了。new ConnectionPool(maxIdleConnections,keepAliveDuration,timeUnit)

    2.向连接池中添加一个连接

    a.通过ConnectionPool的put(realConnection)方法加入链接,在加入链接之前会先调用线程池执行cleanupRunnable匿名内部类来清理空闲的链接,然后再把链接加入Deque队列中,

    b.在cleanupRunnable匿名内部类中执行死循环不停的调用cleanup来清理空闲的连接,并返回一个下次清理的时间间隔,调用ConnectionPool.wait方法根据下次清理的时间间隔

    c.在cleanup的内部会遍历connections连接池队列,移除空闲时间最长的连接并返回下次清理的时间。

    d.判断连接是否空闲是利用RealConnection内部的List<Reference<StreamAllocation> 的size。如果size>0就说明不空闲,如果size<=0就说明空闲。

    3.获取一个链接

    通过ConnectionPool的get方法来获取符合要求的RealConnection。如果有服务要求的就返回RealConnection,并用该链接发起请求,如果没有符合要求的就返回null,并在外部重新创建一个RealConnection,然后再发起链接。判断条件:1.如果当前链接的技术次数大于限制的大小,或者无法在此链接上创建流,则直接返回false 2.如果地址主机字段不一致直接返回false3.如果主机地址完全匹配我们就重用该连接

    复用机制小结

    OkHttp 中采用了连接池机制实现了连接的复用,避免了每次都创建新的连接从而导致资源的浪费。获取连接的过程主要如下:

    1. 尝试在 transimitter 中寻找已经分配的连接
    2. transimitter 中获取不到,尝试从连接池中获取连接
    3. 连接池中仍然获取不到,尝试进行一次路由选择,再次从连接池中获取连接
    4. 连接池中仍然找不到需要的连接,则创建一个新的连接
    5. 由于 HTTP/2 下采用了连接的多路复用机制,所以连接可以并行进行,因此再次尝试从连接池中获取连接,获取到则丢弃创建的连接
    6. 若连接池中仍获取不到连接,则将刚刚创建的连接放入连接池

    其中,在连接池中采用了一个清理线程对超过了设定参数的空闲连接进行清理,每次清理后会计算下一次需要清理的时间并进入阻塞,每当有新连接进入或连接进入空闲时会重新唤醒该清理线程。

    对于每个连接,都采用了一种类似 GC 中的引用计数法的形式,每个 RealConnection 都持有了使用它的 Transimitter 的弱引用,通过判断持有的弱引用个数从而判断该连接是否空闲。

    OkHttp 默认将最大存活空闲连接个数设置为了 5,且每个连接空闲时间不能超过 5 分钟,否则将被清理线程所回收

    而在连接建立过程中,首先会判断该连接是否需要 SSL 隧道,若不需要则直接建立了 Socket 并获取了其 sourcesink,若需要则会先尝试建立 SSL 隧道,最后再进行 Socket 连接。

    Socket 连接建立成功后,会通过 establishProtocol 方法对每个协议进行不同的处理,从而对各个协议进行支持(如对 HTTPS 的支持)

    相关文章

      网友评论

        本文标题:Android架构师学好OKhttp网络框架——Socket连接

        本文链接:https://www.haomeiwen.com/subject/wjovbrtx.html