美文网首页
Android - 剖析OKHttp(2)- 调度请求

Android - 剖析OKHttp(2)- 调度请求

作者: 杨0612 | 来源:发表于2020-11-05 21:55 被阅读0次

    https://juejin.cn/post/7095315542202351623
    这篇文章分析了,Request如何构建以及加入调度队列的。

    源码分析基于 3.14.4

    发起请求

    //RealCall类中
    @Override public void enqueue(Callback responseCallback) {
        synchronized (this) {
          if (executed) throw new IllegalStateException("Already Executed");
          executed = true;
        }    
        client.dispatcher().enqueue(new AsyncCall(responseCallback));//1
      }
    
    //Dispatcher类中
    void enqueue(AsyncCall call) {
      synchronized (this) {
        readyAsyncCalls.add(call);//2
        // the same host.
        if (!call.get().forWebSocket) {
          AsyncCall existingCall = findExistingCallWithHost(call.host());//3
          if (existingCall != null) call.reuseCallsPerHostFrom(existingCall);
        }
      }
      promoteAndExecute();//4
    }
    
    • 注释1:将Call封装成AsyncCall,然后加入分发器等待被执行;
    • 注释2:将AsyncCall加入等待队列;
    • 注释3:findExistingCallWithHost查找是否有相同host的请求,这是为统计同一个主机的请求有多少,便于做负载控制;
    • 注释4:promoteAndExecute,判断是否开启线程执行;
    private boolean promoteAndExecute() {
     
      List<AsyncCall> executableCalls = new ArrayList<>();
      boolean isRunning;
      synchronized (this) {
        for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
          AsyncCall asyncCall = i.next();
    
          if (runningAsyncCalls.size() >= maxRequests) break; // 1
          if (asyncCall.callsPerHost().get() >= maxRequestsPerHost) continue; // 2
    
          i.remove();
          asyncCall.callsPerHost().incrementAndGet();
          executableCalls.add(asyncCall);//3
          runningAsyncCalls.add(asyncCall);//4
        }
        isRunning = runningCallsCount() > 0;
      }
    
      for (int i = 0, size = executableCalls.size(); i < size; i++) {
        AsyncCall asyncCall = executableCalls.get(i);
        asyncCall.executeOn(executorService());//5 调用下面的executeOn函数
      }
    
      return isRunning;
    }
    
    //AsyncCall类中
    void executeOn(ExecutorService executorService) {      
          boolean success = false;
          try {
            executorService.execute(this);
            success = true;
          } catch (RejectedExecutionException e) {
            ......
          } finally {
            if (!success) {
              client.dispatcher().finished(this); // This call is no longer running!
            }
          }
        }
    
    • 注释1:判断当前执行的任务数,默认最大值为64;
    • 注释2:判断同一个Host的任务数,默认最大值为5;
    • 注释3、4:如果当前执行的任务数小于64而且每个host的任务数小于5,则将任务从等待队列中移除,加入执行队列;我理解executableCalls是等待执行队列与执行中队列间的过渡变量,记录该次要被调度的任务;
    • 注释5:执行刚才从等待队列移除的任务,线程池开始调度任务;executorService()返回线程池对象;AsyncCall实现了Runnable,所以可以被线程池调度;
    • 为了保证,等待队列的任务能及时被调度,所以调用enqueue、setMaxRequests、setMaxRequestsPerHost、finished函数,promoteAndExecute会被触发;

    OKHttp的线程池

     public synchronized ExecutorService executorService() {
        if (executorService == null) {
          executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
              new SynchronousQueue<>(), Util.threadFactory("OkHttp Dispatcher", false));
        }
        return executorService;
      }
    
    • executorService()返回线程池,单例模式;
    • 这个线程池有点意思,核心线程数为0,线程数最大是 Integer.MAX_VALUE,SynchronousQueue是一个没有容量的队列;所以当添加任务时,因为核心线程为0,那么任务将添加SynchronousQueue中,由于它没有容量,所以线程池会创建线程来执行任务;
    • 补充知识,任务往SynchronousQueue添加时,会有两个情况:1.线程A调用了take或poll取任务,线程B调用offer添加任务将成功,但是里面被线程A取走了并执行,2.没有线程调用take或poll,线程B调用offer添加任务将失败,线程池新建线程来执行任务;利用没有容量的队列,可以使任务能马上得到响应,没必要等队列满才新建线程执行任务;
    //AsyncCall类中
    @Override protected void execute() {
          ......
          try {
            Response response = getResponseWithInterceptorChain();//1
            ......
            responseCallback.onResponse(RealCall.this, response);//2
          } catch (IOException e) {
            ......
            responseCallback.onFailure(RealCall.this, e);
          } catch (Throwable t) {
              ......
              responseCallback.onFailure(RealCall.this, canceledException);
            }
            throw t;
          } finally {
            client.dispatcher().finished(this);
          }
        }
    
    • 注释1:前面提到AsyncCall实现Runnable接口,当线程池执行AsyncCall.run,最后会执行AsyncCall.execute;获取结果;
    • 注释2:回调结果;
    • 至此任务被调度了

    疑问1:线程数最大可以是Integer.MAX_VALUE,会不会撑爆虚拟机?

    其实不会的,因为在Dispatcher.promoteAndExecute添加任务时,已经限制了数量;

    疑问2:为什么核心线程数为0?

    默认情况下,核心线程是不会被回收,除非线程池shutdown了,而其他线程可以设置超时时间,后来我想了想,核心线程数不为0,通过allowCoreThreadTimeOut设置true,也可以达到目的,只不过直接设置0比较简单;

    总结

    • 执行call.enquque,只是把任务加入等待队列,等待被执行;
    • 当前执行中的任务数小于64,且所属host任务数小于5,则将任务移入正在执行队列,触发线程池执行;
    • 线程池核心线程数为0,线程数最大为Integer.MAX_VALUE,利用没有容量的SynchronousQueue,让请求都能马上得到响应;

    以上分析有不对的地方,请指出,互相学习,谢谢哦!

    相关文章

      网友评论

          本文标题:Android - 剖析OKHttp(2)- 调度请求

          本文链接:https://www.haomeiwen.com/subject/pekavktx.html