美文网首页
okhttp源码分析(一)——OkHttp的工作流程分析

okhttp源码分析(一)——OkHttp的工作流程分析

作者: 李die喋 | 来源:发表于2020-07-15 19:09 被阅读0次

如果我们想请求数据,使用少量的代码就可以实现:

OkHttpClient client = new OkHttpClient();
String url = "https://www.baidu.com/";
Request request = new Request().Builder()
                    .url(url)
                    .get()
                    .build();
Call call = client.newCall(request);
request.enqueue(new CallBack(){
    @Override
    public void onResponse(Call call,Response response) throws IOException{
        
    }
    
    @Override
    public void onFailure(Call call,IOException e){
        
    }
})

OkHttpClient类

创建OkHttpClient类的两种方式:

  • 直接创建对象 new OkHttpClient()
  • new OkHttpClient.Builder().build()

OkHttpClient对象源码:

public OkHttpClient() {
this(new Builder());
}

OkHttpClient(Builder builder) {
//调度器,用于控制并发的请求。内部保存同步和异步请求的call,并使用线程池处理异步请求。
this.dispatcher = builder.dispatcher;
this.proxy = builder.proxy;//代理设置
this.protocols = builder.protocols;//默认支持http协议版本
this.connectionSpecs = builder.connectionSpecs;//okhttp连接 connection配置
this.interceptors = Util.immutableList(builder.interceptors);
this.networkInterceptors = Util.immutableList(builder.networkInterceptors);
this.eventListenerFactory = builder.eventListenerFactory;//一个Call的状态监听器
this.proxySelector = builder.proxySelector;//使用默认的代理选择器
this.cookieJar = builder.cookieJar;//默认是没有cookie的
this.cache = builder.cache;//缓存
this.internalCache = builder.internalCache;
this.socketFactory = builder.socketFactory;//使用默认的Scoket工厂产生Socket

boolean isTLS = false;
for (ConnectionSpec spec : connectionSpecs) {
  isTLS = isTLS || spec.isTls();
}

if (builder.sslSocketFactory != null || !isTLS) {
  this.sslSocketFactory = builder.sslSocketFactory;
  this.certificateChainCleaner = builder.certificateChainCleaner;
} else {
  X509TrustManager trustManager = Util.platformTrustManager();
  this.sslSocketFactory = newSslSocketFactory(trustManager);
  this.certificateChainCleaner = CertificateChainCleaner.get(trustManager);
}

if (sslSocketFactory != null) {
  Platform.get().configureSslSocketFactory(sslSocketFactory);
}

this.hostnameVerifier = builder.hostnameVerifier;//安全相关的设置
this.certificatePinner = builder.certificatePinner.withCertificateChainCleaner(
    certificateChainCleaner);
this.proxyAuthenticator = builder.proxyAuthenticator;
this.authenticator = builder.authenticator;
this.connectionPool = builder.connectionPool;//连接池
this.dns = builder.dns;//域名解析系统 domain name->ip address
this.followSslRedirects = builder.followSslRedirects;
this.followRedirects = builder.followRedirects;
this.retryOnConnectionFailure = builder.retryOnConnectionFailure;
this.callTimeout = builder.callTimeout;
this.connectTimeout = builder.connectTimeout;
this.readTimeout = builder.readTimeout;
this.writeTimeout = builder.writeTimeout;
this.pingInterval = builder.pingInterval;//这个和websocket相关,为了保持长连接,我们必须每间隔一段时间放松一个ping指令

if (interceptors.contains(null)) {
  throw new IllegalStateException("Null interceptor: " + interceptors);
}
if (networkInterceptors.contains(null)) {
  throw new IllegalStateException("Null network interceptor: " + networkInterceptors);
}
}

Call类

在定义了请求对象后,需要生成一个Call对象。该对象代表一个准备被执行的请求。Call是可以被取消的,Call表示单个请求/响应对流,不能执行两次。

public interface Call extends Cloneable {
    
    Request request();
    
    Response execute() throws IOException;
    
    void enqueue(Callback responseCallback);
    
    void cancel();
    
    boolean isExecuted();

    boolean isCanceled();
    
    Timeout timeout();
    
    Call clone();

    interface Factory {
        Call newCall(Request request);
    }
}
  • 进入OkHttpClient的newCall方法
public Call newCall(Request request) {
    return RealCall.newRealCall(this, request, false /* for web socket */);
}
  • newRealCall
static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
    // Safely publish the Call instance to the EventListener.
    RealCall call = new RealCall(client, originalRequest, forWebSocket);
    call.transmitter = new Transmitter(client, call);
    return call;
}

newCall方法获得Call实际是RealCall,RealCall就是准备执行的请求,是对接口Call的实现,其内部持有OkHttpClient实例,Request实例。并且这里还创建了Transmitter给RealCall的transmitter赋值。

Transmitter类

Transmitter意为发射器,是应用层和网络层的桥梁。在进行连接、真正发出请求和读取响应中起到很重要的作用。

public Transmitter(OkHttpClient client, Call call) {
    this.client = client;
    this.connectionPool = Internal.instance.realConnectionPool(client.connectionPool());
    this.call = call;
    this.eventListener = client.eventListenerFactory().create(call);
    this.timeout.timeout(client.callTimeoutMillis(), MILLISECONDS);
}

Transmitter内部持有OkHttpClient、连接池、call、事件监听器。

Dispatcher类

Dispatcher类负责异步任务的请求策略。

public final class Dispatcher {
  private int maxRequests = 64;
  //每个主机的最大请求数,如果超过这个数,新的请求会被加到readyAsyncCalls队列中
  private int maxRequestsPerHost = 5;
  private @Nullable Runnable idleCallback;
  
  //任务队列线程池
  private @Nullable ExecutorService executorService;
  //待执行异步任务队例
  private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
  //运行中的异步任务队例
  private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
  //运行中同步任务队列
  private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();

  public Dispatcher(ExecutorService executorService) {
    this.executorService = executorService;
  }

  public Dispatcher() {
  }

  public synchronized ExecutorService executorService() {
    if (executorService == null) {
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
  }
}

同步请求执行流程

client.newCall(request).execute(),execute方法在Call的实现类RealCall中。

public Response execute() throws IOException {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    transmitter.timeoutEnter();//超时计时开始
    transmitter.callStart();//回调监听器的请求开始
    try {
      client.dispatcher().executed(this);//放入队列
      return getResponseWithInterceptorChain();//执行请求获取结果
    } finally {
      client.dispatcher().finished(this);//请求结束
    }
}

首先判断 如果已经执行,就会抛出异常。这就是一个请求只能执行一次的原因。然后回调请求监听器的请求开始。然后调用client的调度器Dispatcher的executed方法。

  • dispatcher().executed()
synchronized void executed(RealCall call) {
    runningSyncCalls.add(call);
}

请求放入一个双端队列runningSyncCalls中,表示正在执行的同步请求。

然后返回了getResponseWithInterceptorChain()的结果Response,同步请求真正的请求流程是在getResponseWithInterceptorChain方法中(详情见下节)。
最后请求结束,会走Dispatcher的finished(Deque calls, T call)方法。

  • dispatcher.finished()
void finished(RealCall call) {
    finished(runningSyncCalls, call);
  }

private <T> void finished(Deque<T> calls, T call) {
Runnable idleCallback;
    synchronized (this) {
      if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
      idleCallback = this.idleCallback;
    }
    
    boolean isRunning = promoteAndExecute();
    
    if (!isRunning && idleCallback != null) {
      idleCallback.run();
    }
}

这里将call从同步队列中移除,并且调用了promoteAndExecute()方法,这个方法在后面讲述。

异步请求执行流程

  • 异步方法equeue()
@Override 
public void enqueue(Callback responseCallback) {
synchronized (this) {
  //设置exexuted参数为true,表示不可以执行两次
 if (executed) throw new IllegalStateException("Already Executed");
    executed = true;
}
transmitter.callStart();//回调请求监听器的请求开始
//传入一个新的对象AsyncCall
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
  • AsyncCall类
final class AsyncCall extends NamedRunnable {
private final Callback responseCallback;

AsyncCall(Callback responseCallback) {
  super("OkHttp %s", redactedUrl());
  this.responseCallback = responseCallback;
}

String host() {
  return originalRequest.url().host();
}

Request request() {
  return originalRequest;
}

RealCall get() {
  return RealCall.this;
}

@Override protected void execute() {
  boolean signalledCallback = false;
  try {
    //执行耗时的IO操作
    //获取拦截器链,详见下篇文章
    Response response = getResponseWithInterceptorChain();
    if (retryAndFollowUpInterceptor.isCanceled()) {
      signalledCallback = true;
          
    //回调,注意这里回调是在线程池中,而不是向当前的主线程回调
      responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
    } else {
      signalledCallback = true;
      //回调,同上
      responseCallback.onResponse(RealCall.this, response);
    }
  } catch (IOException e) {
    if (signalledCallback) {
      // Do not signal the callback twice!
      Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
    } else {
      eventListener.callFailed(RealCall.this, e);
      //回调,同上
      responseCallback.onFailure(RealCall.this, e);
    }
  } finally {
    
    client.dispatcher().finished(this);
  }
}
}

AsyncCall继承NamedRunnable,NamedRunnable实现自Runnable,即AsyncCall就是个Runnable,它是会在线程或线程池中执行run方法的。

  • NamedRunnable类
public abstract class NamedRunnable implements Runnable {
  protected final String name;

  public NamedRunnable(String format, Object... args) {
    this.name = Util.format(format, args);
  }

  @Override public final void run() {
    String oldName = Thread.currentThread().getName();
    Thread.currentThread().setName(name);
    try {
      //execute抽象方法,在AsyncCall中有具体实现
      execute();
    } finally {
      Thread.currentThread().setName(oldName);
    }
  }

  protected abstract void execute();
}

在分析同步的时候知道Dispatcher调度器负责异步请求策略,去看看equeue方法。

  • Dispatcher.equeue()
void enqueue(AsyncCall call) {
synchronized (this) {
      readyAsyncCalls.add(call);
    
      // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
      // the same host.
      //相同host请求,共用一个调用技术
      if (!call.get().forWebSocket) {
        AsyncCall existingCall = findExistingCallWithHost(call.host());
        if (existingCall != null) call.reuseCallsPerHostFrom(existingCall);
      }
    }
    promoteAndExecute();
}

//从runningAsyncCalls和readyAsyncCalls找到相同的host请求
private AsyncCall findExistingCallWithHost(String host) {
    for (AsyncCall existingCall : runningAsyncCalls) {
      if (existingCall.host().equals(host)) return existingCall;
    }
    for (AsyncCall existingCall : readyAsyncCalls) {
      if (existingCall.host().equals(host)) return existingCall;
    }
    return null;
}
  • promoteAndExecute()
//调度的核心方法:在控制异步并发的策略基础上,使用线程池 执行异步请求
private boolean promoteAndExecute() {
    assert (!Thread.holdsLock(this));

    List<AsyncCall> executableCalls = new ArrayList<>();
    boolean isRunning;
    synchronized (this) {
      for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
        AsyncCall asyncCall = i.next();
    
        if (runningAsyncCalls.size() >= maxRequests) break; //最大并发数64.
        if (asyncCall.callsPerHost().get() >= maxRequestsPerHost) continue; // Host最大并发数.
    
        i.remove();//从等待队列中移除
        //host并发数+1
        asyncCall.callsPerHost().incrementAndGet();
        //加入可执行请求的集合
        executableCalls.add(asyncCall);
        //加入正在执行的异步请求队列
        runningAsyncCalls.add(asyncCall);
      }
      isRunning = runningCallsCount() > 0;
    }
    
    for (int i = 0, size = executableCalls.size(); i < size; i++) {
      AsyncCall asyncCall = executableCalls.get(i);
      //可执行的请求
      asyncCall.executeOn(executorService());
    }
    
    return isRunning;
}

public synchronized int runningCallsCount() {
    return runningAsyncCalls.size() + runningSyncCalls.size();
}

遍历readyAsyncCalls,先进行两个检查:

  1. 正在执行异步请求runningAsyncCalls数量大于最大并发请求数64就break;
  2. 相同host请求的数量大于5,就continue。

如果检查都通过,就从等待队列中移除,callPerHost自增1,放入可执行的集合executableCalls,并添加到队列runningAsyncCalls中,表示正在执行的异步请求。

这里的异步请求等待队列,是为了控制最大并发数的缓冲,异步请求并发数达到64、相同host的异步请求达到5,都要放入等待队列。

  • AsyncCall.executeOn()
void executeOn(ExecutorService executorService) {
  assert (!Thread.holdsLock(client.dispatcher()));
  boolean success = false;
  try {
    //在线程池中执行asyncCall
    executorService.execute(this);
    success = true;
  } catch (RejectedExecutionException e) {
    InterruptedIOException ioException = new InterruptedIOException("executor rejected");
    ioException.initCause(e);
    transmitter.noMoreExchanges(ioException);
    responseCallback.onFailure(RealCall.this, ioException);//回调失败
  } finally {
    if (!success) {
      client.dispatcher().finished(this); //执行发生异常 结束
    }
  }
}

AsyncCall的run方法会走到execute()方法,在上面有展示。

下面总结一下请求的流程:

  • 同步请求
  1. 调用client.newCall(request).execute()方法,也就是RealCall的execute方法;
  2. execute方法内部调用client.dispatcher().executed()方法,将当前RealCall加入到runningSyncCalls队列;
  3. 使用getResponseWithInterceptorChain()获取结果;
  4. 最后调用Dispatcher的finish方法结束请求。
  • 异步请求
  1. 调用client.newCall(request).equeue()方法,其内部调用client.dispatcher().enqueue(new AsyncCall(responseCallback))方法;
  2. 先将AsyncCall加入到当前readyAsyncCalls队列中,在找到执行当前主机的AsyncCall,一个主机用同一个AsyncCall;
  3. 使用promoteAndExecute()方法在控制异步并发的策略基础上使用线程池执行异步请求(并发控制有包括最大并发数64,host最大并发数5)。异步请求的执行也是使用getResponseWithInterceptorChain(),获得结果后回调出去。最后调用Dispatcher的finish方法结束请求。

相关文章

网友评论

      本文标题:okhttp源码分析(一)——OkHttp的工作流程分析

      本文链接:https://www.haomeiwen.com/subject/hdbjhktx.html