美文网首页
OkHttp源码分析

OkHttp源码分析

作者: 旺仔_100 | 来源:发表于2020-05-01 23:21 被阅读0次

一、OkHttp历史过程

OkHttp一开始是Square公司觉得原生的HttpURLConnection和阿帕奇的HttpClient不太好用,然后基于他们封装的,然后开源了,全世界都觉得好用。由于Google不建议使用HttpClient,所以Square一开始是把这一块独立出来了,后面又直接去掉了。后面他们一股作气,把HttpURLConnection也去掉了,这个就是完全不依赖Google原生,自己从实现了socket来进行网络连接。最后Google官方收录了OkHttp代码。实际上Android的HttpURLConnection的代码已经是OkHttp的代码了。这个时候是2013年,目前都是2020年了。

OkHttp4已经开始用Kotlin来重写了,所以kotlin对于安卓来说很重要,我们不是小孩子,我们不做选择,Flutter和kotlin我们都要。

https://square.github.io/okhttp/ 官网介绍

二、基本使用

 val url = "https://api.github.com/"
        val client = OkHttpClient()
        val request =  Request.Builder().url(url).build()
        client.newCall(request).enqueue(object : Callback{
            override fun onFailure(call: Call, e: IOException) {

            }

            override fun onResponse(call: Call, response: Response) {

            }

        })

三、源码分析,从基本使用的API开始入手 ,enqueue方法
调用的是RealCall的enqueue方法,源码如下(我贴出来的源码中,关键的地方都有注释,只看注释可以找到流程中的关键点,建议把源码翻出来对着看,流程并不太难的)

  @Override public void enqueue(Callback responseCallback) {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    eventListener.callStart(this);
    client.dispatcher().enqueue(new AsyncCall(responseCallback));
  }

看到RealCall里面有OkHttpClient,OkHttpClient是啥,里面有请求的各种配置。还有Request,这个是个啥,就是我们要发起一个请求所有的东西都封装在这个里面了。forwebSockt:Boolean 是否启用webSockt,webSockt android端常用的长连接,可以随时接受服务端的推送。webSockt协议是在http协议之上建立的。

enqueue的跟踪:调用到RealCall的enqueue方法,

client.dispatcher().enqueue(new AsyncCall(responseCallback));

首先我们看下Dispatcher类

  private int maxRequests = 64;//默认允许的最大网络请求数
  private int maxRequestsPerHost = 5;//默认允许的最大主机数
  private @Nullable Runnable idleCallback;

  /** Executes calls. Created lazily. */
  private @Nullable ExecutorService executorService;

  /** Ready async calls in the order they'll be run. */  //异步请求缓存队列
  private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();

  /** Running asynchronous calls. Includes canceled calls that haven't finished yet. */   //异步请求正在运行的队列
  private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();

  /** Running synchronous calls. Includes canceled calls that haven't finished yet. */  //同步请求正在运行队列
  private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();

再来看看Dispatcher的enqueue方法源码

  void enqueue(AsyncCall call) {
    synchronized (this) {
      //把请求放到异步请求准备队列中
      readyAsyncCalls.add(call);
    }
    promoteAndExecute();
  }

接着看promoteAndExecute()

private boolean promoteAndExecute() {
    assert (!Thread.holdsLock(this));

    List<AsyncCall> executableCalls = new ArrayList<>();
    boolean isRunning;
    synchronized (this) {
      //遍历准备队列,如果没有超过最大请求数和最大主机数量,从异步准备队列移处,
      //然后添加到executableCalls和正在运行的异步队列中
      for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
        AsyncCall asyncCall = i.next();

        if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
        if (runningCallsForHost(asyncCall) >= maxRequestsPerHost) continue; // Host max capacity.

        i.remove();
        executableCalls.add(asyncCall);
        runningAsyncCalls.add(asyncCall);
      }
      isRunning = runningCallsCount() > 0;
    }
    //开始把刚才的请求遍历,然后执行
    for (int i = 0, size = executableCalls.size(); i < size; i++) {
      AsyncCall asyncCall = executableCalls.get(i);
      //这里查看下怎么执行请求任务
      asyncCall.executeOn(executorService());
    }

    return isRunning;
  }

查看asyncCall.executeOn(executorService());源码

   void executeOn(ExecutorService executorService) {
      assert (!Thread.holdsLock(client.dispatcher()));
      boolean success = false;
      try { 
        //通过线程池来执行,this表示当前类实现了Runnable
       //当前类是AsyncCall final class AsyncCall extends NamedRunnable {
        executorService.execute(this);
        success = true;
      } catch (RejectedExecutionException e) {
        InterruptedIOException ioException = new InterruptedIOException("executor rejected");
        ioException.initCause(e);
        eventListener.callFailed(RealCall.this, ioException);
        //responseCallback是我们传进来的回调,这个地方回掉onFailure
        responseCallback.onFailure(RealCall.this, ioException);
      } finally {
        if (!success) {
          client.dispatcher().finished(this); // This call is no longer running!
        }
      }
    }

查看NamedRunnable源码

//果然实现了Runnable
public abstract class NamedRunnable implements Runnable {
  protected final String name;

  public NamedRunnable(String format, Object... args) {
    this.name = Util.format(format, args);
  }

  @Override public final void run() {
    String oldName = Thread.currentThread().getName();
    Thread.currentThread().setName(name);
    try {
      // run 方法中调用了本类的抽象方法execute,所以我们在AsyncCall中查看execute()实现
      execute();
    } finally {
      Thread.currentThread().setName(oldName);
    }
  }

  protected abstract void execute();
}

查看AsyncCall中的execute实现

 @Override protected void execute() {
      boolean signalledCallback = false;
      timeout.enter();
      try {
        //通过这行代码去连接服务器并获取对应的响应
        Response response = getResponseWithInterceptorChain();
        if (retryAndFollowUpInterceptor.isCanceled()) {
          signalledCallback = true;
          //我们的失败回调
          responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
        } else {
          signalledCallback = true;
         //我们的成功回调
          responseCallback.onResponse(RealCall.this, response);
        }
      } catch (IOException e) {
        e = timeoutExit(e);
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          eventListener.callFailed(RealCall.this, e);
          responseCallback.onFailure(RealCall.this, e);
        }
      } finally {
        //不管成功还是失败都会结束本次请求
        client.dispatcher().finished(this);
      }
    }
  }

四、拦截器

RetryAndFollowUpInterceptor 负责失败重试以及重定向

BridgeInterceptor 把客户端的请求转化为服务器需要的请求,把服务器返回的响应转化为

CacheInterceptor 负责读取缓存以及更新缓存

ConnectInterceptor 负责与服务器建立连接

CallServerInterceptor 负责从服务器读取响应数据

分析下责任链模式

首先是RealCall 的getResponseWithInterceptorChain()中

Response getResponseWithInterceptorChain() throws IOException {
    // Build a full stack of interceptors.
    List<Interceptor> interceptors = new ArrayList<>();
    interceptors.addAll(client.interceptors());
    interceptors.add(retryAndFollowUpInterceptor);
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    interceptors.add(new CacheInterceptor(client.internalCache()));
    interceptors.add(new ConnectInterceptor(client));
    if (!forWebSocket) {
      interceptors.addAll(client.networkInterceptors());
    }
    interceptors.add(new CallServerInterceptor(forWebSocket));

    Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
        originalRequest, this, eventListener, client.connectTimeoutMillis(),
        client.readTimeoutMillis(), client.writeTimeoutMillis());

    return chain.proceed(originalRequest);
  }

可以看到创建了一个RealInterceptor构造,其中index为0,然后chain.proceed(originalRequest) 这个会调用RealInterceptor的proceed(t)方法,我们看下源码

 public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
      RealConnection connection) throws IOException {
    if (index >= interceptors.size()) throw new AssertionError();

    calls++;

    // If we already have a stream, confirm that the incoming request will use it.
    if (this.httpCodec != null && !this.connection.supportsUrl(request.url())) {
      throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
          + " must retain the same host and port");
    }

    // If we already have a stream, confirm that this is the only call to chain.proceed().
    if (this.httpCodec != null && calls > 1) {
      throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
          + " must call proceed() exactly once");
    }

    // Call the next interceptor in the chain.
    RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
        connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
        writeTimeout);
    Interceptor interceptor = interceptors.get(index);
    Response response = interceptor.intercept(next);

    // Confirm that the next interceptor made its required call to chain.proceed().
    if (httpCodec != null && index + 1 < interceptors.size() && next.calls != 1) {
      throw new IllegalStateException("network interceptor " + interceptor
          + " must call proceed() exactly once");
    }

    // Confirm that the intercepted response isn't null.
    if (response == null) {
      throw new NullPointerException("interceptor " + interceptor + " returned null");
    }

    if (response.body() == null) {
      throw new IllegalStateException(
          "interceptor " + interceptor + " returned a response with no body");
    }

    return response;
  }

重点看下上面的注释 // Call the next interceptor in the chain. 处
首先会构造一个RealInterceptorChain,这个的index会+1,然后获取当前的interceptor,执行interceptor的intercept(chain)方法,找一下系统的RetryAndFollowUpInterceptor,看下他的intercept源码

@Override public Response intercept(Chain chain) throws IOException {
    Request request = chain.request();
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Call call = realChain.call();
    EventListener eventListener = realChain.eventListener();

    StreamAllocation streamAllocation = new StreamAllocation(client.connectionPool(),
        createAddress(request.url()), call, eventListener, callStackTrace);
    this.streamAllocation = streamAllocation;

    int followUpCount = 0;
    Response priorResponse = null;
    while (true) {
      if (canceled) {
        streamAllocation.release();
        throw new IOException("Canceled");
      }

      Response response;
      boolean releaseConnection = true;
      try {     // ***********************************注释1***********************************
        response = realChain.proceed(request, streamAllocation, null, null);
        releaseConnection = false;
      } catch (RouteException e) {
        // The attempt to connect via a route failed. The request will not have been sent.
        if (!recover(e.getLastConnectException(), streamAllocation, false, request)) {
          throw e.getFirstConnectException();
        }
        releaseConnection = false;
        continue;
      } catch (IOException e) {
        // An attempt to communicate with a server failed. The request may have been sent.
        boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
        if (!recover(e, streamAllocation, requestSendStarted, request)) throw e;
        releaseConnection = false;
        continue;
      } finally {
        // We're throwing an unchecked exception. Release any resources.
        if (releaseConnection) {
          streamAllocation.streamFailed(null);
          streamAllocation.release();
        }
      }

      // Attach the prior response if it exists. Such responses never have a body.
      if (priorResponse != null) {
        response = response.newBuilder()
            .priorResponse(priorResponse.newBuilder()
                    .body(null)
                    .build())
            .build();
      }

      Request followUp;
      try {
        followUp = followUpRequest(response, streamAllocation.route());
      } catch (IOException e) {
        streamAllocation.release();
        throw e;
      }

      if (followUp == null) {
        streamAllocation.release();
        return response;
      }

      closeQuietly(response.body());

      if (++followUpCount > MAX_FOLLOW_UPS) {
        streamAllocation.release();
        throw new ProtocolException("Too many follow-up requests: " + followUpCount);
      }

      if (followUp.body() instanceof UnrepeatableRequestBody) {
        streamAllocation.release();
        throw new HttpRetryException("Cannot retry streamed HTTP body", response.code());
      }

      if (!sameConnection(response, followUp.url())) {
        streamAllocation.release();
        streamAllocation = new StreamAllocation(client.connectionPool(),
            createAddress(followUp.url()), call, eventListener, callStackTrace);
        this.streamAllocation = streamAllocation;
      } else if (streamAllocation.codec() != null) {
        throw new IllegalStateException("Closing the body of " + response
            + " didn't close its backing stream. Bad interceptor?");
      }

      request = followUp;
      priorResponse = response;
    }
  }

找到代码 // 注释1
这个里面的chain是刚才index+1的RealInterceptorChain,又会重新调用之前的chain.proceed(originalRequest) ,只不过这是调用的第二个intercepter的intercept方法,就这样一直掉下去,直到最后一个CallServerInterceptor里面,这个里面是没有调用chain方法,所以会结束调用,返回response。所以我们自定义Intercept一般的都是会调用chain方法,否则OkHttp自带的intercepter就不会被调用到了。这个就是完整的拦截器责任链分发。

相关文章

网友评论

      本文标题:OkHttp源码分析

      本文链接:https://www.haomeiwen.com/subject/pdccghtx.html