OkHttp

作者: ewinddc | 来源:发表于2018-12-14 17:15 被阅读0次

    介绍

    一个现代的Http请求客户端,可以在java或者android使用,有以下特点

    • 支持HTTP2
    • 连接池,实现Http1.1长连接和http2.0多路复用
    • 拦截器,内部预置拦截器和自定义拦截器支持,可以往HTTP请求时插入逻辑和职责

    收获

    • 拦截器的设计很精妙,责任链模式,单一职责思想,链式调用。可降低代码的工程复杂度,易扩展,易维护
    • 分层和模块化是分解项目的重要手段,复杂庞大的功能可以通过分层和模块化一步步拆解,工程上更容易实现和稳定
    • 各个层次拦截器的阅读,可以了解okhttp是如何一步步实现http协议,最底层的CallServerInterceptor是最终的HTTP包的构建,解析,读取,写入。

    sample

    OkHttpClient client = new OkHttpClient();
    
      Request request = new Request.Builder()
          .url(url)
          .build();
    
      Response response = client.newCall(request).execute();
      return response.body().string();
    
    

    调用流程

    • 构建OkHttpClient
    • 构建Request
    • OkHttpClient#newCall(Request)
    • call#execute或者call#enqueue(callback)
    • 解析Response

    接口分析

    构建OKHttpClient

    一如既往,提供一个外观类OKHttpClient封装了所有的配置,这个类毫无意外也是通过Builder构建。
    Builder

    • timeout参数配置(call,connect,read,write)
    • proxy配置http代理
    • cookieJar
    • cache
    • dns
    • socketFactory
    • sslSocketFactory https相关,配置CA
    • hostnameVerifier
    • connectionPool
    • dispatcher
    • addInterceptor
    • addNetworkInterceptor
    • eventListener 用于监听网络请求的时间
    • build

    构建Request

    也提供Builder

    • url
    • header(String name,String value)
    • cacheControl
    • get,post,delete,put,patch
    • method(String method,RequestBody)设定http请求方法
    • tag
      对比Retrofit就发现接口比较原始,基本上更接近Http协议
    • Url
    • http method
    • header
    • body

    Call

    public interface Call extends Cloneable {
     
      Response execute() throws IOException;
      void enqueue(Callback responseCallback);
      void cancel();
     
      Request request();
    
      interface Factory {
        Call newCall(Request request);
      }
    }
    

    提供同步和异步方法,注意OKHttp enqueue后的callback返回并不是UI线程,Retrofit帮我们转接了。

    框架设计

    okhttp架构.png
    这是原文,这个图大致按照调用栈大致描绘了层次关系。
    • 接口层
    • 协议层
    • 连接层 连接池,支持长连接和多路复用
    • cache层
    • I/O层 高效的IO操作,依赖okio
    • 拦截器 贯穿上下,非常重要

    拦截器

    拦截器是OKHttp的一大特性,它是典型的责任链模式,链式递归调用

    public interface Interceptor {
      Response intercept(Chain chain) throws IOException;
    
      interface Chain {
        Request request();
        Response proceed(Request request) throws IOException;
        Call call();
    }
    

    分为application和network拦截器,主要处理request和response
    一个interceptor通常步骤

    1. 处理Request
    2. 调用Chain#proceed
    3. 处理Response并返回

    我们知道OkHttp通过newCall,返回的其实是RealCall,然后我们看RealCall#execute方法

    public Response execute() throws IOException {
     
        try {
          client.dispatcher().executed(this);
          Response result = getResponseWithInterceptorChain();
          return result;
        }finally {
          client.dispatcher().finished(this);
        }
      }
       Response getResponseWithInterceptorChain() throws IOException {
        // Build a full stack of interceptors.
        List<Interceptor> interceptors = new ArrayList<>();
        interceptors.addAll(client.interceptors());
        interceptors.add(retryAndFollowUpInterceptor);
        interceptors.add(new BridgeInterceptor(client.cookieJar()));
        interceptors.add(new CacheInterceptor(client.internalCache()));
        interceptors.add(new ConnectInterceptor(client));
        if (!forWebSocket) {
          interceptors.addAll(client.networkInterceptors());
        }
        interceptors.add(new CallServerInterceptor(forWebSocket));
    
        Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
            originalRequest, this, eventListener, client.connectTimeoutMillis(),
            client.readTimeoutMillis(), client.writeTimeoutMillis());
    
        return chain.proceed(originalRequest);
      }
    

    getResponseWithInterceptorChain干了4件事

    • 添加用户自定义的application interceptor
    • 添加内置拦截器
    • 添加用户自定义的network interceptor
    • 通过RealInterceptorChain开始链式递归调用
    public final class RealInterceptorChain implements Interceptor.Chain {
     public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
          RealConnection connection) throws IOException {
     
        // Call the next interceptor in the chain.
        RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
            connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
            writeTimeout);
        Interceptor interceptor = interceptors.get(index);
        Response response = interceptor.intercept(next);
        return response;
      }
    }
    

    这个RealInterceptorChain#proceed里又构建了RealInterceptorChain,调用了拦截器链表的下一个,每个拦截器的intercept方法需要调用的chain都是这个RealInterceptorChain,只不过新的实例,新的参数。Interceptor负责调chain#proceed触发下一个拦截器


    拦截器.png

    内置拦截器

    • retryAndFollowUpInterceptor 超时重试和重定向
    • BridgeInterceptor 一些header字段,content-length,content-encode做透明gzip,cookie,keep-alive等
    • CacheInterceptor
    • ConnectInterceptor
    • CallServerInterceptor 真正的网络请求

    自定义拦截器

    • application 不考虑重传和重定向,不考虑cache,永远调用一次
    • network 在connect和callserver之间,命中cache会被短路

    总结拦截器我们发现,整个流程一层层往下贯穿,再一层层往上,跟网络协议栈的思路是一样的。这里其实也可以用装饰者来实现

    interface ITask {
        Response call(Requst requst);
      }
      
      class TaskImpl implements ITask{
        private ITask nextTask;
        public TaskImpl(ITask task){
          nextTask = task;
        }
    
        public Response call(Requst requst) {
        // 在此处可以处理request
          if(nextTask != null){
            response = nextTask.call(requst);
          }
        // 在此处可以处理response
          return response;
        }
      }
    
    class main(){
        ITask a = new TaskImpl();
        ITask b = new TaskImpl(a);
        ITask c = new TaskImpl(b);
        c.call(request);
      }
    

    任务调度

    通常我们会调用call#enqueu(callback)异步方法等待结果返回,OKHttp内部维护线程池用来执行请求,具体实现类是Dispatcher

    public final class Dispatcher {
    private int maxRequests = 64;
      private int maxRequestsPerHost = 5;
     /** Executes calls. Created lazily. */
      private @Nullable ExecutorService executorService;
    
      /** Ready async calls in the order they'll be run. */
      private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
      /** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
      private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
      /** Running synchronous calls. Includes canceled calls that haven't finished yet. */
      private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
    
    public synchronized ExecutorService executorService() {
        if (executorService == null) {
          executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
              new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
        }
        return executorService;
      }
    
    void enqueue(AsyncCall call) {
        synchronized (this) {
          readyAsyncCalls.add(call);
        }
        promoteAndExecute();
      }
    
      private boolean promoteAndExecute() {
        assert (!Thread.holdsLock(this));
    
        List<AsyncCall> executableCalls = new ArrayList<>();
        boolean isRunning;
        synchronized (this) {
          for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
            AsyncCall asyncCall = i.next();
    
            if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
            if (runningCallsForHost(asyncCall) >= maxRequestsPerHost) continue; // Host max capacity.
    
            i.remove();
            executableCalls.add(asyncCall);
            runningAsyncCalls.add(asyncCall);
          }
          isRunning = runningCallsCount() > 0;
        }
    
        for (int i = 0, size = executableCalls.size(); i < size; i++) {
          AsyncCall asyncCall = executableCalls.get(i);
          asyncCall.executeOn(executorService());
        }
    
        return isRunning;
      }
    }
    

    内部维护了3个任务队列来存储请求,一个线程池来执行任务
    enqueue

    • 先把任务插入readyQueue
    • 遍历readyQueue,判断是否超过总体最大值和单host最大值
    • 遍历所有可运行的请求,调用AsyncCall#executeOn(executorService)
    • 这个AsyncCall最终也是调用的getResponseWithInterceptorChain触发拦截器,获取结果,然后直接在子线程回调结果

    缓存

    CacheInterceptor来拦截缓存,使用DiskLruCache来实现缓存,CacheStrategy做缓存策略

    public final class CacheInterceptor implements Interceptor {
      public Response intercept(Chain chain) throws IOException {
         Response cacheCandidate = cache != null
            ? cache.get(chain.request())
            : null;
    
        long now = System.currentTimeMillis();
    
        CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
        Request networkRequest = strategy.networkRequest;
        Response cacheResponse = strategy.cacheResponse;
    
        if (cache != null) {
          cache.trackResponse(strategy);
        }
    
        if (cacheCandidate != null && cacheResponse == null) {
          closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
        }
    
         // If we don't need the network, we're done.
        if (networkRequest == null) {
          return cacheResponse.newBuilder()
              .cacheResponse(stripBody(cacheResponse))
              .build();
        }
      }
    }
    
    • 通过request获取cache结果
    • 通过CacheStrategy判断是否需要请求网络,不需要直接短路返回,不继续往下走拦截器
    • 继续chain.proceed,请求网络,获取response
    • 更新缓存
    • 返回response

    CacheStrategy

    public final class CacheStrategy {
    /** The request to send on the network, or null if this call doesn't use the network. */
      public final @Nullable Request networkRequest;
      /** The cached response to return or validate; or null if this call doesn't use a cache. */
      public final @Nullable Response cacheResponse;
    
      public static class Factory {
        final long nowMillis;
        final Request request;
        final Response cacheResponse;
        private Date servedDate;
        private Date lastModified;
         private Date expires;
        private String etag;
      }
    
      public CacheStrategy get() {
          CacheStrategy candidate = getCandidate();
          if (candidate.networkRequest != null && request.cacheControl().onlyIfCached()) {
            // We're forbidden from using the network and the cache is insufficient.
            return new CacheStrategy(null, null);
          }
          return candidate;
        }
    
        private CacheStrategy getCandidate() {
         CacheControl requestCaching = request.cacheControl();
          if (requestCaching.noCache() || hasConditions(request)) {
            return new CacheStrategy(request, null);
          }
    
      if (etag != null) {
            conditionName = "If-None-Match";
            conditionValue = etag;
          } else if (lastModified != null) {
            conditionName = "If-Modified-Since";
            conditionValue = lastModifiedString;
          } else if (servedDate != null) {
            conditionName = "If-Modified-Since";
            conditionValue = servedDateString;
          } else {
            return new CacheStrategy(request, null); // No condition! Make a regular request.
          }
          。。。
        }
    }
    
    • 通过Factory构建CacheStrategy
    • 两个公有final变量,networkRequest标识是否需要请求网络,CacheResponse组装了缓存的结果
    • 工厂循环遍历cache response的header,主要是缓存刷新的两组字段,expires和last-modifiled,etag
    • CacheStrategy基本根据HTTP的cache协议

    连接池

    性能提升的关键,为了实现http1.1的长连接和http2的多路复用

    • 长连接,一个请求结束后,不会立即关闭TCP socket,而是等待下一个请求,直到超时。规避TCP的拥塞控制的慢启动,可以显著提升响应速度
    • 多路复用,二进制帧,header压缩。一个tcp socket支持多个http请求并行,大大增加并行效率

    地址

    • url
    • Address 包含域名,port,https setting,protocol
    • Route 包含ip,proxy。同一个Address可能有多个Route,因为DNS返回多个ip

    流程

    public interface Connection {
      Route route();
      //TCP连接
      Socket socket();
      //TLS
      Handshake handshake();
      //Http协议
      Protocol protocol();
    }
    
    • 通过url创建Address
    • 从ConnectionPool获取Connection
    • 如果没获取到,则向DNS查询IP,得到Route
    • 如果是新的route,发起tcp连接或者tls握手,获得Connection
    • 通过Connection发起请求,流转到network拦截器

    ConnectInterceptor通过StreamAllocation#newStream获得Connection

    CallServerInterceptor

    真正的http请求和解析都在这个拦截器里面,依赖okio这个库。

    • exchange 管理类
    • ExchangeCode,接口类,定义打包request,解析response的行为
    /** Encodes HTTP requests and decodes HTTP responses.  */
    interface ExchangeCodec {
    
      /** Returns an output stream where the request body can be streamed.  */
      fun createRequestBody(request: Request, contentLength: Long): Sink
    
      /** This should update the HTTP engine's sentRequestMillis field.  */
      fun writeRequestHeaders(request: Request)
    
      /** Flush the request to the underlying socket and signal no more bytes will be transmitted.  */
      fun finishRequest()
    
     fun readResponseHeaders(expectContinue: Boolean): Response.Builder?
    
      fun openResponseBodySource(response: Response): Source
    }
    

    okhttp迁移很多文件为Kotlin,我们至少要大致能看懂Kotlin代码

    • Http1ExchangeCodec HTTP/1协议的实现类
    • Http2ExchangeCodec HTTP/2协议的实现类。二进制Header和Body。多路复用。
    public final class Http1ExchangeCodec implements ExchangeCodec {
     /** The client that configures this stream. May be null for HTTPS proxy tunnels. */
      private final OkHttpClient client;
    
      /** The connection that carries this stream. */
      private final RealConnection realConnection;
      //socket对应的输入流
      private final BufferedSource source;
      //socket对应的输出流
      private final BufferedSink sink;
    
     /** HTTP协议标准,写入request到流,requestline和header */
      public void writeRequest(Headers headers, String requestLine) throws IOException {
        if (state != STATE_IDLE) throw new IllegalStateException("state: " + state);
        sink.writeUtf8(requestLine).writeUtf8("\r\n");
        for (int i = 0, size = headers.size(); i < size; i++) {
          sink.writeUtf8(headers.name(i))
              .writeUtf8(": ")
              .writeUtf8(headers.value(i))
              .writeUtf8("\r\n");
        }
        sink.writeUtf8("\r\n");
        state = STATE_OPEN_REQUEST_BODY;
      }
    
    @Override public Response.Builder readResponseHeaders(boolean expectContinue) throws IOException {
           StatusLine statusLine = StatusLine.parse(readHeaderLine());
    
          Response.Builder responseBuilder = new Response.Builder()
              .protocol(statusLine.protocol)
              .code(statusLine.code)
              .message(statusLine.message)
              .headers(readHeaders());
    
          return responseBuilder;
     
      }
    
    /** 按照http标准读取header,一行一行的读 */
      private Headers readHeaders() throws IOException {
        Headers.Builder headers = new Headers.Builder();
        // parse the result headers until the first blank line
        for (String line; (line = readHeaderLine()).length() != 0; ) {
          addHeaderLenient(headers, line);
        }
        return headers.build();
      }
    
     @Override public Source openResponseBodySource(Response response) {
        if (!HttpHeaders.hasBody(response)) {
          return newFixedLengthSource(0);
        }
    
      //分段读取
        if ("chunked".equalsIgnoreCase(response.header("Transfer-Encoding"))) {
          return newChunkedSource(response.request().url());
        }
    
      // size已知
        long contentLength = HttpHeaders.contentLength(response);
        if (contentLength != -1) {
          return newFixedLengthSource(contentLength);
        }
    
        return newUnknownLengthSource();
      }
    }
    

    [如何调试](https://blog.csdn.net/alvinhuai/article/details/81288270,用Android Studio跑OkHttp的sampleClient模块,加一些配置,可在本机直接跑,也可以用AS的Debugger断点调试。像Retrofit这种纯java的库,都可以在本机调试,效率高。
    )

    reference

    相关文章

      网友评论

          本文标题:OkHttp

          本文链接:https://www.haomeiwen.com/subject/flvvhqtx.html