线程池与Glide网络请求并发调度

作者: Reiya | 来源:发表于2017-08-24 20:12 被阅读1456次

    Executors类提供了4种不同的线程池:newCachedThreadPool、newFixedThreadPool、 newScheduledThreadPool和newSingleThreadExecutor,它们都是直接或间接通过ThreadPoolExecutor实现的。
    *ThreadPoolExecutor:

        // Public constructors and methods
    
        /**
         * Creates a new {@code ThreadPoolExecutor} with the given initial
         * parameters and default thread factory and rejected execution handler.
         * It may be more convenient to use one of the {@link Executors} factory
         * methods instead of this general purpose constructor.
         *
         * @param corePoolSize the number of threads to keep in the pool, even
         *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
         * @param maximumPoolSize the maximum number of threads to allow in the
         *        pool
         * @param keepAliveTime when the number of threads is greater than
         *        the core, this is the maximum time that excess idle threads
         *        will wait for new tasks before terminating.
         * @param unit the time unit for the {@code keepAliveTime} argument
         * @param workQueue the queue to use for holding tasks before they are
         *        executed.  This queue will hold only the {@code Runnable}
         *        tasks submitted by the {@code execute} method.
         * @throws IllegalArgumentException if one of the following holds:<br>
         *         {@code corePoolSize < 0}<br>
         *         {@code keepAliveTime < 0}<br>
         *         {@code maximumPoolSize <= 0}<br>
         *         {@code maximumPoolSize < corePoolSize}
         * @throws NullPointerException if {@code workQueue} is null
         */
        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue)
    

    ThreadPoolExecutor的构造方法有以下几个重要参数:
    corePoolSize:核心线程数。核心线程会在线程池中一直存活,即时它们处于闲置状态,例外情况是allowCoreThreadTimeOut被设置为true。
    maximumPoolSize:最大线程数。
    keepAliveTime:线程闲置时的超时时长,超时后线程会被回收。
    unit:keepAliveTime的时间单位。
    workQueue:存放等待执行的任务的阻塞队列。
    队列与线程池按照以下规则进行交互:
    如果运行的线程数小于核心线程数(corePoolSize),则首选添加新线程而不排队。如果运行的线程数等于或者大于核心线程数(corePoolSize),则首选将请求加入队列而不添加新线程。如果请求无法加入队列,则创建新线程;如果这将导致超出最大线程数(maximumPoolSize),则任务将被拒绝执行。
    *Executors:

        /**
         * Creates a thread pool that reuses a fixed number of threads
         * operating off a shared unbounded queue.  At any point, at most
         * {@code nThreads} threads will be active processing tasks.
         * If additional tasks are submitted when all threads are active,
         * they will wait in the queue until a thread is available.
         * If any thread terminates due to a failure during execution
         * prior to shutdown, a new one will take its place if needed to
         * execute subsequent tasks.  The threads in the pool will exist
         * until it is explicitly {@link ExecutorService#shutdown shutdown}.
         *
         * @param nThreads the number of threads in the pool
         * @return the newly created thread pool
         * @throws IllegalArgumentException if {@code nThreads <= 0}
         */
        public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }
    

    可以看到FixedThreadPool只有固定数量的核心线程,任务队列是基于链表的无界阻塞队列。当所有线程都在运行时,新任务都会放到任务队列中等待。
    默认情况下,Glide的网络请求是在EngineJob中的sourceExecutor中执行的,而这个sourceExecutor是通过GlideExecutor的newSourceExecutor方法实例化的。
    *GlideExecutor:

    /**
       * Returns a new fixed thread pool with the default thread count returned from
       * {@link #calculateBestThreadCount()}, the {@link #DEFAULT_SOURCE_EXECUTOR_NAME} thread name
       * prefix, and the
       * {@link com.bumptech.glide.load.engine.executor.GlideExecutor.UncaughtThrowableStrategy#DEFAULT}
       * uncaught throwable strategy.
       *
       * <p>Source executors allow network operations on their threads.
       */
      public static GlideExecutor newSourceExecutor() {
        return newSourceExecutor(calculateBestThreadCount(), DEFAULT_SOURCE_EXECUTOR_NAME,
            UncaughtThrowableStrategy.DEFAULT);
      }
    
      /**
       * Returns a new fixed thread pool with the given thread count, thread name prefix,
       * and {@link com.bumptech.glide.load.engine.executor.GlideExecutor.UncaughtThrowableStrategy}.
       *
       * <p>Source executors allow network operations on their threads.
       *
       * @param threadCount The number of threads.
       * @param name The prefix for each thread name.
       * @param uncaughtThrowableStrategy The {@link
       * com.bumptech.glide.load.engine.executor.GlideExecutor.UncaughtThrowableStrategy} to use to
       *                                  handle uncaught exceptions.
       */
      public static GlideExecutor newSourceExecutor(int threadCount, String name,
          UncaughtThrowableStrategy uncaughtThrowableStrategy) {
        return new GlideExecutor(threadCount, name, uncaughtThrowableStrategy,
            false /*preventNetworkOperations*/, false /*executeSynchronously*/);
      }
    
      // Visible for testing.
      GlideExecutor(int poolSize, String name,
          UncaughtThrowableStrategy uncaughtThrowableStrategy, boolean preventNetworkOperations,
          boolean executeSynchronously) {
        this(
            poolSize /* corePoolSize */,
            poolSize /* maximumPoolSize */,
            0 /* keepAliveTimeInMs */,
            name,
            uncaughtThrowableStrategy,
            preventNetworkOperations,
            executeSynchronously);
      }
    
      GlideExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTimeInMs, String name,
          UncaughtThrowableStrategy uncaughtThrowableStrategy, boolean preventNetworkOperations,
          boolean executeSynchronously) {
        this(
            corePoolSize,
            maximumPoolSize,
            keepAliveTimeInMs,
            name,
            uncaughtThrowableStrategy,
            preventNetworkOperations,
            executeSynchronously,
            new PriorityBlockingQueue<Runnable>());
      }
    
      GlideExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTimeInMs, String name,
          UncaughtThrowableStrategy uncaughtThrowableStrategy, boolean preventNetworkOperations,
          boolean executeSynchronously, BlockingQueue<Runnable> queue) {
        super(
            corePoolSize,
            maximumPoolSize,
            keepAliveTimeInMs,
            TimeUnit.MILLISECONDS,
            queue,
            new DefaultThreadFactory(name, uncaughtThrowableStrategy, preventNetworkOperations));
        this.executeSynchronously = executeSynchronously;
      }
    
      /**
       * Determines the number of cores available on the device.
       *
       * <p>{@link Runtime#availableProcessors()} returns the number of awake cores, which may not
       * be the number of available cores depending on the device's current state. See
       * http://goo.gl/8H670N.
       */
      public static int calculateBestThreadCount() {
        // We override the current ThreadPolicy to allow disk reads.
        // This shouldn't actually do disk-IO and accesses a device file.
        // See: https://github.com/bumptech/glide/issues/1170
        ThreadPolicy originalPolicy = StrictMode.allowThreadDiskReads();
        File[] cpus = null;
        try {
          File cpuInfo = new File(CPU_LOCATION);
          final Pattern cpuNamePattern = Pattern.compile(CPU_NAME_REGEX);
          cpus = cpuInfo.listFiles(new FilenameFilter() {
            @Override
            public boolean accept(File file, String s) {
              return cpuNamePattern.matcher(s).matches();
            }
          });
        } catch (Throwable t) {
          if (Log.isLoggable(TAG, Log.ERROR)) {
            Log.e(TAG, "Failed to calculate accurate cpu count", t);
          }
        } finally {
          StrictMode.setThreadPolicy(originalPolicy);
        }
    
        int cpuCount = cpus != null ? cpus.length : 0;
        int availableProcessors = Math.max(1, Runtime.getRuntime().availableProcessors());
        return Math.min(MAXIMUM_AUTOMATIC_THREAD_COUNT, Math.max(availableProcessors, cpuCount));
      }
    

    GlideExecutor的newSourceExecutor与Executors的newFixedThreadPool类似,都是固定大小的线程池,不过任务队列不同。线程池大小为calculateBestThreadCount,该值为设备可用核心数但最大不超过4。任务队列为PriorityBlockingQueue,一种基于优先级的无界阻塞队列,插入元素需要实现Comparable接口的compareTo方法来提供排序依据。
    *DecodeJob:

      @Override
      public int compareTo(DecodeJob<?> other) {
        int result = getPriority() - other.getPriority();
        if (result == 0) {
          result = order - other.order;
        }
        return result;
      }
    

    Glide的Runnable实现类是DecodeJob,它的compareTo方法的逻辑是:优先级(共IMMEDIATE/HIGH/NORMAL/LOW四种,依次降低,默认为NORMAL)高的优先,若优先级相同则顺序在前的优先(先进先出)。
    *RequestBuilder:

      /**
       * Set the target the resource will be loaded into.
       *
       * @param target The target to load the resource into.
       * @return The given target.
       * @see RequestManager#clear(Target)
       */
      public <Y extends Target<TranscodeType>> Y into(@NonNull Y target) {
        Util.assertMainThread();
        Preconditions.checkNotNull(target);
        if (!isModelSet) {
          throw new IllegalArgumentException("You must call #load() before calling #into()");
        }
    
        Request previous = target.getRequest();
    
        if (previous != null) {
          requestManager.clear(target);
        }
    
        requestOptions.lock();
        Request request = buildRequest(target);
        target.setRequest(request);
        requestManager.track(target, request);
    
        return target;
      }
    

    *HttpUrlFetcher:

    
      @Override
      public void loadData(Priority priority, DataCallback<? super InputStream> callback) {
        long startTime = LogTime.getLogTime();
        final InputStream result;
        try {
          result = loadDataWithRedirects(glideUrl.toURL(), 0 /*redirects*/, null /*lastUrl*/,
              glideUrl.getHeaders());
        } catch (IOException e) {
          if (Log.isLoggable(TAG, Log.DEBUG)) {
            Log.d(TAG, "Failed to load data for url", e);
          }
          callback.onLoadFailed(e);
          return;
        }
    
        if (Log.isLoggable(TAG, Log.VERBOSE)) {
          Log.v(TAG, "Finished http url fetcher fetch in " + LogTime.getElapsedMillis(startTime)
              + " ms and loaded " + result);
        }
        callback.onDataReady(result);
      }
    
      private InputStream loadDataWithRedirects(URL url, int redirects, URL lastUrl,
          Map<String, String> headers) throws IOException {
        if (redirects >= MAXIMUM_REDIRECTS) {
          throw new HttpException("Too many (> " + MAXIMUM_REDIRECTS + ") redirects!");
        } else {
          // Comparing the URLs using .equals performs additional network I/O and is generally broken.
          // See http://michaelscharf.blogspot.com/2006/11/javaneturlequals-and-hashcode-make.html.
          try {
            if (lastUrl != null && url.toURI().equals(lastUrl.toURI())) {
              throw new HttpException("In re-direct loop");
    
            }
          } catch (URISyntaxException e) {
            // Do nothing, this is best effort.
          }
        }
    
        urlConnection = connectionFactory.build(url);
        for (Map.Entry<String, String> headerEntry : headers.entrySet()) {
          urlConnection.addRequestProperty(headerEntry.getKey(), headerEntry.getValue());
        }
        urlConnection.setConnectTimeout(timeout);
        urlConnection.setReadTimeout(timeout);
        urlConnection.setUseCaches(false);
        urlConnection.setDoInput(true);
    
        // Stop the urlConnection instance of HttpUrlConnection from following redirects so that
        // redirects will be handled by recursive calls to this method, loadDataWithRedirects.
        urlConnection.setInstanceFollowRedirects(false);
    
        // Connect explicitly to avoid errors in decoders if connection fails.
        urlConnection.connect();
        if (isCancelled) {
          return null;
        }
        final int statusCode = urlConnection.getResponseCode();
        if (statusCode / 100 == 2) {
          return getStreamForSuccessfulRequest(urlConnection);
        } else if (statusCode / 100 == 3) {
          String redirectUrlString = urlConnection.getHeaderField("Location");
          if (TextUtils.isEmpty(redirectUrlString)) {
            throw new HttpException("Received empty or null redirect url");
          }
          URL redirectUrl = new URL(url, redirectUrlString);
          return loadDataWithRedirects(redirectUrl, redirects + 1, url, headers);
        } else if (statusCode == -1) {
          throw new HttpException(statusCode);
        } else {
          throw new HttpException(urlConnection.getResponseMessage(), statusCode);
        }
      }
    
      @Override
      public void cancel() {
        // TODO: we should consider disconnecting the url connection here, but we can't do so
        // directly because cancel is often called on the main thread.
        isCancelled = true;
      }
    
    

    Glide将加载请求和Target(ImageView)关联,开始某个ImageView的加载请求前会先将该ImageView关联的请求清除。此时在线程池中的关联的DecodeJob,正在进行的网络请求不会被中断,在等待队列里的也不会被直接从线程池移除,而是移除回调并设置取消标志位,让未开始的后续加载步骤的逻辑不会被执行。
    当列表(ListView/RecyclerView)快速滚动时,同时执行的网络请求数量不会超过设备可用核心数,其余请求会放到队列中等待执行。虽然队列长度可能会一下增加到几十,但随着列表复用View,队列中的大部分请求都会被取消掉,之后执行时不会发起网络请求,并迅速让位于等待中的请求。也就是说,快速滚动过程的中间很多个列表项的请求都会被略过。这样的机制保证了不会过度消耗资源导致滑动卡顿。

    相关文章

      网友评论

        本文标题:线程池与Glide网络请求并发调度

        本文链接:https://www.haomeiwen.com/subject/nayldxtx.html