美文网首页
深入浅出Okhttp

深入浅出Okhttp

作者: jimjayce | 来源:发表于2018-08-23 16:44 被阅读0次

    在Android世界中前前后后出现过很多网络框架,比如:HttpUrlConnectionHttpClientVolleyAsyncHttpClientokhttpRetrofit等。其中目前比较流行的当属okhttpRetrofit莫属了,其中Retrofit是基于okhttp的基础上进行的进一步的封装得到的,所以对于知其然还要知其所以然的monkey来说了解okhttp的源码是很必要的,所以下面请跟我一起来看看okhttp的源码吧。

    本篇主要三个部分,分别是:OKHTTP流程分析、连接池实现、Dispatcher详解。

    一、OKHTTP流程分析

    想要分析源码我们需要一个切入口进行跟进,最好的入口不外乎日常使用的流程。我们对okhttp的一般使用方法:

    //同步
        OkHttpClient client = new OkHttpClient();
    
        Request request = new Request.Builder()
                .url(url)
                .build();
        Response response = client.newCall(request).execute();
        if (response.isSuccessful()) {
            //...
        } else {
            //...
        }
    
    //异步
        OkHttpClient client = new OkHttpClient();
    
        Request request = new Request.Builder()
                .url(url)
                .build();
    
        client.newCall(request).enqueue(new Callback() {
            @Override public void onFailure(Call call, IOException e) {
               //...
            }
    
            @Override public void onResponse(Call call, Response response) throws IOException {
                //...
            }
        });
    

    由上可知,先初始化一个OkHttpClient,然后调用其newCall()函数返回一个call类。看一下其实现:

      @Override 
      public Call newCall(Request request) {
        return RealCall.newRealCall(this, request, false /* for web socket */);
      }
    

    直接返回了RealCallnewRealCall(),其中RealCallcall接口的实现类。

    static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
        // Safely publish the Call instance to the EventListener.
        RealCall call = new RealCall(client, originalRequest, forWebSocket);
        call.eventListener = client.eventListenerFactory().create(call);
        return call;
      }
    

    通过newCall()最终返回了一个RealCall的实例。之后同步调用RealCallexecute(),异步调用enqueue,我们先来看同步:

      @Override 
      public Response execute() throws IOException {
        try {
          Response result = getResponseWithInterceptorChain();//执行request请求
          return result;
        } catch (IOException e) {
          //...
        } finally {
          client.dispatcher().finished(this);//切换下一request执行
        }
      }
    

    简化了以下只剩下最简单的代码,可以看到直接调用了getResponseWithInterceptorChain()

    再看异步:

    @Override 
      public void enqueue(Callback responseCallback) {
        client.dispatcher().enqueue(new AsyncCall(responseCallback));
      }
    
      synchronized void enqueue(AsyncCall call) {
        if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
          runningAsyncCalls.add(call);//放到正在执行队列
          executorService().execute(call);//执行
        } else {
          readyAsyncCalls.add(call);//放到等待队列
        }
      }
    
    

    可以看到生成了一个AsyncCall并在executorService().execute(call);进行了执行,看下AsyncCall

      final class AsyncCall extends NamedRunnable {
    
        AsyncCall(Callback responseCallback) { }
    
        @Override 
        protected void execute() {
          try {
            Response response = getResponseWithInterceptorChain();
            //...
          } catch (IOException e) {
            //...
          } finally {
            client.dispatcher().finished(this);
          }
        }
      }
    
    

    和同步的execute非常相似,都最终调用了getResponseWithInterceptorChain(),其实同步和异步的区别就是一个直接执行了,一个使用了线程池,具体实现值得学习一下,感兴趣可以看下源码,不在赘述。

    下面就主要看getResponseWithInterceptorChain()的实现了:

      Response getResponseWithInterceptorChain() throws IOException {
        // Build a full stack of interceptors.
        List<Interceptor> interceptors = new ArrayList<>();
        interceptors.addAll(client.interceptors());
        interceptors.add(retryAndFollowUpInterceptor);
        interceptors.add(new BridgeInterceptor(client.cookieJar()));
        interceptors.add(new CacheInterceptor(client.internalCache()));
        interceptors.add(new ConnectInterceptor(client));
        if (!forWebSocket) {
          interceptors.addAll(client.networkInterceptors());
        }
        interceptors.add(new CallServerInterceptor(forWebSocket));
        Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
            originalRequest, this, eventListener, client.connectTimeoutMillis(),
            client.readTimeoutMillis(), client.writeTimeoutMillis());
    
        return chain.proceed(originalRequest);
      }
    

    其中就做了两件事:1)创建了一些interceptors;2)新建了RealInterceptorChain并调用了它的proceed的方法。我们直接看该proceed做了什么

    @Override 
      public Response proceed(Request request) throws IOException {
        return proceed(request, streamAllocation, httpCodec, connection);
      }
    
      public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
          RealConnection connection) throws IOException {
    
        // Call the next interceptor in the chain.
        RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
            connection, index + 1, request, call, eventListener, connectTimeout, readTimeout, writeTimeout);
        
        Interceptor interceptor = interceptors.get(index);
        Response response = interceptor.intercept(next);
        
        return response;
      }
    

    可知,其中又新建了只有index + 1不同的RealInterceptorChain并执行了上面的interceptors链表中某一个interceptorintercept(next),我们看下intercept(next)在干嘛(以retryAndFollowUpInterceptor为例)

      @Override 
      public Response intercept(Chain chain) throws IOException {
    
        StreamAllocation streamAllocation = new StreamAllocation(client.connectionPool(),
            createAddress(request.url()), call, eventListener, callStackTrace);
    
        while (true) {//如果结果正常则返回,否则使用更新的request进行第二次处理。
          Response response = realChain.proceed(request, streamAllocation, null, null);//直接传递给下一个
          -----------------------------------------------------------------------------------------
          Request followUp = followUpRequest(response, streamAllocation.route());//处理重定向等逻辑.
    
          if (followUp == null) {
            return response;
          }
          //建立新的StreamAllocation,以供followUp 使用。
          streamAllocation = new StreamAllocation(client.connectionPool(),
                createAddress(followUp.url()), call, eventListener, callStackTrace);
    
          request = followUp;
        }
      }
    

    可见,主要做了三件事:
    1)新建StreamAllocation ;用于建立网络连接
    2)执行上面传递过来的next.proceed()执行下一个interceptor的intercept(),直到interceptors全部被执行完。
    3)调用followUpRequest,进行检查response中是否含有重定向,没有返回null,有返回新的request。
    其中while死循环的目的就是持续检查返回结果中是否有重定向,直到没有在跳出。
    注意:其中的分割线,分割线以上都是用来处理request逻辑的,分割线以下都是用来处理response逻辑的,因为realChain.proceed会持续循环调用,直到返回结果,调用链如下图。

    image.png

    总体流程图:


    image.png

    其中,总体的调用流程就是上面的部分,下面介绍下几个拦截器的作用,分析方法和上面retryAndFollowUpInterceptor一样,只要看intercept方法就可以了。

    retryAndFollowUpInterceptor();//主要负责重定向拦截相关
    BridgeInterceptor(client.cookieJar());//主要负责请求相应头添加去除等逻辑
    CacheInterceptor(client.internalCache());//主要负责缓存相关,使用了diskLruCache()
    ConnectInterceptor(client);//主要负责网络连接相关
    CallServerInterceptor(forWebSocket);//最后一个拦截器,负责与服务器建立 Socket 连接
    client.interceptors();//用户自定
    client.networkInterceptors();//用户自定义,用户可以在拦截器的一头一尾进行自定义功能的数据处理,
    //并在client初始化时传入进去即可。
    

    如果只是看流程的到这里就可以结束了。

    二、连接池实现

    下面我们看一下连接池ConnectionPool实现,我们都直到网络连接是基于TCP/IP基础的,所以必须要经历三次握手与四次挥手,频繁的建立连接与断开连接是很耗时的,所以就建立连接池来实现对连接的最大复用。

    ConnectionPool内部维持了一个ArrayDeque来保存连接。

    private final Deque<RealConnection> connections = new ArrayDeque<>();
    
      public ConnectionPool() {
        this(5, 5, TimeUnit.MINUTES);
      }
    

    可维持的最大数量默认5个,维持时间5分钟,会创建一个线程定时进行查询处理超时的链接。虽然ConnectionPool在client初始化时就传入了进来,但是直到ConnectInterceptor时才会调用进行查找,最终会调用其get方法:

      @Nullable 
      RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {
        assert (Thread.holdsLock(this));
        for (RealConnection connection : connections) {
          if (connection.isEligible(address, route)) {
            streamAllocation.acquire(connection, true);
            return connection;
          }
        }
        return null;
      }
    

    其中就是遍历所有链接connections对它进行检查是否可用,如果符合就返回,Connection代表真实的socket物理连接,如下图


    image.png

    下面看下isEligible做了什么:

      /**
       * Returns true if this connection can carry a stream allocation to {@code address}. If non-null
       * {@code route} is the resolved route for a connection.
       */
      public boolean isEligible(Address address, @Nullable Route route) {
        // If this connection is not accepting new streams, we're done.
        if (allocations.size() >= allocationLimit || noNewStreams) return false;
    
        // If the non-host fields of the address don't overlap, we're done.
        if (!Internal.instance.equalsNonHost(this.route.address(), address)) return false;
    
        // If the host exactly matches, we're done: this connection can carry the address.
        if (address.url().host().equals(this.route().address().url().host())) {
          return true; // This connection is a perfect match.
        }
        //...
        return true; // The caller's address can be carried by this connection.
      }
    

    可见,先是检查是否超过每个连接所能链接的数量,默认是:1,然后检查代理、DNS、目标主机等是否相同,如果都相同就说明该链接可用,否则不可用。

    接着看一下streamAllocation.acquire(connection, true);

      public void acquire(RealConnection connection, boolean reportedAcquired) {
        assert (Thread.holdsLock(connectionPool));
        if (this.connection != null) throw new IllegalStateException();
    
        this.connection = connection;
        this.reportedAcquired = reportedAcquired;
        connection.allocations.add(new StreamAllocationReference(this, callStackTrace));
      }
    

    可见,将可用的connection保存到streamAllocation中,并将该streamAllocation添加到connection.allocations链表中

    public final List<Reference<StreamAllocation>> allocations = new ArrayList<>();
    

    从中我们就可以答题理解connection与StreamAllocation的关系了:StreamAllocation是一个负责查找可用连接,完成链接回调的类,它是物理连接connection的逻辑代表,直到ConnectInterceptor调用时两者合二为一。

    三、Dispatcher详解

    在第一部分有讲过同步和异步的处理,其实他们都是通过Dispatcher分发实现的。了解一下Dispatcher中几个重要的变量:

      /** 异步等待队列 */
      private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
    
      /** 异步执行队列 */
      private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
    
      /** 同步执行队列 */
      private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
    

    Dispatcher就是对这三个变量的维护来实现对request的管理。

      private int maxRequests = 64;
      private int maxRequestsPerHost = 5;
    

    maxRequests 用来限制异步请求runningAsyncCalls 的最大长度,默认为64个;maxRequestsPerHost用来限制runningAsyncCalls 中相同host的最大请求数量,默认为5个;超过以上两个限制的request都将放入readyAsyncCalls 队列中。

    同步请求在

    Response response = client.newCall(request).execute();
    

    后直接调用

    client.dispatcher().executed(this);//入列
    
      synchronized void executed(RealCall call) {
        runningSyncCalls.add(call);
      }
    

    加入了runningSyncCalls队列,等到执行结束后回调finished方法

    client.dispatcher().finished(this);
    
      void finished(RealCall call) {
        finished(runningSyncCalls, call, false);
      }
    

    异步请求在

     client.newCall(request).enqueue(new Callback())
    

    执行结束后回调

    client.dispatcher().finished(this);
    
    void finished(AsyncCall call) {
        finished(runningAsyncCalls, call, true);
      }
    

    是的,它们最后执行的finished是同一个泛型函数,只是最后一个参数不同而已。

      private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
        synchronized (this) {
        //在runningSyncCalls队列中将该请求移除
          if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
          if (promoteCalls) promoteCalls();
        }
      }
    
    

    可见,同步移除之后直接退出了,异步执行了promoteCalls:

      private void promoteCalls() {
        if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
        if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
    
        for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
          AsyncCall call = i.next();
    
          if (runningCallsForHost(call) < maxRequestsPerHost) {
            i.remove();
            runningAsyncCalls.add(call);
            executorService().execute(call);
          }
    
          if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
        }
      }
    

    可见,直接取出等待队列中的一个request执行。

    相关文章

      网友评论

          本文标题:深入浅出Okhttp

          本文链接:https://www.haomeiwen.com/subject/jyqzvftx.html