美文网首页
OkHttp使用及源码学习

OkHttp使用及源码学习

作者: shuixingge | 来源:发表于2016-05-20 16:06 被阅读2690次

    本文仅为学习笔记;不是原创文章

    使用教程
    源码分析参考1
    源码分析参考2

    一:OkHttp连接池复用

    1.1 持久连接

    HTTP非持久连接 HTTP持久连接
    持久连接(HTTP keep-alive):允许HTTP设备在事务处理结束之后将TCP连接保持在打开状态,以便未来的HTTP请求重用现在的连接;在事务处理结束之后仍然保持在打开状态的TCP连接叫做持久连接;非持久连接会在事件处理结束之后关闭,持久连接会在不同的事务之间保持打开状态;
    持久连接的优点:降低时延和连接建立的开销;将连接保持在已经调谐的状态;减少了打开连接的潜在数量;
    持久连接的缺点:如果存在大量空闲的keepalive connections(我们可以称作僵尸连接或者泄漏连接),其它客户端们的正常连接速度也会受到影响

    1.2 连接池的使用与分析

    Call: 对HTTP的Request的封装
    Connection: 对socket连接的包装;
    **StreamAllocation: **表示Connection被引用的次数
    **ConnectionPool: ** Socket连接池,对连接缓存进行回收与管理
    **Deque: ** Deque也就是双端队列,双端队列同时具有队列和栈性质;

    1.3 Connection自动回收机制

    ConnectionPool.java
    ConnectionPool内部用一个线程池来执行连接池的自动回收和管理任务,

     private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,
          Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp ConnectionPool", true));
    
      /** The maximum number of idle connections for each address. */
      private final int maxIdleConnections;//每个connection的空闲socket连接数目;
      private final long keepAliveDurationNs;//每个空闲socket连接的keep-alive时长;
    

    ConnectionPool()
    keepAliveDurationNs默认为5分钟;maxIdleConnections默认为5个空闲连接;

      public ConnectionPool() {
        this(5, 5, TimeUnit.MINUTES);
      }
     public ConnectionPool(int maxIdleConnections, long keepAliveDuration, TimeUnit timeUnit) {
        this.maxIdleConnections = maxIdleConnections;
        this.keepAliveDurationNs = timeUnit.toNanos(keepAliveDuration);
    
        // Put a floor on the keep alive duration, otherwise cleanup will spin loop.
        if (keepAliveDuration <= 0) {
          throw new IllegalArgumentException("keepAliveDuration <= 0: " + keepAliveDuration);
        }
      }
    

    当用户socket连接成功,向连接池中put新的socket时,回收函数会被主动调用,线程池就会执行cleanupRunnable;
    ConnectionPool.put()

    void put(RealConnection connection) {
        assert (Thread.holdsLock(this));
        if (!cleanupRunning) {
          cleanupRunning = true;
          executor.execute(cleanupRunnable);
        }
        connections.add(connection);
      }
    

    cleanupRunnable.java
    通过一个无限循环来执行cleanup()方法来执行connection的连接自动回收,并返回下一次回收的时间;

    //Socket清理的Runnable,每当put操作时,就会被调用
    //put操作是在网络线程
    //Socket清理是在 ConnectionPool线程池中调用
    private final Runnable cleanupRunnable = new Runnable() {
        @Override public void run() {
          while (true) {
            long waitNanos = cleanup(System.nanoTime());
            if (waitNanos == -1) return;
            if (waitNanos > 0) {
              long waitMillis = waitNanos / 1000000L;
              waitNanos -= (waitMillis * 1000000L);
              synchronized (ConnectionPool.this) {
                try {
                  ConnectionPool.this.wait(waitMillis, (int) waitNanos);
                } catch (InterruptedException ignored) {
                }
              }
            }
          }
        }
      };
    

    ConnectionPool.Cleanup()

    /**
       * Performs maintenance on this pool, evicting the connection that has been idle the longest if
       * either it has exceeded the keep alive limit or the idle connections limit.
       *
       * <p>Returns the duration in nanos to sleep until the next scheduled call to this method. Returns
       * -1 if no further cleanups are required.
       */
    //清理超过空闲连接次数和空闲时间限制的连接,返回下次执行清理需要等待时长;如果不需要清理,返回-1;
      long cleanup(long now) {
        int inUseConnectionCount = 0;
        int idleConnectionCount = 0;
        RealConnection longestIdleConnection = null;
        long longestIdleDurationNs = Long.MIN_VALUE;
    
        // Find either a connection to evict, or the time that the next eviction is due.
       //遍历Deque中所有的RealConnection,标记泄漏的连接
        synchronized (this) {
          for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
            RealConnection connection = i.next();
    
            // If the connection is in use, keep searching.
           //如果连接在使用,继续搜索需要清理的connection;
            if (pruneAndGetAllocationCount(connection, now) > 0) {
              inUseConnectionCount++;
              continue;
            }
           //空闲连接数目+1;
            idleConnectionCount++;
    
            // If the connection is ready to be evicted, we're done.
            //连接已经空闲的时间
            //找到空闲时间最长的连接;
            long idleDurationNs = now - connection.idleAtNanos;
            if (idleDurationNs > longestIdleDurationNs) {
              longestIdleDurationNs = idleDurationNs;
              longestIdleConnection = connection;
            }
          }
          //如果最长空闲时间的连接所包含的socket空闲连接超过最大空闲连接限制或者超过最长空闲时间;那么此连接为待清理的连接
          if (longestIdleDurationNs >= this.keepAliveDurationNs
              || idleConnectionCount > this.maxIdleConnections) {
            // We've found a connection to evict. Remove it from the list, then close it below (outside
            // of the synchronized block).
            //清理连接
            connections.remove(longestIdleConnection);
          } else if (idleConnectionCount > 0) {
            //返回剩余可空闲的时间
            // A connection will be ready to evict soon.
            return keepAliveDurationNs - longestIdleDurationNs;
          } else if (inUseConnectionCount > 0) {
            // All connections are in use. It'll be at least the keep alive duration 'til we run again.
           //返回 keepAlive时长;
            return keepAliveDurationNs;
          } else {
            // No connections, idle or in use.
            //无连接;
            cleanupRunning = false;
            return -1;
          }
        }
    
        closeQuietly(longestIdleConnection.socket());
    
        // Cleanup again immediately.
        return 0;
      }
    

    ConnectionPool.pruneAndGetAllocationCount()
    主要用来判断一个连接是不是活跃的连接

    /**
       * Prunes any leaked allocations and then returns the number of remaining live allocations on
       * {@code connection}. Allocations are leaked if the connection is tracking them but the
       * application code has abandoned them. Leak detection is imprecise and relies on garbage
       * collection.
       */
     返回一个connection剩余的活跃的 allocation数量;
      private int pruneAndGetAllocationCount(RealConnection connection, long now) {
        List<Reference<StreamAllocation>> references = connection.allocations;
        for (int i = 0; i < references.size(); ) {
          Reference<StreamAllocation> reference = references.get(i);
    
          if (reference.get() != null) {
            i++;
            continue;
          }
    
          // We've discovered a leaked allocation. This is an application bug.
          Internal.logger.warning("A connection to " + connection.route().address().url()
              + " was leaked. Did you forget to close a response body?");
        //移除一个Allocation;
          references.remove(i);
          connection.noNewStreams = true;
    
          // If this was the last allocation, the connection is eligible for immediate eviction.
       //如果这个 connection所有的allocation都没有被引用,那么这个连接应该马上被清理,设置该connection的已经空闲 了keepAliveDurationNs时间;
          if (references.isEmpty()) {
            connection.idleAtNanos = now - keepAliveDurationNs;
            return 0;
          }
        }
    
        return references.size();
      }
    }
    

    1:遍历RealConnection连接中的StreamAllocationList,它维护着一个弱应用列表
    2 :查看此StreamAllocation是否为空(它是在线程池的put/remove手动控制的),如果为空,说明已经没有代码引用这个对象了,需要在List中删除
    3 :遍历结束,如果List中维护的StreamAllocation删空了,就返回0,表示这个连接已经没有代码引用了,是泄漏的连接;否则返回非0的值,表示这个仍然被引用,是活跃的连接。

    二:OkHttp缓存策略

    OkHttp用CacheStrategy很好的实现了符合HTTP规范的HTTP缓存策略;

    HTTP缓存流程

    CacheStrategy()构造
    networkRequest:网络请求;
    cacheResponse:缓存响应;

     /** The request to send on the network, or null if this call doesn't use the network. */
      public final Request networkRequest;
       
      /** The cached response to return or validate; or null if this call doesn't use a cache. */
      public final Response cacheResponse;
    
      private CacheStrategy(Request networkRequest, Response cacheResponse) {
        this.networkRequest = networkRequest;
        this.cacheResponse = cacheResponse;
      }
    

    isCacheable():根据返回的状态码,主要用来判断一个Reponse是否可以缓存;如果不能缓存,那么Request就需要走网络请求;不支持缓存部分内容;如果是302响应(暂时性重定向,需要进一步判断?);如果有Reponse和Request都有noStore(),那么代表不能缓存;

    /** Returns true if {@code response} can be stored to later serve another request. */
      public static boolean isCacheable(Response response, Request request) {
        // Always go to network for uncacheable response codes (RFC 7231 section 6.1),
        // This implementation doesn't support caching partial content.
        switch (response.code()) {
          case HTTP_OK:
          case HTTP_NOT_AUTHORITATIVE:
          case HTTP_NO_CONTENT:
          case HTTP_MULT_CHOICE:
          case HTTP_MOVED_PERM:
          case HTTP_NOT_FOUND:
          case HTTP_BAD_METHOD:
          case HTTP_GONE:
          case HTTP_REQ_TOO_LONG:
          case HTTP_NOT_IMPLEMENTED:
          case StatusLine.HTTP_PERM_REDIRECT:
            // These codes can be cached unless headers forbid it.
            break;
    
          case HTTP_MOVED_TEMP:
          case StatusLine.HTTP_TEMP_REDIRECT:
            // These codes can only be cached with the right response headers.
            // http://tools.ietf.org/html/rfc7234#section-3
            // s-maxage is not checked because OkHttp is a private cache that should ignore s-maxage.
            if (response.header("Expires") != null
                || response.cacheControl().maxAgeSeconds() != -1
                || response.cacheControl().isPublic()
                || response.cacheControl().isPrivate()) {
              break;
            }
            // Fall-through.
    
          default:
            // All other codes cannot be cached.
            return false;
        }
        // A 'no-store' directive on request or response prevents the response from being cached.
    
          return !response.cacheControl().noStore() && !request.cacheControl().noStore();
    

    Factory.java:是CacheStrage的一个内部类;

        final long nowMillis;
        final Request request;
        final Response cacheResponse;
    
        /** The server's time when the cached response was served, if known. **     
       //服务器创建响应的时间
        private Date servedDate;
        private String servedDateString;
       //缓存文档最后修改时间
        /** The last modified date of the cached response, if known. */
        private Date lastModified;
        private String lastModifiedString;
       
        /**
         * The expiration date of the cached response, if known. If both this field and the max age are
         * set, the max age is preferred.
         */
        //缓存文档过期时间
        private Date expires;
    
        /**
         * Extension header set by OkHttp specifying the timestamp when the cached HTTP request was
         * first initiated.
         */
       // 第一次发送请求的时间戳
        private long sentRequestMillis;
      //第一次接收到缓存响应的时间戳
        /**
         * Extension header set by OkHttp specifying the timestamp when the cached HTTP response was
         * first received.
         */
        private long receivedResponseMillis;
       //实体标签
        /** Etag of the cached response. */
        private String etag;
        //缓存响应的年龄
        /** Age of the cached response. */
        private int ageSeconds = -1;
    
    

    Factory构造函数:根据缓存响应来初始化各个参数值;

    public Factory(long nowMillis, Request request, Response cacheResponse) {
          //当前时间
          this.nowMillis = nowMillis;
          this.request = request;
          this.cacheResponse = cacheResponse;
    
          if (cacheResponse != null) {
               //请求发送时间;初始发送
            this.sentRequestMillis = cacheResponse.sentRequestAtMillis();
            //响应产生时间
            this.receivedResponseMillis = cacheResponse.receivedResponseAtMillis();
            Headers headers = cacheResponse.headers();
            for (int i = 0, size = headers.size(); i < size; i++) {
              String fieldName = headers.name(i);
              String value = headers.value(i);
              if ("Date".equalsIgnoreCase(fieldName)) {
                servedDate = HttpDate.parse(value);
                servedDateString = value;
              } else if ("Expires".equalsIgnoreCase(fieldName)) {
                expires = HttpDate.parse(value);
              } else if ("Last-Modified".equalsIgnoreCase(fieldName)) {
                lastModified = HttpDate.parse(value);
                lastModifiedString = value;
              } else if ("ETag".equalsIgnoreCase(fieldName)) {
                etag = value;
              } else if ("Age".equalsIgnoreCase(fieldName)) {
                //缓存响应的年龄
                ageSeconds = HeaderParser.parseSeconds(value, -1);
              }
            }
          }
        }
    

    Factory.getCandidate():返回一个CacheStragey();

     /** Returns a strategy to use assuming the request can use the network. */
        private CacheStrategy getCandidate() {
          // No cached response.
           //响应为空, 走网络;
          if (cacheResponse == null) {
           
            return new CacheStrategy(request, null);
          }
    
          // Drop the cached response if it's missing a required handshake.
            //是HTTPS请求且TLS握手失败,走网络;
          if (request.isHttps() && cacheResponse.handshake() == null) {
            return new CacheStrategy(request, null);
          }
    
          // If this response shouldn't have been stored, it should never be used
          // as a response source. This check should be redundant as long as the
          // persistence store is well-behaved and the rules are constant.
            //如果不能缓存,走网络;
          if (!isCacheable(cacheResponse, request)) {
            return new CacheStrategy(request, null);
          }
           //请求不允许缓存,或者是条件请求,走网络
          CacheControl requestCaching = request.cacheControl();
          if (requestCaching.noCache() || hasConditions(request)) {
            return new CacheStrategy(request, null);
          }
          //初始化缓存响应的年龄和缓存新鲜时间
          long ageMillis = cacheResponseAge();
          long freshMillis = computeFreshnessLifetime();
          //如果
          if (requestCaching.maxAgeSeconds() != -1) {
            freshMillis = Math.min(freshMillis, SECONDS.toMillis(requestCaching.maxAgeSeconds()));
          }
          //如果
          long minFreshMillis = 0;
          if (requestCaching.minFreshSeconds() != -1) {
            minFreshMillis = SECONDS.toMillis(requestCaching.minFreshSeconds());
          }
    
          long maxStaleMillis = 0;
          CacheControl responseCaching = cacheResponse.cacheControl();
          if (!responseCaching.mustRevalidate() && requestCaching.maxStaleSeconds() != -1) {
            maxStaleMillis = SECONDS.toMillis(requestCaching.maxStaleSeconds());
          }
    
          if (!responseCaching.noCache() && ageMillis + minFreshMillis < freshMillis + maxStaleMillis) {
            Response.Builder builder = cacheResponse.newBuilder();
            if (ageMillis + minFreshMillis >= freshMillis) {
              //缓存不新鲜;添加相关响应首部;
              builder.addHeader("Warning", "110 HttpURLConnection \"Response is stale\"");
            }
            long oneDayMillis = 24 * 60 * 60 * 1000L;
              //缓存过期;添加相关响应首部;
            if (ageMillis > oneDayMillis && isFreshnessLifetimeHeuristic()) {
              builder.addHeader("Warning", "113 HttpURLConnection \"Heuristic expiration\"");
            }
            return new CacheStrategy(null, builder.build());
          }
         //分别构造If-None-Match;If-Modified-Since请求首部;
          Request.Builder conditionalRequestBuilder = request.newBuilder();
    
          if (etag != null) {
            conditionalRequestBuilder.header("If-None-Match", etag);
          } else if (lastModified != null) {
            conditionalRequestBuilder.header("If-Modified-Since", lastModifiedString);
          } else if (servedDate != null) {
            conditionalRequestBuilder.header("If-Modified-Since", servedDateString);
          }
    
          Request conditionalRequest = conditionalRequestBuilder.build();
         //如果允许条件请求,则进行条件请求,验证新鲜度;不允许就发起新的网络请求;
          return hasConditions(conditionalRequest)
              ? new CacheStrategy(conditionalRequest, cacheResponse)
              : new CacheStrategy(conditionalRequest, null);
        }
    

    Factory.computeFreshnessLifetime():计算缓存维持在新鲜(不过期)状态还有多长

    /**  Returns the number of milliseconds that the response was 
    fresh for, starting from the served date. **/
    private long computeFreshnessLifetime() {
          CacheControl responseCaching = cacheResponse.cacheControl();
          if (responseCaching.maxAgeSeconds() != -1) {
             //返回reponse的 max-age
            return SECONDS.toMillis(responseCaching.maxAgeSeconds());
          } else if (expires != null) {
           //返回expires-servedDate;取差值;消除服务器时钟偏差;
            long servedMillis = servedDate != null
                ? servedDate.getTime()
                : receivedResponseMillis;
            long delta = expires.getTime() - servedMillis;
            return delta > 0 ? delta : 0;
          } else if (lastModified != null
              && cacheResponse.request().url().query() == null) {
            // As recommended by the HTTP RFC and implemented in Firefox, the
            // max age of a document should be defaulted to 10% of the
            // document's age at the time it was served. Default expiration
            // dates aren't used for URIs containing a query.
            long servedMillis = servedDate != null
                ? servedDate.getTime()
                : sentRequestMillis;
            //返回expires-lastModified;作为缓存能维持在新鲜状态的时长;取差值;消除服务器时钟偏差;
            long delta = servedMillis - lastModified.getTime();
            return delta > 0 ? (delta / 10) : 0;
          }
          return 0;
        }
    

    Factory:cacheResponseAge():返回 response的年龄;

    /**  Returns the current age of the response, in milliseconds.
     The calculation is specified by RFC 2616, 13.2.3 Age Calculations. **/
     private long cacheResponseAge() {
          //客户端初始接收某响应时间-服务器响应产生时间;
          long apparentReceivedAge = servedDate != null
              ? Math.max(0, receivedResponseMillis - servedDate.getTime())
              : 0;
          long receivedAge = ageSeconds != -1
              ? Math.max(apparentReceivedAge, SECONDS.toMillis(ageSeconds))
              : apparentReceivedAge;
          //客户端初始接收某响应时间-客户端初始发送某请求时间
          long responseDuration = receivedResponseMillis - sentRequestMillis;
        //当前时间-客户端初始接收某响应时间
          long residentDuration = nowMillis - receivedResponseMillis;
         当前时间-第一次发送请求时间+
          return receivedAge + responseDuration + residentDuration;
        }
    

    okhttp缓存实现

    LinkedHashMap;
    文件
    OkHttp通过对文件进行了多次封装,实现了非常简单的I/O操作
    OkHttp通过对请求url进行md5实现了与文件的映射,实现写入,删除等操作
    OkHttp内部维护着清理线程池,实现对缓存文件的自动清理

    okhttp任务调度

    Dispacher

    okhttp 任务调度
       // 最大并发请求数为64
       private int maxRequests = 64;
      // 每个主机最大请求数为5
       private int maxRequestsPerHost = 5;
       //线程池
      /** Executes calls. Created lazily. */
      private ExecutorService executorService;
     //缓存队列
      /** Ready async calls in the order they'll be run. */
      private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
      
      //正在运行的任务;包括已经取消但是还没结束的任务;
      /** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
      private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
    
      /** Running synchronous calls. Includes canceled calls that haven't finished yet. */
      private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
    

    ExecutorService 线程池

        public synchronized ExecutorService executorService() {
        if (executorService == null) {
          executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
              new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
        }
        return executorService;
      }
    

    当执行任务,将任务加入任务队列

     OkHttpClient client = new OkHttpClient.Builder().build();
     Request request = new Request.Builder()
        .url("http://qq.com").get().build();
      client.newCall(request).enqueue(new Callback() {
      @Override public void onFailure(Call call, IOException e) {
    
      }
      @Override public void onResponse(Call call, Response response)  throws IOException {
    
      }
    });
    

    enqueue: 添加任务实际上是入队

    synchronized void enqueue(AsyncCall call) {
      if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
            //如果满足最大并发请求数为64, 每个主机最大请求数为5;
          //添加正在运行的请求
        runningAsyncCalls.add(call);
           //线程池执行请求
        executorService().execute(call);
      } else {
          //添加到缓存队列
        readyAsyncCalls.add(call);
      }
    

    相关文章

      网友评论

          本文标题:OkHttp使用及源码学习

          本文链接:https://www.haomeiwen.com/subject/tvenrttx.html