源码分析基于 3.14.4
关键字:OkHttp高并发线程池
OkHttp的线程池是怎么样的?
https://www.jianshu.com/p/3a1d5389b36a在这篇文章有提到,这个线程池很有意思:核心线程数为0,SynchronousQueue是一个无容量的集合,最大线程数为Integer.MAX_VALUE(但是OkHttp自己控制了请求最大并发数,所以不会达到这个值)。
public final class Dispatcher {
......
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
如何达到高并发的呢?
简单说下线程池执行流程:
(1)当线程数小于核心线程数,则创建线程执行该任务,任务执行完会取队列中的任务执行;
(2)当线程数大于核心线程数,则将任务添加到队列当中,如果队列没有满则添加成功;
(3)当队列满则添加失败,且线程数小于最大线程数,则创建线程执行该任务,,任务执行完会取队列中的任务执行;
(4)上述(3)无法创建线程执行任务,则执行抛弃策略;
具体细节可以看下https://www.jianshu.com/p/95bd9bfbe04d。
我们来分析下OkHttp的线程池,假设当前线程池没有任务下:
(1)当往线程池添加任务,当前线程数>=核心线程数(0>=0),则将任务添加到队列中;
(2)因为SynchronousQueue是无容量的,所以任务添加失败(有一种情况是会成功的,后面会介绍);
(3)当前线程数小于Integer.MAX_VALUE(一般也很难到达这个值),所以创建线程马上执行该任务;
因为SynchronousQueue是无容量,而且最大线程数为Integer.MAX_VALUE,也不会出现任务需要等待或者被抛弃的情况,只要任务到达线程池,就会马上被执行,也就是高并发。
最大线程数为Integer.MAX_VALUE,会不会撑爆虚拟机?
答案是不会。上面有说到,因为OkHttp自己控制了请求最大并发数,所以不会达到这个值。
为什么核心线程数为0?
是为了让所有线程可以timeout,默认情况下,核心线程是不会timeout的,除非线程池shutdown了。不过后来我想了想,通过allowCoreThreadTimeOut可以让核心线程也timeout,这样核心线程数就可以设置大于0,不过这时候就得考虑,核心线程数设置多少才合适呢,所以还是直接设置0比较简单;
为什么使用SynchronousQueue?
常用的BlockingQueue有ArrayBlockingQueue、LinkedBlockingQueue;
ArrayBlockingQueue跟LinkedBlockingQueue一样,必须设置大于0的容量,所以必须会导致任务不能马上被执行,只有SynchronousQueue适用;
思考题
(1)test1()、test2()执行有什么不同?
test1()、test2()分别创建线程池来执行两个打印任务,不同的是,前者线程池核心为0,后者核心数为1,test2()会先打印run1再打印run2,这应该是无疑,那么test1()呢?
答案是跟test2()一样,估计很多人会有疑问,核心数为0,打印run1的任务不应该先放到队列中,而打印run2的任务无法放入队列且没超过最大线程数,则会创建线程执行吗?
大体方向没有错,但是有一个关键点,打印run1的任务放到队列后,会接着判断线程池线程是否为0,是则创建线程执行队列任务,所以打印run1的任务就会被先执行。
public void test1() {
ThreadPoolExecutor executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1));
executorService.execute(new Runnable() {
@Override
public void run() {
System.out.println("run1");
}
});
executorService.execute(new Runnable() {
@Override
public void run() {
System.out.println("run2");
}
});
}
public void test2() {
ThreadPoolExecutor executorService = new ThreadPoolExecutor(1, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1));
executorService.execute(new Runnable() {
@Override
public void run() {
System.out.println("run1");
}
});
executorService.execute(new Runnable() {
@Override
public void run() {
System.out.println("run2");
}
});
}
ThreadPoolExecutor.execute
以下是添加到队列的逻辑,workerCountOf(recheck) == 0条件成立,则addWorker创建线程执行队列任务;
public void execute(Runnable command) {
......
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
(2)以下代码会出现什么问题?
test1()逻辑跟上述差不多,只不过改成死循环打印run1,那么打印run2会被执行吗?
答案是不会。因为打印run2的任务被放入队列以后,线程池有一个线程在工作,就不会创建线程来执行打印run2,必须等待打印run1的任务执行完。
public void test1() {
ThreadPoolExecutor executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1));
executorService.execute(new Runnable() {
@Override
public void run() {
while(true){
System.out.println("run1");
}
}
});
executorService.execute(new Runnable() {
@Override
public void run() {
System.out.println("run2");
}
});
}
(3)SynchronousQueue.offer一定返回false吗?
答案:不是。
情景1:
以下代码,因为SynchronousQueue是无容量的,所以offer一定返回false,从日志上也可以看出;
public void test1() {
SynchronousQueue<String> queue = new SynchronousQueue<>();
new Thread() {
@Override
public void run() {
boolean isSuccess = queue.offer("11111");
System.out.println("isSuccess=" + isSuccess);
}
}.start();
}
输出日志
isSuccess=false
请求2:
以下代码,SynchronousQueue.offer会返回true,因为在offer之前,线程1执行了take任务,并且处于阻塞等待,当有任务过来马上返回并且offer返回true;
public void test1() {
SynchronousQueue<String> queue = new SynchronousQueue<>();
//线程1,take任务
new Thread() {
@Override
public void run() {
try {
System.out.println("thread1");
String runnable = queue.take();//堵塞,等待有任务
System.out.println("runnable=" + runnable);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}.start();
//主线程休眠100毫秒,为了保证线程1先执行
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
//线程2,offer任务
new Thread() {
@Override
public void run() {
System.out.println("thread2");
boolean isSuccess = queue.offer("123");
System.out.println("isSuccess=" + isSuccess);
}
}.start();
}
输出日志:
thread1
thread2
runnable=123
isSuccess=true
总结
(1)OkHttp利用这三个参数:核心线程数为0,SynchronousQueue是一个无容量的集合,最大线程数为Integer.MAX_VALUE,创建的线程池实现高并发;
(2)线程池核心线程数有时候设置0或1,效果是一样的;
(3)SynchronousQueue虽然是无容量的,但是offer不一定返回false,如果在offer之前,有线程处于take 阻塞状态,那么offer就返回true;
以上分析有不对的地方,请指出,互相学习,谢谢哦!
网友评论