美文网首页工作生活
OKHTTP 源码解析

OKHTTP 源码解析

作者: 大盗海洲 | 来源:发表于2019-07-03 14:08 被阅读0次

参考
彻底理解OkHttp - OkHttp 源码解析及OkHttp的设计思想
Okhttp3源码分析

image.png
  OkHttpClient okHttpClient = new OkHttpClient();
        OkHttpClient.Builder builder = okHttpClient.newBuilder();
        builder.addInterceptor(new HttpLoggingInterceptor());

        Request request = new Request.Builder()
                .url(url)
                .get()
                .build()
                ;

        okHttpClient.newCall(request).enqueue(new Callback() {
            @Override
            public void onFailure(Call call, IOException e) {
                System.out.println(e.toString());
            }

            @Override
            public void onResponse(Call call, Response response) throws IOException {
                    System.out.println(response.body().toString());
            }
        });
final class RealCall implements Call {
 @Override public void enqueue(Callback responseCallback) {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    eventListener.callStart(this);
    client.dispatcher().enqueue(new AsyncCall(responseCallback));
  }
}

public final class Dispatcher {
//TODO 同时能进行的最大请求数
  private int maxRequests = 64;
 //TODO 同时请求的相同HOST的最大个数 SCHEME :// HOST [ ":" PORT ] [ PATH [ "?" QUERY ]]
    //TODO 如 https://restapi.amap.com  restapi.amap.com - host
  private int maxRequestsPerHost = 5;

  /** Ready async calls in the order they'll be run. */
  /**
     * Ready async calls in the order they'll be run.
     * TODO 双端队列,支持首尾两端 双向开口可进可出,方便移除
     * 异步等待队列
     *
     */
  private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();

  /** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
  //正在进行的异步队列
  private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
  //正在进行的同步队列
  /** Running synchronous calls. Includes canceled calls that haven't finished yet. */
  private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();

 synchronized void enqueue(AsyncCall call) {
    // //TODO 同时请求不能超过并发数(64,可配置调度器调整)
        //TODO okhttp会使用共享主机即 地址相同的会共享socket
        //TODO 同一个host最多允许5条线程通知执行请求
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
      //  //TODO 加入运行队列 并交给线程池执行
      runningAsyncCalls.add(call);
 //TODO AsyncCall 是一个runnable,放到线程池中去执行,查看其execute实现
      executorService().execute(call);
    } else {
//TODO 加入等候队列
      readyAsyncCalls.add(call);
    }
  }
}
   //TODO 没有核心线程 ,非核心线程数量没有限制,闲置60秒回收 任务队列,
//这个线程池跟Android中的CachedThreadPool非常类似,这种类型的线程池,
//适用于大量的耗时较短的异步任务
 public synchronized ExecutorService executorService() {
    if (executorService == null) {
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
  }

call放到了线程池中那么它是如何执行的呢?注意这里的call是AsyncCall。

我们看一下AsyncCall的实现:

final class RealCall implements Call {
 //public abstract class NamedRunnable implements Runnable 
 final class AsyncCall extends NamedRunnable {
   private final Callback responseCallback;

   AsyncCall(Callback responseCallback) {
     super("OkHttp %s", redactedUrl());
     this.responseCallback = responseCallback;
   }

   //线程池实际上就是执行了execute()
   @Override protected void execute() {
     boolean signalledCallback = false;
     try {
       //
       Response response = getResponseWithInterceptorChain();
       if (retryAndFollowUpInterceptor.isCanceled()) {
         signalledCallback = true;
         responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
       } else {
         signalledCallback = true;
         responseCallback.onResponse(RealCall.this, response);
       }
     } catch (IOException e) {
       if (signalledCallback) {
         // Do not signal the callback twice!
         Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
       } else {
         eventListener.callFailed(RealCall.this, e);
         responseCallback.onFailure(RealCall.this, e);
       }
     } finally {
      //TODO 移除队列
       client.dispatcher().finished(this);
     }
   }
 }
}

值得注意的finally 执行了client.dispatcher().finished(this); 通过调度器移除队列,并且判断是否存在等待队列,如果存在,检查执行队列是否达到最大值,如果没有将等待队列变为执行队列。这样也就确保了等待队列被执行。

public final class Dispatcher {
  /** Used by {@code AsyncCall#run} to signal completion. */
  void finished(AsyncCall call) {
    finished(runningAsyncCalls, call, true);
  }

 private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
    int runningCallsCount;
    Runnable idleCallback;
    synchronized (this) {
      if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
      //promoteCalls=true,即异步调用
      if (promoteCalls) promoteCalls();
     //TODO 运行队列的数量
      runningCallsCount = runningCallsCount();
      idleCallback = this.idleCallback;
    }
      //闲置调用
    if (runningCallsCount == 0 && idleCallback != null) {
      idleCallback.run();
    }
  }


  private void promoteCalls() {
    //正在执行的异步队列大于最大请求队列 ,不执行下面代码
    if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
    //等待队列为空,不执行下面代码
    if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
    
    for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
      AsyncCall call = i.next();
      //  //TODO  相同host的请求没有达到最大,加入运行队列
      if (runningCallsForHost(call) < maxRequestsPerHost) {
        //将等待队列加入到运行队列中
        i.remove();
        runningAsyncCalls.add(call);
        //交给线程池来执行
        executorService().execute(call);
      }
    //当runningAsyncCalls满了,直接退出迭代
      if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
    }
  }


}

真正的执行网络请求和返回响应结果:getResponseWithInterceptorChain(),下面我们着重分析一下这个方法:

final class RealCall implements Call {
//TODO 核心代码 开始真正的执行网络请求
  Response getResponseWithInterceptorChain() throws IOException {
    // Build a full stack of interceptors.
    //TODO 责任链
    List<Interceptor> interceptors = new ArrayList<>();
    //TODO 在配置okhttpClient 时设置的intercept 由用户自己设置
     ////首先添加的是用户添加的全局拦截器
    interceptors.addAll(client.interceptors());
    //TODO 负责处理失败后的重试与重定向
    interceptors.add(retryAndFollowUpInterceptor);
    //TODO 负责把用户构造的请求转换为发送到服务器的请求 、把服务器返回的响应转换为用户友好的响应 处理 配置请求头等信息
    //TODO 从应用程序代码到网络代码的桥梁。首先,它根据用户请求构建网络请求。然后它继续呼叫网络。最后,它根据网络响应构建用户响应。
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    //TODO 处理 缓存配置 根据条件(存在响应缓存并被设置为不变的或者响应在有效期内)返回缓存响应
    //TODO 设置请求头(If-None-Match、If-Modified-Since等) 服务器可能返回304(未修改)
    //TODO 可配置用户自己设置的缓存拦截器
    interceptors.add(new CacheInterceptor(client.internalCache()));
    //TODO 连接服务器 负责和服务器建立连接 这里才是真正的请求网络
    interceptors.add(new ConnectInterceptor(client));
    if (!forWebSocket) {
      //TODO 配置okhttpClient 时设置的networkInterceptors
      //TODO 返回观察单个网络请求和响应的不可变拦截器列表。
      interceptors.addAll(client.networkInterceptors());
    }
    //TODO 执行流操作(写出请求体、获得响应数据) 负责向服务器发送请求数据、从服务器读取响应数据
    //TODO 进行http请求报文的封装与请求报文的解析
    interceptors.add(new CallServerInterceptor(forWebSocket));

    //TODO 创建责任链
    Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
        originalRequest, this, eventListener, client.connectTimeoutMillis(),
        client.readTimeoutMillis(), client.writeTimeoutMillis());

    //TODO 执行责任链
    return chain.proceed(originalRequest);
  }

}

从上述代码中,可以看出都实现了Interceptor接口,这是okhttp最核心的部分,采用责任链的模式来使每个功能分开,每个Interceptor自行完成自己的任务,并且将不属于自己的任务交给下一个,简化了各自的责任和逻辑。

责任链模式是设计模式中的一种也相当简单参考链接,这里不在复述。

我们着重分析一下,okhttp的设计实现,如何通过责任链来进行传递返回数据的。

上述代码中可以看出interceptors,是传递到了RealInterceptorChain该类实现了Interceptor.Chain,并且执行了chain.proceed(originalRequest)。

其实核心代码就是chain.proceed() 通过该方法进行责任链的执行


public final class RealInterceptorChain implements Interceptor.Chain {
 @Override public Response proceed(Request request) throws IOException {
    return proceed(request, streamAllocation, httpCodec, connection);
  }


 public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
      RealConnection connection) throws IOException {
    if (index >= interceptors.size()) throw new AssertionError();

    calls++;
    // Call the next interceptor in the chain.
    RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
        connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
        writeTimeout);
    Interceptor interceptor = interceptors.get(index);
    Response response = interceptor.intercept(next);
    return response;
  }
}

从上述代码,我们可以知道,新建了一个RealInterceptorChain 责任链 并且 index+1,然后 执行interceptors.get(index); 返回Response。

其实就是按顺序执行了拦截器,这里我画了一个简图:

image.png

拦截器的执行顺序便是如上图这样执行的。
这样设计的一个好处就是,责任链中每个拦截器都会执行chain.proceed()方法之前的代码,等责任链最后一个拦截器执行完毕后会返回最终的响应数据,而chain.proceed() 方法会得到最终的响应数据,这时就会执行每个拦截器的chain.proceed()方法之后的代码,其实就是对响应数据的一些操作。

CacheInterceptor 缓存拦截器就是很好的证明,我们来通过CacheInterceptor 缓存拦截器来进行分析,大家就会明白了。

/** Serves requests from the cache and writes responses to the cache. */
public final class CacheInterceptor implements Interceptor {
@Override public Response intercept(Chain chain) throws IOException {
//TODO 获取request对应缓存的Response 如果用户没有配置缓存拦截器 cacheCandidate == null
    Response cacheCandidate = cache != null
        ? cache.get(chain.request())
        : null;

    long now = System.currentTimeMillis();

    CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
    Request networkRequest = strategy.networkRequest;
 //TODO 获取缓存中(CacheStrategy)的Response
    Response cacheResponse = strategy.cacheResponse;

    if (cache != null) {
      cache.trackResponse(strategy);
    }
   //TODO 缓存无效 关闭资源
    if (cacheCandidate != null && cacheResponse == null) {
      closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
    }
// If we're forbidden from using the network and the cache is insufficient, fail.
 //TODO networkRequest == null 不实用网路请求 且没有缓存 cacheResponse == null  返回失败
    if (networkRequest == null && cacheResponse == null) {
      return new Response.Builder()
          .request(chain.request())
          .protocol(Protocol.HTTP_1_1)
          .code(504)
          .message("Unsatisfiable Request (only-if-cached)")
          .body(Util.EMPTY_RESPONSE)
          .sentRequestAtMillis(-1L)
          .receivedResponseAtMillis(System.currentTimeMillis())
          .build();
    }
  //TODO 不使用网络请求 且存在缓存 直接返回响应
    // If we don't need the network, we're done.
    if (networkRequest == null) {
      return cacheResponse.newBuilder()
          .cacheResponse(stripBody(cacheResponse))
          .build();
    }
}

上述的代码,主要做了几件事:

如果用户自己配置了缓存拦截器,cacheCandidate = cache.Response 获取用户自己存储的Response,否则 cacheCandidate = null;同时从CacheStrategy 获取cacheResponse 和 networkRequest
如果cacheCandidate != null 而 cacheResponse == null 说明缓存无效清楚cacheCandidate缓存。
如果networkRequest == null 说明没有网络,cacheResponse == null 没有缓存,返回失败的信息,责任链此时也就终止,不会在往下继续执行。
如果networkRequest == null 说明没有网络,cacheResponse != null 有缓存,返回缓存的信息,责任链此时也就终止,不会在往下继续执行。

上部分代码,其实就是没有网络的时候的处理。

/** Serves requests from the cache and writes responses to the cache. */
public final class CacheInterceptor implements Interceptor {
    //TODO 执行下一个拦截器
    Response networkResponse = null;
    try {
      networkResponse = chain.proceed(networkRequest);
    } finally {
      // If we're crashing on I/O or otherwise, don't leak the cache body.
      if (networkResponse == null && cacheCandidate != null) {
        closeQuietly(cacheCandidate.body());
      }
    }

 //TODO 网络请求 回来 更新缓存
    // If we have a cache response too, then we're doing a conditional get.
    //TODO 如果存在缓存 更新
    if (cacheResponse != null) {
      //TODO 304响应码 自从上次请求后,请求需要响应的内容未发生改变
      if (networkResponse.code() == HTTP_NOT_MODIFIED) {
        Response response = cacheResponse.newBuilder()
            .headers(combine(cacheResponse.headers(), networkResponse.headers()))
            .sentRequestAtMillis(networkResponse.sentRequestAtMillis())
            .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
            .cacheResponse(stripBody(cacheResponse))
            .networkResponse(stripBody(networkResponse))
            .build();
        networkResponse.body().close();

        // Update the cache after combining headers but before stripping the
        // Content-Encoding header (as performed by initContentStream()).
        cache.trackConditionalCacheHit();
        cache.update(cacheResponse, response);
        return response;
      } else {
        closeQuietly(cacheResponse.body());
      }
    }
    //TODO 缓存Response
    Response response = networkResponse.newBuilder()
        .cacheResponse(stripBody(cacheResponse))
        .networkResponse(stripBody(networkResponse))
        .build();

    if (cache != null) {
      if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) {
        // Offer this request to the cache.
        CacheRequest cacheRequest = cache.put(response);
        return cacheWritingResponse(cacheRequest, response);
      }

   if (HttpMethod.invalidatesCache(networkRequest.method())) {
        try {
          cache.remove(networkRequest);
        } catch (IOException ignored) {
          // The cache cannot be written.
        }
      }
    }

    return response;

}

下部分代码主要做了这几件事:

执行下一个拦截器,也就是请求网络
责任链执行完毕后,会返回最终响应数据,如果缓存存在更新缓存,如果缓存不存在加入到缓存中去。

这样就体现出了,责任链这样实现的好处了,当责任链执行完毕,如果拦截器想要拿到最终的数据做其他的逻辑处理等,这样就不用在做其他的调用方法逻辑了,直接在当前的拦截器就可以拿到最终的数据。
这也是okhttp设计的最优雅最核心的功能。

image.png

相关文章

网友评论

    本文标题:OKHTTP 源码解析

    本文链接:https://www.haomeiwen.com/subject/ubithctx.html