okhttp2

作者: 天马呵呵拳 | 来源:发表于2019-06-20 20:53 被阅读0次

    okhttp分享2:okhttp的线程管理及任务分发

    一、线程池

    上文主要分析了同步请求的大致流程,现在我们来看一下异步请求的流程,异步请求涉及的线程管理及任务分发okhttp是使用线程池实现的,因此我们首先看下android线程池的使用。

    1.1、线程池适合场景

    假如一个服务器完成一项任务的时间为T= T1+T2+T3

    • T1 创建线程的时间
    • T2 在线程中执行任务的时间,包括线程同步所需要的时间
    • T3 线程销毁的时间
      为了尽量减少T1与T3的时间,因此我们使用线程池维护一些线程,当需要使用线程时直接从线程池中取出使用,从而节省了T1、T3。

    1.2、线程池构造方法

    一般我们会使用ThreadPoolExecutor来构建线程池,其最终构造方法如下

    /**
         * Creates a new {@code ThreadPoolExecutor} with the given initial
         * parameters.
         *
         * @param corePoolSize the number of threads to keep in the pool, even
         *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
         * @param maximumPoolSize the maximum number of threads to allow in the
         *        pool
         * @param keepAliveTime when the number of threads is greater than
         *        the core, this is the maximum time that excess idle threads
         *        will wait for new tasks before terminating.
         * @param unit the time unit for the {@code keepAliveTime} argument
         * @param workQueue the queue to use for holding tasks before they are
         *        executed.  This queue will hold only the {@code Runnable}
         *        tasks submitted by the {@code execute} method.
         * @param threadFactory the factory to use when the executor
         *        creates a new thread
         * @param handler the handler to use when execution is blocked
         *        because the thread bounds and queue capacities are reached
         * @throws IllegalArgumentException if one of the following holds:<br>
         *         {@code corePoolSize < 0}<br>
         *         {@code keepAliveTime < 0}<br>
         *         {@code maximumPoolSize <= 0}<br>
         *         {@code maximumPoolSize < corePoolSize}
         * @throws NullPointerException if {@code workQueue}
         *         or {@code threadFactory} or {@code handler} is null
         */
        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) {
            if (corePoolSize < 0 ||
                maximumPoolSize <= 0 ||
                maximumPoolSize < corePoolSize ||
                keepAliveTime < 0)
                throw new IllegalArgumentException();
            if (workQueue == null || threadFactory == null || handler == null)
                throw new NullPointerException();
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.workQueue = workQueue;
            this.keepAliveTime = unit.toNanos(keepAliveTime);
            this.threadFactory = threadFactory;
            this.handler = handler;
        }
    

    下面我们主要分析下其参数

    • corePoolSize:该线程池中核心线程数最大值

    核心线程:线程池新建线程的时候,如果当前线程总数小于corePoolSize,则新建的是核心线程,如果超过corePoolSize,则新建的是非核心线程。核心线程默认情况下会一直存活在线程池中,即使这个核心线程处于闲置状态。非核心线程会在一定时间后被回收(keepAliveTime)。
    如果指定ThreadPoolExecutor的allowCoreThreadTimeOut这个属性为true,那么核心线程如果不干活(闲置状态)的话,超过一定时间(keepAliveTime),就会被销毁掉。

    • maximumPoolSize: 该线程池中线程总数最大值

    线程总数 = 核心线程数 + 非核心线程数。

    • keepAliveTime:该线程池中非核心线程闲置超时时长

    一个非核心线程,如果不干活(闲置状态)的时长超过这个参数所设定的时长,就会被销毁掉,如果设置allowCoreThreadTimeOut = true,则也会作用于核心线程。

    • TimeUnit unit:keepAliveTime的单位

    TimeUnit是一个枚举类型,其包括:
    NANOSECONDS : 1微毫秒 = 1微秒 / 1000
    MICROSECONDS : 1微秒 = 1毫秒 / 1000
    MILLISECONDS : 1毫秒 = 1秒 /1000
    SECONDS : 秒
    MINUTES : 分
    HOURS : 小时
    DAYS : 天

    • workQueue:该线程池中的任务队列:维护着等待执行的Runnable对象

    当所有的核心线程都在处于工作状态时,新添加的任务会被添加到这个队列中等待处理,如果队列满了,则新建非核心线程执行任务。
    常用的workQueue类型:

    • SynchronousQueue:这个队列接收到任务的时候,会直接提交给线程处理,而不保留它, 如果所有线程都在工作怎么办?那就新建一个线程来处理这个任务!所以为了保证不出现线程数达到了maximumPoolSize而不能新建线程的错误,使用这个类型队列的时候,maximumPoolSize一般指定成Integer.MAX_VALUE,即无限大。
    • LinkedBlockingQueue:这个队列接收到任务的时候,如果当前线程数小于核心线程数,则新建线程(核心线程)处理任务;如果当前线程数等于核心线程数,则进入队列等待。由于这个队列没有最大值限制,即所有超过核心线程数的任务都将被添加到队列中,这也就导致了maximumPoolSize的设定失效,因为总线程数永远不会超过corePoolSize。
    • ArrayBlockingQueue:可以限定队列的长度,接收到任务的时候,如果没有达到corePoolSize的值,则新建线程(核心线程)执行任务,如果达到了,则入队等候,如果队列已满,则新建线程(非核心线程)执行任务,又如果总线程数到了maximumPoolSize,并且队列也满了,则发生错误。
    • DelayQueue:队列内元素必须实现Delayed接口,这就意味着你传进去的任务必须先实现Delayed接口。这个队列接收到任务时,首先先入队,只有达到了指定的延时时间,才会执行任务。
    • ThreadFactory threadFactory:线程工厂

    自定义创建线程的方式,这是一个接口。

    • RejectedExecutionHandler handler:饱和策略

    AbortPolicy(默认):直接抛弃
    CallerRunsPolicy:用调用者的线程执行任务
    DiscardOldestPolicy:抛弃队列中最久的任务
    DiscardPolicy:抛弃当前任务

    1.3、线程池执行逻辑

    线程池执行主要是在execute方法,由于篇幅有限,本文不做过多叙述,仅介绍大致流程。当在execute(Runnable)方法中提交新任务并且少于corePoolSize线程正在运行时,即使其他工作线程处于空闲状态,也会创建一个新线程来处理该请求。 如果有多于corePoolSize但小于maximumPoolSize线程正在运行,则仅当队列已满时才会创建新线程。 通过设置corePoolSize和maximumPoolSize相同,您可以创建一个固定大小的线程池。 通过将maximumPoolSize设置为基本上无界的值,例如Integer.MAX_VALUE,可以允许池容纳任意数量的并发任务。大致流程图如下


    线程池.jpg

    以下用 currentSize 表示线程池中当前线程数量

    1. 当 currentSize < corePoolSize 时,将会直接启动一个核心线程来执行任务。
    2. 当 currentSize >= corePoolSize ,那么任务会被插入到任务队列中排队等待执行。
    3. 如果任务队列已满,但 currentSize < maximumPoolSize,那么会立刻启动一个非核心线程来执行任务
    4. 如果任务队列已满,并且 currentSize > maximumPoolSize,那么就拒绝执行任务,ThreadPoolExecutor 会调用 RejectedExecutionHandler 的 rejectedExecution 方法来通知调用者。

    二、异步请求

    直接上代码
    RealCall中

     public void enqueue(Callback responseCallback) {
            synchronized (this) {
                if (executed) throw new IllegalStateException("Already Executed");
                executed = true;
            }
            captureCallStackTrace();
            eventListener.callStart(this);
            client.dispatcher().enqueue(new AsyncCall(responseCallback));
        }
    

    主要看最后一行,首先看下Dispatcher类的相关方法。

    
      private int maxRequests = 64;// 最大并发请求数为64
      private int maxRequestsPerHost = 5; //每个主机最大请求数为5
    /** Executes calls. Created lazily. */
     //消费者池(也就是线程池)
      private @Nullable ExecutorService executorService;
    
      /** Ready async calls in the order they'll be run. */
    // 异步的任务中正在等待被消费的线程集合
      private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
    
    //正在运行的 异步的任务集合
      /** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
      private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
       
      ...
    
      public synchronized ExecutorService executorService() {
        if (executorService == null) {
          executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
              new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
        }
        return executorService;
      }
    
      ...
    
      synchronized void enqueue(AsyncCall call) {
        if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
          runningAsyncCalls.add(call);
          executorService().execute(call);
        } else {
          readyAsyncCalls.add(call);
        }
      }
    

    executorService()方法实现了懒加载的无数量限制的线程池,根据我们上文介绍可知在OKHttp中,创建了一个阀值是Integer.MAX_VALUE的线程池,它不保留任何最小线程,随时创建更多的线程数,而且如果线程空闲后,只能多活60秒。所以也就说如果收到20个并发请求,线程池会创建20个线程,当完成后的60秒后会自动关闭所有20个线程。他这样设计成不设上限的线程,以保证I/O任务中高阻塞低占用的过程,不会长时间卡在阻塞上。
    回到enqueue方法,其执行逻辑也很清晰,如果当前正在执行的call的数量大于maxRequest(64),或者该call的Host上的call超过maxRequestsPerHost(5),则加入readyAsyncCall排队等待,否则加入runningAsyncCalls并执行。
    再看下enqueue方法的传入参数AsyncCall,他是RealCall的一个内部类,代码如下

        final class AsyncCall extends NamedRunnable {
            private final Callback responseCallback;
    
            AsyncCall(Callback responseCallback) {
                super("OkHttp %s", redactedUrl());
                this.responseCallback = responseCallback;
            }
    
            String host() {
                return originalRequest.url().host();
            }
    
            Request request() {
                return originalRequest;
            }
    
            RealCall get() {
                return RealCall.this;
            }
    
            @Override
            protected void execute() {
                boolean signalledCallback = false;
                try {
                    Response response = getResponseWithInterceptorChain();
                    if (retryAndFollowUpInterceptor.isCanceled()) {
                        signalledCallback = true;
                        responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
                    } else {
                        signalledCallback = true;
                        responseCallback.onResponse(RealCall.this, response);
                    }
                } catch (IOException e) {
                    if (signalledCallback) {
                        // Do not signal the callback twice!
                        Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
                    } else {
                        eventListener.callFailed(RealCall.this, e);
                        responseCallback.onFailure(RealCall.this, e);
                    }
                } finally {
                    client.dispatcher().finished(this);
                }
            }
        }
    

    当线程池执行到call时,会调用execute()方法,我们发现该方法基本流程与同步请求基本一致,不做过多说明。最后调用dispatcer的finished方法,其代码如下

    /** Used by {@code AsyncCall#run} to signal completion. */
      void finished(AsyncCall call) {
        finished(runningAsyncCalls, call, true);
      }
    
    ...
    
      private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
        int runningCallsCount;
        Runnable idleCallback;
        synchronized (this) {
          if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
          if (promoteCalls) promoteCalls();
          runningCallsCount = runningCallsCount();
          idleCallback = this.idleCallback;
        }
    
        if (runningCallsCount == 0 && idleCallback != null) {
          idleCallback.run();
        }
      }
    
    ...
    
      private void promoteCalls() {
        if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
        if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
    
        for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
          AsyncCall call = i.next();
    
          if (runningCallsForHost(call) < maxRequestsPerHost) {
            i.remove();
            runningAsyncCalls.add(call);
            executorService().execute(call);
          }
    
          if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
        }
      }
    
    

    通过上面代码,大家可以知道finished先执行calls.remove(call)删除call,然后执行promoteCalls(),在promoteCalls()方法里面:如果当前线程大于maxRequest则不操作,如果小于maxRequest则遍历readyAsyncCalls,取出一个call,并把这个call放入runningAsyncCalls,然后执行execute。在遍历过程中如果runningAsyncCalls超过maxRequest则不再添加,否则一直添加。实际上promoteCalls()负责ready的Call到running的Call的转化。

    具体流程图如下


    okhttp线程分发.png

    相关文章

      网友评论

          本文标题:okhttp2

          本文链接:https://www.haomeiwen.com/subject/lcklqctx.html