美文网首页
OkHttp源码分析

OkHttp源码分析

作者: echoSuny | 来源:发表于2020-04-04 15:52 被阅读0次

OkHttp介绍:
由Square公司贡献的一个处理网络请求的开源项目,是目前Android使用最广泛的网络框架。从Android4.4开始HttpUrlConnection的底层也换成了OkHttp。
特点:
支持http/2并允许对同一主机的所有请求共享一个套接字
通过连接池减少请求延迟
默认通过GZip压缩数据
响应缓存,避免重复的网络请求
请求失败自动重试主机的其他ip,自动重定向

简单使用

        // 创建OkHttpClient
        OkHttpClient client = new OkHttpClient.Builder().build();
       //也可以直接new出来 OkHttpClient okHttpClient = new OkHttpClient();
        
        // get请求
        Request getRequest = new Request.Builder()
                .url("")
                .get()
                .build();

     //post请求
        RequestBody body = new FormBody.Builder()
                                       .add("","")
                                       .add("","")
                                       . build();

        Request postRequest = new Request.Builder().post(body)
                .url("")
                .build();

        // 传入getRequest 或者 postRequest 构建call
        Call postCall = okHttpClient.newCall(postRequest);
        // 发起请求
        call.enqueue(new okhttp3.Callback() {
            @Override
            public void onFailure(okhttp3.Call call, IOException e) {

            }

            @Override
            public void onResponse(okhttp3.Call call, okhttp3.Response response) throws IOException {

            }
        }); 
调用流程

根据流程图可以看到前面几步都是在自己写的代码当中,直到调用execute或者enqueue时才进入了Okhttp内部去执行请求。

1 Dispatcher
首先需要知道的是Dispatcher内部维护了三个队列和一个线程池

  // 线程池
  private @Nullable ExecutorService executorService;
  // ready状态的异步任务队列
  private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
  // running状态的异步任务队列
  private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
  // 同步任务队列
  private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();

先看一张分发器异步请求流程图:


分发器异步工作流程

通过上面的图我们可以发现dispatcher可能向readyAsyncCalls放入请求,也可以向runningAysncCalls中存放那么就存在了问题:
(1). 如何决定放入readyAsyncCalls还是runningAysncCalls?
(2). 满足什么条件才会从把readyAsyncCalls请求移动到runningAysncCalls当中?
(3). 线程池如何创建,如何工作?

问题一:如何决定放入readyAsyncCalls还是runningAysncCalls

// 调用newCall实际是调用了返回了一个RealCall对象
@Override public Call newCall(Request request) {
    return RealCall.newRealCall(this, request, false /* for web socket */);
  }
// 执行enqueue()方法实际是调用了RealCall的enqueue()方法
@Override public void enqueue(Callback responseCallback) {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    eventListener.callStart(this);
    // 最终调用了分发器的enqueue()
    client.dispatcher().enqueue(new AsyncCall(responseCallback));
  }

synchronized void enqueue(AsyncCall call) {
     // 第一个问题的答案就在于此
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
      // 放入running队列
      runningAsyncCalls.add(call);
      // 线程池执行任务
      executorService().execute(call);
    } else {
     // 反之存入ready队列
      readyAsyncCalls.add(call);
    }
  }

到了这里,第一个问题就可以解答了。首先判读运行当中的异步任务队列中的任务个数是不是小于maxRequests(默认是64,可自己配置),其次需要满足对同一个主机的正在请求的个数数需要小于maxRequestsPerHost(默认是5,可自己配置)。也就是说超过了最大并发请求数超过了64或者对同一个服务器访问的任务数超过了5,那么就存放到ready队列当中。
问题二:满足什么条件才会从把readyAsyncCalls请求移动到runningAysncCalls当中

final class AsyncCall extends NamedRunnable {
    private final Callback responseCallback;

    AsyncCall(Callback responseCallback) {
      super("OkHttp %s", redactedUrl());
      this.responseCallback = responseCallback;
    }
     ......// 省略部分代码

    @Override protected void execute() {
      boolean signalledCallback = false;
      try {
        Response response = getResponseWithInterceptorChain();
        if (retryAndFollowUpInterceptor.isCanceled()) {
          signalledCallback = true;
          // 请求失败
          responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
        } else {
          signalledCallback = true;
          // 请求成功
          responseCallback.onResponse(RealCall.this, response);
        }
      } catch (IOException e) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          eventListener.callFailed(RealCall.this, e);
          // 请求失败
          responseCallback.onFailure(RealCall.this, e);
        }
      } finally {
        // 无论成功失败 都会执行
        client.dispatcher().finished(this);
      }
    }
  }

void finished(AsyncCall call) {
    finished(runningAsyncCalls, call, true);
  }

private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
    int runningCallsCount;
    Runnable idleCallback;
    synchronized (this) {
      // 把执行完了的任务从队列中移除
      if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
      // promoteCalls为true
      if (promoteCalls) promoteCalls();
      runningCallsCount = runningCallsCount();
      idleCallback = this.idleCallback;
    }

    if (runningCallsCount == 0 && idleCallback != null) {
      idleCallback.run();
    }
  }

private void promoteCalls() {
     // running状态的任务个数大于最大请求数则忽略
    if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
    // ready状态的队列为空时忽略。也就是没有ready状态的任务,自然不用移动到running当中
    if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.

    for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
      AsyncCall call = i.next();
       // 对同一个服务器的并发访问数要小于5
      if (runningCallsForHost(call) < maxRequestsPerHost) {
        // 从ready中移除
        i.remove();
        // 添加到running队列
        runningAsyncCalls.add(call);
       // 交给线程池执行 
        executorService().execute(call);
      }

      if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
    }
  }

至此第二个问题的答案也找到了。

问题三:线程池
提到线程池就必须先说一下线程池的好处,可以概括为三点:
1.重用线程池中的线程,避免线程的重复创建和销毁所带来的性能开销
2.能有效控制线程池的最大并发数,避免大量线程因相互抢占系统资源导致阻塞
3.能够对线程进行管理,并提供定时以及指定间隔循环执行等功能
ThreadPoolExecutor是线程池的真正实现,构造方法提供了一系列参数来配置线程池

public ThreadPoolExecutor(int corePoolSize,
                          int maxinumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory
                          RejectedExecutionHandler handler)

corePoolSize: 线程池的核心线程数。默认情况下,核心线程会在线程池中一直存活,即使处于闲置状态。如果将ThreadPoolExecutor的allowCoreThreadTimeOut属性设置为true,那么闲置的核心线程在等待新任务到来的时会有超时策略,这个时间由keepAliveTime指定,当等待时间超过了keepAliveTime所指定的时长之后,核心线程就会被终止。

maxinumPoolSize:线程池所能容纳的最大线程数(包括核心线程数),当活动线程数达到这个值后,后续的新任务会被阻塞。

keepAliveTime:非核心线程闲置的超时时长。当非核心线程闲置时间超过这个数值后,就会被回收。当ThreadPoolExecutor的allowCoreThreadTimeOut属性设置为true,核心线程同样会被回收。

unit:指定keepAliveTime的时间单位。常用的有TimeUnit.MILLISECONDS(毫秒),TimeUnit.SECONDS(秒),TimeUnit.MINUTES(分钟)

workQueue:线程池中的任务队列。通过线程池的execute方法提交的任务会存在这个参数中。

threadFactory:线程工厂,为线程池创建新的线程。这是一个接口。

handler:拒绝策略。这个参数不常用。当线程池无法执行新任务时,ThreadPoolExecutor会调用handler的rejectedExecution()方法来通知调用者,默认情况下rejectedExecution()方法会抛出一个RejectedExecutionException。RejectedExecutionHandler是一个接口,有四个默认实现类,分别是AbortPolicy(丢弃任务并抛出RejectedExecutionException异常),CallerRunsPolicy(由提交任务的线程处理该任务),DiscardOldestPolicy(抛弃队列最前面的任务,然后重新提交被拒绝的任务 ),DiscardPolicy(丢弃任务,但不抛出异常)。

线程池执行时的规则:
(1).如果线程池中的线程数量小于核心线程数,会直接启动一个新线程来执行任务
(2).如果线程池中的线程数量大于等于核心线程数,那么任务会被放入队列中去等待执行
(3).在2的基础上如果队列满了或者加入队列时失败,且线程数量小于线程池规定的最大线程数,则会启动一个非核心线程来执行任务。
(4).如果线程池中的数量达到了规定的最大线程数,那么直接拒绝执行此任务,就会调用RejectedExecutionHandler的rejectedExecution()方法,否则就创建一个新的非核心线程执行此任务。

了解了一些线程池的原理之后,现在来分析一下OkHttp当中的线程池具体是如何创建以及为什么这样创建。

 // 可以传入自己的线程池
public Dispatcher(ExecutorService executorService) {
    this.executorService = executorService;
  }
  
//使用OkHttp默认的线程池
  public synchronized ExecutorService executorService() {
    if (executorService == null) {
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
  }

可以看到OkHttp的线程池是没有核心线程的,假如放入第一个任务A,按照上面线程池的执行规则,A任务会则会被存入队列当中然后等待执行。最大线程数是Integer.MAX_VALUE和keepAliveTime为60,这些没什么好说的。主要看队列SynchronousQueue<Runnable>。SynchronousQueue<Runnable>是无容量队列。也就是说向SynchronousQueue<Runnable>添加任务时一定会失败,则根据线程执行规则3,由于Integer.MAX_VALUE是一个非常大的数值,假如没有闲置的线程,那么线程池就会立即创建一个新的线程来执行任务。
那么OkHttp的线程池是期望获得最大并发量且没有任何等待,但是进程的内存是存在限制的,而每一个线程都需要分配一定的内存,所以线程并不能无限。所以才有了OkHttp最大请求任务执行个数为64的限制。这样即解决了这个问题同时也能获得最大吞吐。

2 Interceptors
OkHttp最核心的也是最重要的就是拦截器功能。默认有5个拦截器。根据责任链设计模式在请求的时候各司其职从上往下调用,在拿到响应之后又会按照从下往上的顺序把Response返回。


拦截器顺序

(1)重试与重定向拦截器 RetryAndFollowUpInterceptor
RetryAndFollowUpInterceptor在交出给下一个拦截器之前会判断用户是否取消了请求。在获得了结果之后会根据响应码判断是否需要重定向,如果满足条件,那么就会重启执行所有拦截器
(2)桥接拦截器 BridgeInterceptor
BridgeInterceptor在交出之前负责将HTTP必备的请求头加入其中(如host),并添加一些默认行为(如GZIP)。在获得结果之后调用保存cookie接口并解析GZIP数据
(3)缓存拦截器 CacheInterceptor
CacheInterceptor在交出之前读取并判断是否使用缓存。获取结果之后判断是否缓存
(4)连接拦截器 ConnectInterceptor
在交出之前负责找到或者新建一个连接,并获得对应的socket流。在获得结果之后不再处理。
(5)请求服务拦截器 CallServerInterceptor
CallServerInterceptor真正的与服务器进行通信,向服务器发送数据,解析读取的响应数据

下面来看一下何时使用拦截器处理以及如何实现的责任链:

final class AsyncCall extends NamedRunnable {
     ...... //省略部分代码
    @Override protected void execute() {
      boolean signalledCallback = false;
      try {
        // 通过拦截器链获取响应
        Response response = getResponseWithInterceptorChain();
        // 重试重定向拦截器的职责之一:判断用户是否取消了请求
        if (retryAndFollowUpInterceptor.isCanceled()) {
          signalledCallback = true;
          responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
          ...... //省略部分代码
    }
  }

Response getResponseWithInterceptorChain() throws IOException {
    // Build a full stack of interceptors.
    List<Interceptor> interceptors = new ArrayList<>();
    // 用户可以添加自己的拦截器
    interceptors.addAll(client.interceptors());
    interceptors.add(retryAndFollowUpInterceptor); // 重试重定向
    interceptors.add(new BridgeInterceptor(client.cookieJar())); // 桥接
    interceptors.add(new CacheInterceptor(client.internalCache())); // 缓存
    interceptors.add(new ConnectInterceptor(client)); //连接
    if (!forWebSocket) {
      // 用户可以添加自己的拦截器
      interceptors.addAll(client.networkInterceptors());
    }
    interceptors.add(new CallServerInterceptor(forWebSocket)); // 请求服务
    
   /**
     *  interceptors 所有的拦截器
     *  index  第五个参数 默认传入0 可以从interceptors根据index的值取出interceptor
     */
    Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
        originalRequest, this, eventListener, client.connectTimeoutMillis(),
        client.readTimeoutMillis(), client.writeTimeoutMillis());
    // 开始处理请求
    return chain.proceed(originalRequest);
  }

public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
      RealConnection connection) throws IOException {
   ...... // 省略部分代码

   // 构造一个新的chain  index初始传入的是0,也就是说新的chain的index加1之后变成了1
    RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
        connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
        writeTimeout);
    // 取出拦截器 由于index 此时为0 取出的也就是retryAndFollowUpInterceptor
    Interceptor interceptor = interceptors.get(index);
    // 执行retryAndFollowUpInterceptor的拦截方法
    Response response = interceptor.intercept(next);
   ...... // 省略部分代码

    return response;
  }
 // 最终会调到retryAndFollowUpInterceptor的拦截方法中的这一行代码
 // realChain就是上面new出来的chain并且index的值为1 那么就会重复上面的proceed()方法,每次index都加1
 //直到最终执行完所有的拦截器 
 // 根据方法参数可以知道retryAndFollowUpInterceptor把request传给了下一个拦截器
response = realChain.proceed(request, streamAllocation, null, null);

RetryAndFollowUpInterceptor 源码分析:

@Override public Response intercept(Chain chain) throws IOException {
    Request request = chain.request();
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Call call = realChain.call();
    EventListener eventListener = realChain.eventListener();

    StreamAllocation streamAllocation = new StreamAllocation(client.connectionPool(),
        createAddress(request.url()), call, eventListener, callStackTrace);
    this.streamAllocation = streamAllocation;

    int followUpCount = 0;
    Response priorResponse = null;
    while (true) {
      if (canceled) {  // 取消请求就抛出IO异常
        streamAllocation.release();
        throw new IOException("Canceled");
      }

      Response response;
      boolean releaseConnection = true;
      try {
        // 把request发给下一个拦截器 并获得response
        response = realChain.proceed(request, streamAllocation, null, null);
        releaseConnection = false;
      } catch (RouteException e) { 
       // 路由异常  连接未成功,请求还未发出去
        if (!recover(e.getLastConnectException(), streamAllocation, false, request)) {
          throw e.getLastConnectException();
        }
        releaseConnection = false;
        // 跳过此次循环 重新进行一遍
        continue;
      } catch (IOException e) { 
       // 请求发出去了 但是和服务器通信失败了
        boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
        if (!recover(e, streamAllocation, requestSendStarted, request)) throw e;
        releaseConnection = false;
        continue;
      } finally {
        if (releaseConnection) {
          streamAllocation.streamFailed(null);
          streamAllocation.release();
        }
      }

      // Attach the prior response if it exists. Such responses never have a body.
      if (priorResponse != null) {
        response = response.newBuilder()
            .priorResponse(priorResponse.newBuilder()
                    .body(null)
                    .build())
            .build();
      }
      // 重定向
      Request followUp = followUpRequest(response, streamAllocation.route());
     // 如果为null 也就是不存在需要重定向的Request
      if (followUp == null) {
        if (!forWebSocket) {
          streamAllocation.release();
        }
        // 返回结果
        return response;
      }

      closeQuietly(response.body());
      // 重定向次数大于20次 则抛出协议异常
      if (++followUpCount > MAX_FOLLOW_UPS) {
        streamAllocation.release();
        throw new ProtocolException("Too many follow-up requests: " + followUpCount);
      }
      ...... // 省略部分代码
    }
  }

// 返回true代表要进行重试 false 则不进行重试
private boolean recover(IOException e, StreamAllocation streamAllocation,
      boolean requestSendStarted, Request userRequest) {
    streamAllocation.streamFailed(e);

    // OkhttpClient禁止了重试
    if (!client.retryOnConnectionFailure()) return false;

    // requestSendStarted在http2才可能为true http1.1 则为false
    if (requestSendStarted && userRequest.body() instanceof UnrepeatableRequestBody) return false;

    // 不是可重试的异常就不重试
    if (!isRecoverable(e, requestSendStarted)) return false;

    //不存在更多的路由时 不进行重试 (一个host可能会被dns解析成多个ip ,如果只有一个就返回false)
    if (!streamAllocation.hasMoreRoutes()) return false;

    // 默认重试
    return true;
  }
// 返回true表示时可重试的异常 反之是不需要或不可重试的异常
private boolean isRecoverable(IOException e, boolean requestSendStarted) {
    // 协议异常不重试
    if (e instanceof ProtocolException) {
      return false;
    }

    if (e instanceof InterruptedIOException) {
     // 如果异常时一个 socket超时异常就重试 例如网络波动导致超时
      return e instanceof SocketTimeoutException && !requestSendStarted;
    }

    if (e instanceof SSLHandshakeException) {
      //  证书异常不重试 
      if (e.getCause() instanceof CertificateException) {
        return false;
      }
    }
    // 证书校验失败不重试
    if (e instanceof SSLPeerUnverifiedException) {
      return false;
    }

    return true;
  }

可以看到关于重试的条件还是很苛刻的,需要经过很多的判断。总结下来就是要先看是否禁止了重试,然后是不是可重试的异常,证书有没有问题,是不是有多个ip。
下面看一下重定向

private Request followUpRequest(Response userResponse, Route route) throws IOException {
    if (userResponse == null) throw new IllegalStateException();
    int responseCode = userResponse.code();
    final String method = userResponse.request().method();
    switch (responseCode) {
      case HTTP_PROXY_AUTH: // 407
        Proxy selectedProxy = route != null
            ? route.proxy()
            : client.proxy();
        // 是否是http代理
        if (selectedProxy.type() != Proxy.Type.HTTP) {
          throw new ProtocolException("Received HTTP_PROXY_AUTH (407) code while not using proxy");
        }
        // 是的话就要去校验身份 在配置okhttpclient的时候可以配置
       //  例如      OkHttpClient client = new OkHttpClient.Builder()
//                  .proxyAuthenticator(new Authenticator() {
//                    @Nullable
//                    @Override
//                    public Request authenticate(Route route, okhttp3.Response response) throws IOException {
//                        Request request = response.request().newBuilder()
//                                .addHeader("Proxy-Authorization"
//                                        , Credentials.basic("your name","your pwd"))
//                                .build();
//                        return request;
//                    }
//                })
//                .build();
        return client.proxyAuthenticator().authenticate(route, userResponse);

      case HTTP_UNAUTHORIZED: // 401 
        // 出现很少 用法与上面类似 区别是上面是proxyAuthenticator 这个是authenticator
        return client.authenticator().authenticate(route, userResponse);

      case HTTP_PERM_REDIRECT: //308
      case HTTP_TEMP_REDIRECT: //307
        // 请求方法不是GET 并且不是HEAD
       if (!method.equals("GET") && !method.equals("HEAD")) {
          return null;
        }
      case HTTP_MULT_CHOICE:  //300
      case HTTP_MOVED_PERM: //301
      case HTTP_MOVED_TEMP: //302
      case HTTP_SEE_OTHER: //303
        // 客户端是否允许重定向
        if (!client.followRedirects()) return null;
        // 响应头不包含“Location” 不重定向 
        String location = userResponse.header("Location");
        if (location == null) return null;
        // 包含了location则取出来对应的地址
        HttpUrl url = userResponse.request().url().resolve(location);
        // 没有url 返回null 也就是不重定向
        if (url == null) return null;

        boolean sameScheme = url.scheme().equals(userResponse.request().url().scheme());
        // Scheme不一样并且不允许切换到https 则不重定向
        if (!sameScheme && !client.followSslRedirects()) return null;

        Request.Builder requestBuilder = userResponse.request().newBuilder();
        // method不是get或head
        if (HttpMethod.permitsRequestBody(method)) {
          // maintainBody  = method.equals("PROPFIND")
          final boolean maintainBody = HttpMethod.redirectsWithBody(method);
          // method不是"PROPFIND"
          if (HttpMethod.redirectsToGet(method)) {
          // mothod设置为get body为null
            requestBuilder.method("GET", null);
          } else {
            RequestBody requestBody = maintainBody ? userResponse.request().body() : null;
            requestBuilder.method(method, requestBody);
          }
          if (!maintainBody) {
            requestBuilder.removeHeader("Transfer-Encoding");
            requestBuilder.removeHeader("Content-Length");
            requestBuilder.removeHeader("Content-Type");
          }
        }

        if (!sameConnection(userResponse, url)) {
          requestBuilder.removeHeader("Authorization");
        }
        return requestBuilder.url(url).build();

      case HTTP_CLIENT_TIMEOUT:  //408 
        // 不允许重试 则不重定向
        if (!client.retryOnConnectionFailure()) {
          return null;
        }
        // body是不可重复的body 则不重定向
        if (userResponse.request().body() instanceof UnrepeatableRequestBody) {
          return null;
        }
        // 不是响应408的重试结果则不重定向
        if (userResponse.priorResponse() != null
            && userResponse.priorResponse().code() == HTTP_CLIENT_TIMEOUT) {
          return null;
        }
        // 服务器未响应Retry-After 或者响应了Retry-After:0 则不重定向
        if (retryAfter(userResponse, 0) > 0) {
          return null;
        }

        return userResponse.request();

      case HTTP_UNAVAILABLE: // 503 服务器不可用
        //不是响应503的重试结果则不重定向
        if (userResponse.priorResponse() != null
            && userResponse.priorResponse().code() == HTTP_UNAVAILABLE) {
          return null;
        }
        // 服务器明确响应Retry-After:0 则重定向
        if (retryAfter(userResponse, Integer.MAX_VALUE) == 0) {
          return userResponse.request();
        }
        return null;

      default:
        return null;
    }
  }

BridgeInterceptor源码分析

@Override public Response intercept(Chain chain) throws IOException {
    Request userRequest = chain.request();
    Request.Builder requestBuilder = userRequest.newBuilder();
    ...... // 省略部分代码
    // 没有host 添加host 
    if (userRequest.header("Host") == null) {
      requestBuilder.header("Host", hostHeader(userRequest.url(), false));
    }
  // 没有Connection添加Connection
    if (userRequest.header("Connection") == null) {
      requestBuilder.header("Connection", "Keep-Alive");
    }
    
    //Gzip
    boolean transparentGzip = false;
    if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
      transparentGzip = true;
      requestBuilder.header("Accept-Encoding", "gzip");
    }
    // cookie
    List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
    if (!cookies.isEmpty()) {
      requestBuilder.header("Cookie", cookieHeader(cookies));
    }

    if (userRequest.header("User-Agent") == null) {
      requestBuilder.header("User-Agent", Version.userAgent());
    }

    Response networkResponse = chain.proceed(requestBuilder.build());

    HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());

    Response.Builder responseBuilder = networkResponse.newBuilder()
        .request(userRequest);
    ...... // 省略部分代码
    return responseBuilder.build();
  }

可以看到桥接拦截器的工作还是很简单的,就是检查请求头,没有的就添加上。简单来说就是补全信息以及cookie和Gzip。
CacheInterceptor源码分析:

@Override public Response intercept(Chain chain) throws IOException {
    Response cacheCandidate = cache != null
        ? cache.get(chain.request())
        : null;

    long now = System.currentTimeMillis();
    // 根据请求和缓存得到缓存策略
    CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
    // 网络请求
    Request networkRequest = strategy.networkRequest;
    // 缓存响应
    Response cacheResponse = strategy.cacheResponse;
    ......
    // 既不存在网络请求 也不存在缓存 直接返回504
    if (networkRequest == null && cacheResponse == null) {
      return new Response.Builder()
          .request(chain.request())
          .protocol(Protocol.HTTP_1_1)
          .code(504)
          .message("Unsatisfiable Request (only-if-cached)")
          .body(Util.EMPTY_RESPONSE)
          .sentRequestAtMillis(-1L)
          .receivedResponseAtMillis(System.currentTimeMillis())
          .build();
    }

    // 不存在网络请求 存在缓存 则返回缓存
    if (networkRequest == null) {
      return cacheResponse.newBuilder()
          .cacheResponse(stripBody(cacheResponse))
          .build();
    }
    
   // 存在网络请求  则发起请求
    Response networkResponse = null;
    try {
      networkResponse = chain.proceed(networkRequest);
    } finally {
      if (networkResponse == null && cacheCandidate != null) {
        closeQuietly(cacheCandidate.body());
      }
    }

    //如果之前存在缓存
    if (cacheResponse != null) {
      // 最近此次通过网络请求的响应的响应码是304 那么更新缓存
      if (networkResponse.code() == HTTP_NOT_MODIFIED) {
        Response response = cacheResponse.newBuilder()
            .headers(combine(cacheResponse.headers(), networkResponse.headers()))
            .sentRequestAtMillis(networkResponse.sentRequestAtMillis())
            .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
            .cacheResponse(stripBody(cacheResponse))
            .networkResponse(stripBody(networkResponse))
            .build();
        networkResponse.body().close();

        // Update the cache after combining headers but before stripping the
        // Content-Encoding header (as performed by initContentStream()).
        cache.trackConditionalCacheHit();
        cache.update(cacheResponse, response);
        return response;
      } else {
        closeQuietly(cacheResponse.body());
      }
    }
    ......

    return response;
  }
缓存策略

ConnectInterceptor源码分析:

  @Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Request request = realChain.request();
    StreamAllocation streamAllocation = realChain.streamAllocation();

    boolean doExtensiveHealthChecks = !request.method().equals("GET");
    // 查找连接 找到了就复用 没有就新建
    HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
    RealConnection connection = streamAllocation.connection();

    return realChain.proceed(request, streamAllocation, httpCodec, connection);
  }

乍一看,就这几行代码,而且也看不出什么东西来。其实不然,这其中包含了一个连接池。连接池的作用与线程池的功能类似,不过是保存的socket连接。连接池是在初始化StreamAllocation的时候传入的,而StreamAllocation则是在RetryAndFollowUpInterceptor中被创建的。

//ConnectPool部分源码
  //和Dispatcher中的线程池差不多 这个线程池的作用是清理连接池中的连接
 private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,
      Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS,
      new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp ConnectionPool", true));

public ConnectionPool() {
    this(5, 5, TimeUnit.MINUTES);
  }
       /**
         * maxIdleConnections 最大连接数 默认是5
         * keepAliveDuration 存活时间 默认5分钟
         */
  public ConnectionPool(int maxIdleConnections, long keepAliveDuration, TimeUnit timeUnit) {
    this.maxIdleConnections = maxIdleConnections;
    this.keepAliveDurationNs = timeUnit.toNanos(keepAliveDuration);

    if (keepAliveDuration <= 0) {
      throw new IllegalArgumentException("keepAliveDuration <= 0: " + keepAliveDuration);
    }
  }

void put(RealConnection connection) {
    assert (Thread.holdsLock(this));
    if (!cleanupRunning) {
      cleanupRunning = true;
      executor.execute(cleanupRunnable);
    }
    // 存入队列当中
    connections.add(connection);
  }

@Nullable RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {
    assert (Thread.holdsLock(this));
    // 遍历队列
    for (RealConnection connection : connections) {
      // 检查是不是与连接池中的连接一致 ,一致的话就可以服用
      if (connection.isEligible(address, route)) {
        streamAllocation.acquire(connection, true);
        return connection;
      }
    }
    return null;
  }

public boolean isEligible(Address address, @Nullable Route route) {
    // 连接池中正在使用的不能服用
    if (allocations.size() >= allocationLimit || noNewStreams) return false;
    // 会去判断DNS,端口号,证书等十种 十个全部一致才可以复用
    if (!Internal.instance.equalsNonHost(this.route.address(), address)) return false;

    // host一样可以复用
    if (address.url().host().equals(this.route().address().url().host())) {
      return true; 
    }
      ...... //省略部分
}

下面是建立一个socket连接:

public void connect(int connectTimeout, int readTimeout, int writeTimeout,
      int pingIntervalMillis, boolean connectionRetryEnabled, Call call,
      EventListener eventListener) {
   ......// 省略部分代码

    while (true) {
      try {
        //  使用了http代理去代理https
        if (route.requiresTunnel()) {
          connectTunnel(connectTimeout, readTimeout, writeTimeout, call, eventListener);
          if (rawSocket == null) {
            break;
          }
        } else {
          connectSocket(connectTimeout, readTimeout, call, eventListener);
        }
        establishProtocol(connectionSpecSelector, pingIntervalMillis, call, eventListener);
        eventListener.connectEnd(call, route.socketAddress(), route.proxy(), protocol);
        break;
      } catch (IOException e) {
        ...... //省略部分
       }
  }

private void connectSocket(int connectTimeout, int readTimeout, Call call,
      EventListener eventListener) throws IOException {
    Proxy proxy = route.proxy();
    Address address = route.address();

    rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
        ? address.socketFactory().createSocket() // 使用http代理
        : new Socket(proxy); // 使用socket代理

    eventListener.connectStart(call, route.socketAddress(), proxy);
    rawSocket.setSoTimeout(readTimeout);
    try {
    // 会调用socket.connect()去建立真正的连接
      Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
    } catch (ConnectException e) {
      ConnectException ce = new ConnectException("Failed to connect to " + route.socketAddress());
      ce.initCause(e);
      throw ce;
    }
......
    }
  }

CallServerInterceptor源码分析:

@Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    HttpCodec httpCodec = realChain.httpStream();
    StreamAllocation streamAllocation = realChain.streamAllocation();
    RealConnection connection = (RealConnection) realChain.connection();
    Request request = realChain.request();

    long sentRequestMillis = System.currentTimeMillis();

    realChain.eventListener().requestHeadersStart(realChain.call());
    // 写请求头
    httpCodec.writeRequestHeaders(request);
    realChain.eventListener().requestHeadersEnd(realChain.call(), request);

    Response.Builder responseBuilder = null;
    // 不是get请求且存在请求体
    if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
      // 如果请求头包含Expect且对应的内容为100-continue
      // Expect:100-continue一般出现于大容量请求体或需要验证 代表先询问服务器是否愿意接受发送的数据
      if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
        httpCodec.flushRequest();
        realChain.eventListener().responseHeadersStart(realChain.call());
        // 先读取响应头,如果响应头返回了100则返回一个为null的responseBuilder
        responseBuilder = httpCodec.readResponseHeaders(true);
      }

      if (responseBuilder == null) {

        realChain.eventListener().requestBodyStart(realChain.call());
        long contentLength = request.body().contentLength();
        CountingSink requestBodyOut =
            new CountingSink(httpCodec.createRequestBody(request, contentLength));
        BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
        // 再把请求体写出去
        request.body().writeTo(bufferedRequestBody);
        bufferedRequestBody.close();
        realChain.eventListener()
            .requestBodyEnd(realChain.call(), requestBodyOut.successfulCount);
      } else if (!connection.isMultiplexed()) {
        streamAllocation.noNewStreams();
      }
    }

    httpCodec.finishRequest();
    // 正常流程
    if (responseBuilder == null) {
      realChain.eventListener().responseHeadersStart(realChain.call());
      // 读取服务器的响应
      responseBuilder = httpCodec.readResponseHeaders(false);
    }

    Response response = responseBuilder
        .request(request)
        .handshake(streamAllocation.connection().handshake())
        .sentRequestAtMillis(sentRequestMillis)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build();

  ...... 
    return response;
  }

相关文章

网友评论

      本文标题:OkHttp源码分析

      本文链接:https://www.haomeiwen.com/subject/onuwuhtx.html