美文网首页
源码分析->撕开OkHttp(10)高并发线程池

源码分析->撕开OkHttp(10)高并发线程池

作者: 杨0612 | 来源:发表于2020-11-10 11:10 被阅读0次
    源码分析基于 3.14.4
    关键字:OkHttp高并发线程池
    OkHttp的线程池是怎么样的?

    https://www.jianshu.com/p/3a1d5389b36a在这篇文章有提到,这个线程池很有意思:核心线程数为0,SynchronousQueue是一个无容量的集合,最大线程数为Integer.MAX_VALUE(但是OkHttp自己控制了请求最大并发数,所以不会达到这个值)。

    public final class Dispatcher {
      ......
      public synchronized ExecutorService executorService() {
        if (executorService == null) {
          executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
              new SynchronousQueue<>(), Util.threadFactory("OkHttp Dispatcher", false));
        }
        return executorService;
      }
    
    如何达到高并发的呢?

    简单说下线程池执行流程:
    (1)当线程数小于核心线程数,则创建线程执行该任务,任务执行完会取队列中的任务执行;
    (2)当线程数大于核心线程数,则将任务添加到队列当中,如果队列没有满则添加成功;
    (3)当队列满则添加失败,且线程数小于最大线程数,则创建线程执行该任务,,任务执行完会取队列中的任务执行;
    (4)上述(3)无法创建线程执行任务,则执行抛弃策略;
    具体细节可以看下https://www.jianshu.com/p/95bd9bfbe04d

    我们来分析下OkHttp的线程池,假设当前线程池没有任务下:
    (1)当往线程池添加任务,当前线程数>=核心线程数(0>=0),则将任务添加到队列中;
    (2)因为SynchronousQueue是无容量的,所以任务添加失败(有一种情况是会成功的,后面会介绍);
    (3)当前线程数小于Integer.MAX_VALUE(一般也很难到达这个值),所以创建线程马上执行该任务;
    因为SynchronousQueue是无容量,而且最大线程数为Integer.MAX_VALUE,也不会出现任务需要等待或者被抛弃的情况,只要任务到达线程池,就会马上被执行,也就是高并发。

    最大线程数为Integer.MAX_VALUE,会不会撑爆虚拟机?

    答案是不会。上面有说到,因为OkHttp自己控制了请求最大并发数,所以不会达到这个值。

    为什么核心线程数为0?

    是为了让所有线程可以timeout,默认情况下,核心线程是不会timeout的,除非线程池shutdown了。不过后来我想了想,通过allowCoreThreadTimeOut可以让核心线程也timeout,这样核心线程数就可以设置大于0,不过这时候就得考虑,核心线程数设置多少才合适呢,所以还是直接设置0比较简单;

    为什么使用SynchronousQueue?

    常用的BlockingQueue有ArrayBlockingQueue、LinkedBlockingQueue;
    ArrayBlockingQueue跟LinkedBlockingQueue一样,必须设置大于0的容量,所以必须会导致任务不能马上被执行,只有SynchronousQueue适用;

    思考题
    (1)test1()、test2()执行有什么不同?

    test1()、test2()分别创建线程池来执行两个打印任务,不同的是,前者线程池核心为0,后者核心数为1,test2()会先打印run1再打印run2,这应该是无疑,那么test1()呢?
    答案是跟test2()一样,估计很多人会有疑问,核心数为0,打印run1的任务不应该先放到队列中,而打印run2的任务无法放入队列且没超过最大线程数,则会创建线程执行吗?
    大体方向没有错,但是有一个关键点,打印run1的任务放到队列后,会接着判断线程池线程是否为0,是则创建线程执行队列任务,所以打印run1的任务就会被先执行。

        public void test1() {
            ThreadPoolExecutor executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
                    new ArrayBlockingQueue<>(1));
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println("run1");
                }
            });
    
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println("run2");
                }
            });
        }
    
        public void test2() {
            ThreadPoolExecutor executorService = new ThreadPoolExecutor(1, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
                    new ArrayBlockingQueue<>(1));
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println("run1");
                }
            });
    
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println("run2");
                }
            });
        }
    
    ThreadPoolExecutor.execute

    以下是添加到队列的逻辑,workerCountOf(recheck) == 0条件成立,则addWorker创建线程执行队列任务;

      public void execute(Runnable command) {
          ......
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
    
    (2)以下代码会出现什么问题?

    test1()逻辑跟上述差不多,只不过改成死循环打印run1,那么打印run2会被执行吗?
    答案是不会。因为打印run2的任务被放入队列以后,线程池有一个线程在工作,就不会创建线程来执行打印run2,必须等待打印run1的任务执行完。

     public void test1() {
            ThreadPoolExecutor executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
                    new ArrayBlockingQueue<>(1));
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    while(true){
                        System.out.println("run1");
                    }
                }
            });
    
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println("run2");
                }
            });
        }
    
    (3)SynchronousQueue.offer一定返回false吗?

    答案:不是。
    情景1:
    以下代码,因为SynchronousQueue是无容量的,所以offer一定返回false,从日志上也可以看出;

        public void test1() {
            SynchronousQueue<String> queue = new SynchronousQueue<>();
            new Thread() {
                @Override
                public void run() {
                    boolean isSuccess = queue.offer("11111");
                    System.out.println("isSuccess=" + isSuccess);
                }
            }.start();
        }
    
    输出日志
    isSuccess=false
    

    请求2:
    以下代码,SynchronousQueue.offer会返回true,因为在offer之前,线程1执行了take任务,并且处于阻塞等待,当有任务过来马上返回并且offer返回true;

     public void test1() {
            SynchronousQueue<String> queue = new SynchronousQueue<>();
            //线程1,take任务
            new Thread() {
                @Override
                public void run() {
                    try {
                        System.out.println("thread1");
                        String runnable = queue.take();//堵塞,等待有任务
                        System.out.println("runnable=" + runnable);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
    
                }
            }.start();
            //主线程休眠100毫秒,为了保证线程1先执行
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //线程2,offer任务
            new Thread() {
                @Override
                public void run() {
                    System.out.println("thread2");
                    boolean isSuccess = queue.offer("123");
                    System.out.println("isSuccess=" + isSuccess);
                }
            }.start();
        }
    
    输出日志:
    thread1
    thread2
    runnable=123
    isSuccess=true
    
    总结

    (1)OkHttp利用这三个参数:核心线程数为0,SynchronousQueue是一个无容量的集合,最大线程数为Integer.MAX_VALUE,创建的线程池实现高并发;
    (2)线程池核心线程数有时候设置0或1,效果是一样的;
    (3)SynchronousQueue虽然是无容量的,但是offer不一定返回false,如果在offer之前,有线程处于take 阻塞状态,那么offer就返回true;

    以上分析有不对的地方,请指出,互相学习,谢谢哦!

    相关文章

      网友评论

          本文标题:源码分析->撕开OkHttp(10)高并发线程池

          本文链接:https://www.haomeiwen.com/subject/sbcrbktx.html