美文网首页
Okhttp3 异步请求,源码浅析(二)

Okhttp3 异步请求,源码浅析(二)

作者: 徘徊0_ | 来源:发表于2020-05-08 10:07 被阅读0次
Okhttp流程.png

执行一次异步(开启工作线程,不会阻塞当前线程,结果通常采用接口回调方式回传)请求,大概需要经历如下:

        //1,构建 client
        OkHttpClient client = new OkHttpClient.Builder().readTimeout(10, TimeUnit.SECONDS).build();//可以通过添加各种属性
        //2,构建request请求
        Request request = new Request.Builder().url("www.baidu.com").build();//可以继续添加各种数据
        //3,获得call对象
        Call call = client.newCall(request);
        //4,执行异步步请求
        call.enqueue(new Callback() {
            @Override
            public void onFailure(Call call, IOException e) {
                //子线程
            }

            @Override
            public void onResponse(Call call, Response response) throws IOException {
                //子线程
            }
        });

这里前三步,跟同步请求差不多,分析一下第四步,异步请求:
执行异步调用的是Call.enqueue()方法:

//RealCall.java  

@Override public void enqueue(Callback responseCallback) {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    eventListener.callStart(this);
    client.dispatcher().enqueue(new AsyncCall(responseCallback));//重点
  }

上面可以看出,executed保证了只能执行一次,enqueue(AsyncCall call)中需要一个AsyncCall,源码可以看出,AsyncCallRealCall的一个内部类,并且继承自NamedRunnable具体如下:

final class AsyncCall extends NamedRunnable {
    //.....省略部分代码

    //execute() 运行在run()方法中,从下面的NamedRunnable中可以看出
    @Override protected void execute() {
      boolean signalledCallback = false;
      try {
        Response response = getResponseWithInterceptorChain();//拦截器链,后续具体分析
        if (retryAndFollowUpInterceptor.isCanceled()) {
          signalledCallback = true;
          responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
        } else {
          signalledCallback = true;
          responseCallback.onResponse(RealCall.this, response);//将结果回调返回,responseCallback就是执行enqueue方法,传入的callback
        }
      } catch (IOException e) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          eventListener.callFailed(RealCall.this, e);
          responseCallback.onFailure(RealCall.this, e);
        }
      } finally {
        client.dispatcher().finished(this);
      }
    }
  }

这里可以大概猜出,它其实是一个Runnable,接着看一下NamedRunnable

public abstract class NamedRunnable implements Runnable {//实现了Runnable接口
  protected final String name;

  public NamedRunnable(String format, Object... args) {
    this.name = Util.format(format, args);
  }

  @Override public final void run() {
    String oldName = Thread.currentThread().getName();
    Thread.currentThread().setName(name);
    try {
      execute();//在run方法中,执行了抽象方法 execute();
    } finally {
      Thread.currentThread().setName(oldName);
    }
  }

  protected abstract void execute();//抽象方法,具体在上面的 AsyncCall 实现
}

分析完 AsyncCall ,可以继续看client.dispatcher().enqueue(new AsyncCall(responseCallback)); 也就是 Dispatcher.enqueue() 方法

//Dispatcher.java
private int maxRequests = 64; //最大请求数量 64
private int maxRequestsPerHost = 5;

synchronized void enqueue(AsyncCall call) {
    // 下面逻辑,为了判断不能超过最大请求数量
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
      runningAsyncCalls.add(call);//将本次请求call,添加到运行队列
      executorService().execute(call);//线程池,执行
    } else {
      readyAsyncCalls.add(call);// 添加到准备队列
    }
  }

Dispatcher内部维护了一个线程池来执行请求,上面代码executorService().execute(call);,也就是线程池执行了call:

public synchronized ExecutorService executorService() {
    if (executorService == null) {
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
  }

executorService().execute(call);其实也就是执行,run()方法,也就回到了上面NamedRunnablerun()方法,run()方法调用了AsyncCall.execute()

接着看一下,AsyncCall execute()-->client.dispatcher().finished(this);最后的时候,调用了Dispacher.finished()方法

//Dispatcher.java

/** Used by {@code AsyncCall#run} to signal completion. */
  void finished(AsyncCall call) {
    finished(runningAsyncCalls, call, true);//异步,这里三个参数为 true
  }


 private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
    int runningCallsCount;
    Runnable idleCallback;
    synchronized (this) {
      if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");//将现在执行的call,从对应队列中删除
      if (promoteCalls) promoteCalls();  // promoteCalls = true ,调用  promoteCalls()
      runningCallsCount = runningCallsCount();
      idleCallback = this.idleCallback;
    }

    if (runningCallsCount == 0 && idleCallback != null) {
      idleCallback.run();
    }
  }

promoteCalls()方法:

private void promoteCalls() {
    if (runningAsyncCalls.size() >= maxRequests) return; // 当前运行异步call队列大小,不能超过 maxRequests(maxRequests  = 64)
    if (readyAsyncCalls.isEmpty()) return; // 异步,准备队列如果没有数据,return

    for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {//遍历,异步准备Call队列
      AsyncCall call = i.next();

      if (runningCallsForHost(call) < maxRequestsPerHost) {
        i.remove();
        runningAsyncCalls.add(call);//将ready --> running状态
        executorService().execute(call);//线程池,继续执行队列中的call
      }

      if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
    }
  }

RealCall # getResponseWithInterceptorChain() 拦截器链分析:

Response getResponseWithInterceptorChain() throws IOException {
    // Build a full stack of interceptors.
    List<Interceptor> interceptors = new ArrayList<>();
    interceptors.addAll(client.interceptors());//自定义拦截器,定义在最初的地方
    interceptors.add(retryAndFollowUpInterceptor);//重试拦截器
    interceptors.add(new BridgeInterceptor(client.cookieJar()));//桥接拦截器
    interceptors.add(new CacheInterceptor(client.internalCache()));
    interceptors.add(new ConnectInterceptor(client));
    if (!forWebSocket) {
      interceptors.addAll(client.networkInterceptors());
    }
    interceptors.add(new CallServerInterceptor(forWebSocket));

    Interceptor.Chain chain = new RealInterceptorChain(
        interceptors, null, null, null, 0, originalRequest);
    return chain.proceed(originalRequest);
  }
  • retryAndFollowUpInterceptor 失败重连拦截器,主要对网络请求出现异常的时候重连操作(最多重连20次),完成后,继续调用下个拦截器对response进行处理

  • BridgeInterceptor 桥接拦截器,其中主要是对 请求参数,如Header的处理、body中的类型Content-Type、长度 Content-LengthUser-Agent 等等的处理,在进行这些处理完成,也会去链式调用 chain.proceed(requestBuilder.build()); 继续递归处理剩余拦截器。

  • CacheInterceptor 缓存拦截器,主要对请求进行缓存处理,使用的时候,只需要在创建Client的时候,添加Cache类即可。

  • ConnectInterceptor 连接拦截器,底层通过TCP进行网络连接。

  • CallServerInterceptor 连接服务拦截器,也是最终的拦截器,内部没有chain.proceed 方法,通过 读写来进行Response的返回,将结果返回给前一个拦截器,依次进行下去。

    拦截器链工作流程.png

相关文章

网友评论

      本文标题:Okhttp3 异步请求,源码浅析(二)

      本文链接:https://www.haomeiwen.com/subject/moyarqtx.html