美文网首页
Android-OkHttp3-分发器和线程池配置

Android-OkHttp3-分发器和线程池配置

作者: zzq_nene | 来源:发表于2020-07-21 16:36 被阅读0次

一、OkHttp3的简单实用

// 使用OkHttp至少需要4个类
// 1.OkHttpClient
// 2.Request
// 3.Call->一般是用RealCall
// 4.Response

OkHttpClient client = new OkHttpClient();
void get(String url) throws IOException {
    Request request = new Request.Builder()
            .url(url)
            .build();
    Call call = client.newCall(request);
    Response response = call.execute();
    
    // 获得响应
    ResponseBody body = response.body();
    System.out.println(body.string());
}

void post(String url) throws IOException {
    RequestBody requestBody = new FormBody.Builder().add("city", "北京")
            .add("key", "11111")
            .build();
    Request request = new Request.Builder()
            .url(url)
            .post(requestBody)
            .build();
    
    // 执行网络同步请求
    Call call = client.newCall(request);
    Response response = call.execute();
    // 获得响应
    ResponseBody body = response.body();
    System.out.println(body.string());
}

二、OkHttp3中的线程池配置

OkHttp中的线程池是定义在分发器中的,即定义在Dispatcher

public synchronized ExecutorService executorService() {
  if (executorService == null) {
    executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
        new SynchronousQueue<>(), Util.threadFactory("OkHttp Dispatcher", false));
  }
  return executorService;
}
public static ThreadFactory threadFactory(String name, boolean daemon) {
  return runnable -> {
    Thread result = new Thread(runnable, name);
    result.setDaemon(daemon);
    return result;
  };
}

高并发,最大吞吐量。SynchronousQueue队列是无容量队列,
在OkHttp中,配置的线程池的核心线程数为0,最大线程数为Integer.MAX_VALUE,线程的存活时间为60s,采用的队列是SynchronousQueue。

一般线程池

一般的线程池,当加入新任务的时候,首先会进入核心线程,判断当核心是否达到最大值,如果没有,则由核心线程直接执行;如果核心线程数达到最大值,那么判断是否有空闲线程,这些空闲线程是由最大线程数中已经创建的,但是没有任务在执行,则交由空闲线程执行任务,如果没有空闲线程,则任务进入阻塞队列;当阻塞队列满的时候,再有新的任务加入,则判断当前线程数是否小于最大线程数,如果是小于,则创建新的线程,然后执行新加入的任务。

OkHttp线程池

但是OkHttp3中的线程池,核心线程数为0,而最大线程数为Integer.MAX_VALUE,而使用的是SynchronousQueue队列,这个队列是无容量队列,那么往这个队列中去添加任务,那么一定会添加失败,那么就会判断当前线程数是否小于最大线程数还是大于等于最大线程数,那么这里一定是小于,所以会创建新的线程,然后执行任务;但是如果在加入阻塞队列之前,有空闲线程,则优先交由空闲线程执行,因为线程在执行完成之后,依然会存活60s才会销毁。

三、OkHttp的调用流程

OkHttp使用流程.png

OkHttp的所有的逻辑大部分都是集中在拦截器中,但是在进入拦截器之前还需要依靠分发器来调配请求任务。
分发器:内部维护队列与线程池,完成请求调配;(Dispatcher)
拦截器:五大默认拦截器完成整个请求过程。(Interceptors)
execute是同步请求,enqueue是异步请求,请求之后就会把请求交给Dispatcher,然后Dispatcher分发请求交给拦截器,经过5个拦截器之后,得到Response

四、Dispatcher分发器(主要依赖于3.14的源码)

首先,先看一张基于3.10源码的分发器异步工作的流程图:


Dispatcher3.10异步工作流程.png

这是3.10源码的Dispatcher的异步工作流程,而在3.14中,这部分略有点区别,在3.14中,是先将请求直接放入ready队列中,然后在执行的时候,将请求从ready队列中取出,加入到running队列中。

OkHttp使用的流程,创建Request,然后通过OkHttpClient.newCall方法获取对应的Call接口实现,再调用Call的同步或者异步请求方法,这里因为是看异步的,所以直接看enqueue方法。

1.OkHttpClient.newCall

@Override public Call newCall(Request request) {
  return RealCall.newRealCall(this, request, false /* for web socket */);
}

OkHttpClient创建的Call实现,其实就是创建了RealCall实例,通过调用RealCall.newRealCall方法创建对应的RealCall实例

2.RealCall.newRealCall

static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
  // Safely publish the Call instance to the EventListener.
  RealCall call = new RealCall(client, originalRequest, forWebSocket);
  call.transmitter = new Transmitter(client, call);
  return call;
}

在获取到Call实现对象之后,然后调用Call的execute()或者enqueue()进行同步或者异步请求。

3.RealCall.enqueue

执行异步请求

@Override public void enqueue(Callback responseCallback) {
  synchronized (this) {
    if (executed) throw new IllegalStateException("Already Executed");
    executed = true;
  }
  transmitter.callStart();
  // RealCall.enqueue异步请求,调用OkHttpClient中的dispatcher分发器
  // 通过分发器的enqueue方法,分发一个封装了的Callback的异步Call
  client.dispatcher().enqueue(new AsyncCall(responseCallback));
}

调用分发器Dispatcher.enqueue方法,将Callback封装成一个AsyncCall实例对象。
在分发器中,异步请求有两个队列,一个是readyAsyncCalls准备队列,一个是runningAsyncCalls执行队列。放在ready准备队列中,只是在ready队列中保存,并不会做请求;如果是放在runningAsyncCalls执行队列中,则会把AsyncCall提交到线程池当中去,AsyncCall其实是实现了Runnable接口的一个Task,能够被线程执行的Runnable实现任务。线程池中,接收到任务,就会去执行请求任务接口,当任务完成之后,又会调用分发器中的finished接口,然后Dispatcher又会调用promoteCalls方法对ready队列执行一个遍历,如果ready队列中的任务满足条件,那么就会移动到running队列中被执行。

ready队列和running队列,是两个双向队列

4.Dispatcher.enqueue(AsyncCall call)

在这个方法中,使用3.14和3.10的源码做一个对比

OkHttp3.14的源码:
void enqueue(AsyncCall call) {
  synchronized (this) {
    readyAsyncCalls.add(call);

    // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
    // the same host.
    if (!call.get().forWebSocket) {
       // 这一步是从running队列和ready队列中找出已经存在的AsyncCall
      // 如果这里不为null,则修改新的AtomicInteger
      // 使其与同一主机共享现有正在运行的调用的AtomicInteger
      AsyncCall existingCall = findExistingCallWithHost(call.host());
      if (existingCall != null) call.reuseCallsPerHostFrom(existingCall);
    }
  }
  promoteAndExecute();
}
// 遍历准备队列
private boolean promoteAndExecute() {
  assert (!Thread.holdsLock(this));

  List<AsyncCall> executableCalls = new ArrayList<>();
  boolean isRunning;
  synchronized (this) {
    for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
      AsyncCall asyncCall = i.next();

      if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
      if (asyncCall.callsPerHost().get() >= maxRequestsPerHost) continue; // Host max capacity.

      i.remove();
      // 对同一个域名的主机请求个数+1
      asyncCall.callsPerHost().incrementAndGet();
      executableCalls.add(asyncCall);
      runningAsyncCalls.add(asyncCall);
    }
    isRunning = runningCallsCount() > 0;
  }

  for (int i = 0, size = executableCalls.size(); i < size; i++) {
    AsyncCall asyncCall = executableCalls.get(i);
    // 执行AsyncCall.executeOn方法,其实就是调用线程池的execute方法
    // 通过线程池中的线程执行AsyncCall任务,即执行AsyncCall中的execute()方法
    asyncCall.executeOn(executorService());
  }

  return isRunning;
}

OkHttp3.10的源码
synchronized void enqueue(AsyncCall call) {
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
        runningAsyncCalls.add(call);
        executorService().execute(call);
    } else {
        readyAsyncCalls.add(call);
    }
}
3.10和3.14源码中,针对AsyncCall加入到ready和running队列中的做法的区别:

对比3.10和3.14的源码:在3.14源码中,会先将AsyncCall加入ready准备队列中,而不会出现直接加入到running执行队列中的情况,并且判断是否是webSocket,如果不是,则找出双向队列中已经存在的对应的host相同的call,并且修改已经存在的AsyncCall的AtomicInteger,用以记录当前同一个host有多少个请求存在;接着调用promoteAndExecute()方法对准备队列进行遍历,在遍历的过程中,会判断running队列中的请求数是否大于等于最大请求数,即maxRequests=64,如果大于等于64,则break,不加入到running队列中,如果是小于则执行下一个判断;判断ready队列中当前遍历到的AsyncCall对同一域名主机的请求数是否大于等于5个,如果是大于等于5个,则也不加入到请求队列中;那么如果running队列中正在请求数小于64个,并且对同一域名主机的请求小于5个,则从ready出队,加入到running队列中。
所以想要从ready中将请求加入到running队列中,需要满足两个条件:即running队列中的正在请求数小于64,并且对同一域名主机的请求数小于5个,则加入running队列中。
而3.10的源码,则是判断满足这两个条件之后,直接加入到running队列中,而不是优先加入到ready队列中。

3.10和3.14源码中,对AsyncCall任务执行的区别:

加入到running队列后,则通过线程池执行任务。在3.14的源码中,执行任务是通过遍历一个List集合executableCalls,因为3.14源码中,是先将请求队列加入到ready队列中,然后再遍历放到List集合executableCalls和running队列中,所以在执行的时候,是需要对executableCalls这个List集合进行遍历,调用AsyncCall.executeOn(executorService())方法,即遍历List集合拿到AsyncCall对象,然后调用该对象的executeOn方法。
而3.10的源码,是将AsyncCall加入到running队列中以后,直接调用executorService().execute(call);方法直接执行任务。
3.14源码这样做,是因为加入新的AsyncCall的时候,ready队列中依然有AsyncCall存在,那么就有可能一次性从ready队列中加入多个AsyncCall到running队列中,所以遍历集合。而3.10的做法,是在新的AsyncCall进入以后,如果是被允许加入running队列中,则直接执行,但是这个时候并没有考虑ready中可能存在能被执行的AsyncCall。

running队列需要满足的条件:maxRequests和maxRequestsPerHost

maxRequests:说明最大请求并发,即正在请求的数量是有限制的,默认最大是64
maxRequestsPerHost:同一主机同一域名正在请求的个数也是有限制的,默认最大是5
这默认是5个,如果同一个手机对同一个服务器的请求个数太多,对服务器的压力就更大,所以这里设置默认5个。这是采用一个volatile修饰的AtomicInteger来计数。

5.AsyncCall详解

final class AsyncCall extends NamedRunnable {
  private final Callback responseCallback;
  private volatile AtomicInteger callsPerHost = new AtomicInteger(0);

  AsyncCall(Callback responseCallback) {
    super("OkHttp %s", redactedUrl());
    this.responseCallback = responseCallback;
  }

  AtomicInteger callsPerHost() {
    return callsPerHost;
  }

  void reuseCallsPerHostFrom(AsyncCall other) {
    this.callsPerHost = other.callsPerHost;
  }

  String host() {
    return originalRequest.url().host();
  }

  Request request() {
    return originalRequest;
  }

  RealCall get() {
    return RealCall.this;
  }

  /**
   * Attempt to enqueue this async call on {@code executorService}. This will attempt to clean up
   * if the executor has been shut down by reporting the call as failed.
   * 3.14:异步执行是,用于接收线程池,然后执行AsyncCall本身。
   */
  void executeOn(ExecutorService executorService) {
    assert (!Thread.holdsLock(client.dispatcher()));
    boolean success = false;
    try {
      executorService.execute(this);
      success = true;
    } catch (RejectedExecutionException e) {
      InterruptedIOException ioException = new InterruptedIOException("executor rejected");
      ioException.initCause(e);
      transmitter.noMoreExchanges(ioException);
      responseCallback.onFailure(RealCall.this, ioException);
    } finally {
      if (!success) {
        client.dispatcher().finished(this); // This call is no longer running!
      }
    }
  }

  // 这个方法其实就是最终在线程池中的线程中执行的run()方法中调用的
  // 所以当AsyncCall任务被执行的时候,最终就是执行execute()方法
  @Override protected void execute() {
    boolean signalledCallback = false;
    transmitter.timeoutEnter();
    try {
      // 通过拦截器,获取得到最终的请求结果
      Response response = getResponseWithInterceptorChain();
      signalledCallback = true;
      responseCallback.onResponse(RealCall.this, response);
    } catch (IOException e) {
      if (signalledCallback) {
        // Do not signal the callback twice!
        Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
      } else {
        responseCallback.onFailure(RealCall.this, e);
      }
    } catch (Throwable t) {
      cancel();
      if (!signalledCallback) {
        IOException canceledException = new IOException("canceled due to " + t);
        canceledException.addSuppressed(t);
        responseCallback.onFailure(RealCall.this, canceledException);
      }
      throw t;
    } finally {
      client.dispatcher().finished(this);
    }
  }
}

因为AsyncCall继承自NamedRunnable,而NamedRunnable其实就是实现了Runnable接口,并且在NamedRunnable的run方法中,其实就是调用了NamedRunnable的execute()抽象方法,所以AsyncCall需要实现execute()方法,而线程执行其实就是执行run方法,所以就是执行了AsyncCall.execute()方法,当AsyncCall调用executeOn方法的时候,其内部就会调用executorService.execute(this);执行当前的AsyncCall,这样就会执行AsyncCall.execute()方法。
当AsyncCall.execute()执行完成之后,就会调用OkHttpClient.dispatcher().finished(this);方法,其实就是调用了Dispatcher.finished方法,目的就是当前running队列中的一个任务已经执行结束的时候,就会再次去从ready队列中寻找可以被执行的任务,加入到running队列中。
这里的Dispatcher.finished方法,不管当前任务执行是否是成功还是失败,都会执行。

6.Dispatcher.finished(AsyncCall call)

void finished(AsyncCall call) {
  // 调整同一个域名的主机的请求个数,减少一
  // 因为当前的请求已经执行结束,不管是成功还是失败
  call.callsPerHost().decrementAndGet();
  finished(runningAsyncCalls, call);
}

private <T> void finished(Deque<T> calls, T call) {
  Runnable idleCallback;
  synchronized (this) {
    // 从running中移除AsyncCall任务
    if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
    idleCallback = this.idleCallback;
  }

  // 移动队列,其实就是之前第一次过程中
  // 调用Dispatcher.enqueue方法的过程中,加入到ready队列以后执行的方法
  // 即判断AsyncCall任务是否可以从ready队列中移动到running队列中
  // 又是判断maxRequests和maxRequestsPerHost这两个条件
  boolean isRunning = promoteAndExecute();

  if (!isRunning && idleCallback != null) {
    idleCallback.run();
  }
}

在Dispatcher.finished方法中,又会再一次执行promoteAndExecute()方法,该方法其实就是遍历ready队列,查询可以执行的AsyncCall加入到running队列中,然后调用这些AsyncCall的executeOn方法执行任务。

总结:

3.14的源码中,分发器的异步执行过程,其实就是先将AsyncCall这个Runnable任务添加到ready准备队列中,然后遍历准备队列,接着判断running执行队列正在请求数是否小于64,以及对同一个域名主机的请求数是否小于5,如果满足,则将AsyncCall任务添加到running执行队列中,接着遍历这个时候加入到running执行队列的AsyncCall,对这些AsyncCall任务,分别调用线程池执行AsyncCall任务,而执行任务,其实就是执行AsyncCall的run方法,而AsyncCall的run方法其实就是执行了AsyncCall的execute方法,当一个任务执行完成的时候,即execute中最后会调用Dispatcher.finished()方法,在这里,就会继续遍历准备队列,将准备队列中的AsyncCall任务做再一次的条件判断,判断是否允许加入到running执行队列中,然后再一次执行这些新加入到running执行队列的任务。
在这个过程中,入队出队的过程,基本都是采用synchronized同步代码块的方式在分发器中进行。
而最终的请求,其实就是在线程中执行AsyncCall任务的时候,调用的execute方法中调用RealCall.getResponseWithInterceptorChain()方法,通过n个拦截器,得到最终的Response。

五、Dispatcher的同步执行流程

上面的流程,其实都是异步请求的流程,而同步请求,其实分发器就做一件事情,就是将RealCall添加到running执行队列中。

RealCall.execute()

@Override public Response execute() throws IOException {
  synchronized (this) {
    if (executed) throw new IllegalStateException("Already Executed");
    executed = true;
  }
  transmitter.timeoutEnter();
  transmitter.callStart();
  try {
    client.dispatcher().executed(this);
    return getResponseWithInterceptorChain();
  } finally {
    client.dispatcher().finished(this);
  }
}

Dispatcher.executed()

synchronized void executed(RealCall call) {
  runningSyncCalls.add(call);
}

添加到running执行队列之后,就直接调用RealCall.getResponseWithInterceptorChain()执行请求。

参考:

https://blog.csdn.net/json_it/article/details/78404010
https://blog.csdn.net/hello2mao/article/details/53159151

相关文章

  • Android-OkHttp3-分发器和线程池配置

    一、OkHttp3的简单实用 二、OkHttp3中的线程池配置 OkHttp中的线程池是定义在分发器中的,即定义在...

  • OkHttp工作流程

    分发器 线程池 拦截器 五大拦截器 分发器 Dispatcher类,这个类的作用用于分发提交的网络任务,高并发任务...

  • OkHttp的分发器线程池

    在 OkHttp 的分发器中的线程池定义如上,其实就和 Executors.newCachedThreadPool...

  • Springboot | 线程池的学习,多线程池配置示例

    一、线程和进程,线程的生命周期二、单线程和多线程三、线程池的概念四、线程池的使用五、多线程池配置示例 一、线程和进...

  • 自定义注解实现一个可配置线程池

    前言 PoolConfig(线程池核心配置参数): ThreadPoolConfig(线程池配置 yml/pope...

  • OkHttp源码解析

    OkHttp源码分析 本文基于OkHttp 3.10.0版本分析。 OkHttp请求流程 高并发请求分发器与线程池...

  • java----线程池

    什么是线程池 为什么要使用线程池 线程池的处理逻辑 如何使用线程池 如何合理配置线程池的大小 结语 什么是线程池 ...

  • spring boot线程池使用

    线程池配置 线程池方法定义 单测调用

  • okhttp的原理

    基本流程:1 任务分发 2 线程池管理 3 拦截器 4 缓存策略 5 连接池 6 数据流请求流程:1 创建clie...

  • Android 线程池原理

    线程池核心类 : ThreadPoolExecutor:提供了一系列参数来配置线程池 线程池优点: 1.重用线程池...

网友评论

      本文标题:Android-OkHttp3-分发器和线程池配置

      本文链接:https://www.haomeiwen.com/subject/frjmzhtx.html