Fresco-图片加载之线程切换与多路复用

作者: susion哒哒 | 来源:发表于2018-05-21 09:28 被阅读20次

    接上节,看一下我们没有看的两个Producer: ThreadHandoffProducerBitmapMemoryCacheKeyMultiplexProducer

      /**
       * Bitmap cache get -> thread hand off -> multiplex -> bitmap cache
       * @param inputProducer producer providing the input to the bitmap cache
       * @return bitmap cache get to bitmap cache sequence
       */
      private Producer<CloseableReference<CloseableImage>> newBitmapCacheGetToBitmapCacheSequence(
          Producer<CloseableReference<CloseableImage>> inputProducer) {
    
        BitmapMemoryCacheProducer bitmapMemoryCacheProducer =
            mProducerFactory.newBitmapMemoryCacheProducer(inputProducer);
    
        BitmapMemoryCacheKeyMultiplexProducer bitmapKeyMultiplexProducer =
            mProducerFactory.newBitmapMemoryCacheKeyMultiplexProducer(bitmapMemoryCacheProducer);
    
        ThreadHandoffProducer<CloseableReference<CloseableImage>> threadHandoffProducer =
            mProducerFactory.newBackgroundThreadHandoffProducer(
                bitmapKeyMultiplexProducer,
                mThreadHandoffProducerQueue);
    
        return mProducerFactory.newBitmapMemoryCacheGetProducer(threadHandoffProducer);
      }
    
    

    ThreadHandoffProducer

    官方是这样解释它的功能的:Uses ExecutorService to move further computation to different threa。可以这样理解它的功能,它可以把接下来的 inputProducer要做的事情切换到另一个线程。它的核心实现类是:ThreadHandoffProducerQueue

    public class ThreadHandoffProducerQueue {
      //...
      private final Deque<Runnable> mRunnableList;
      private final Executor mExecutor;
    }
    

    从成员变量大致可以看出,它内部利用队列维护一个Runnable的List,Executor执行这个队列中的runnable。

    看一下ThreadHandoffProducerproduceResults

     @Override
      public void produceResults(final Consumer<T> consumer, final ProducerContext context) {
        final ProducerListener producerListener = context.getListener();
        final String requestId = context.getId();
        final StatefulProducerRunnable<T> statefulRunnable = new StatefulProducerRunnable<T>(
            consumer,
            producerListener,
            PRODUCER_NAME,
            requestId) {
          @Override
          protected void onSuccess(T ignored) {
            producerListener.onProducerFinishWithSuccess(requestId, PRODUCER_NAME, null);
            mInputProducer.produceResults(consumer, context);
          }
        //....
        };
        context.addCallbacks(
            new BaseProducerContextCallbacks() {
              @Override
              public void onCancellationRequested() {
                statefulRunnable.cancel();
                mThreadHandoffProducerQueue.remove(statefulRunnable);
              }
            });
        mThreadHandoffProducerQueue.addToQueueOrExecute(statefulRunnable);
      }
    

    onSuccess()方法是在runnable被添加到Queue中后,运行在Runnable的run方法中的,因此是运行中指定的新线程中。

    那么ThreadHandoffProducer在这里切换到哪个线程呢?其实决定线程切换的是ThreadHandoffProducerQueue,因此,只需要知道ThreadHandoffProducerQueue的Executor是什么就好了,追踪源码:

     public ImagePipelineFactory(ImagePipelineConfig config) {
        mConfig = Preconditions.checkNotNull(config);
        mThreadHandoffProducerQueue = new ThreadHandoffProducerQueue(
            config.getExecutorSupplier().forLightweightBackgroundTasks());
      }
    
    mLightWeightBackgroundExecutor =
        Executors.newFixedThreadPool(
            NUM_LIGHTWEIGHT_BACKGROUND_THREADS,
            new PriorityThreadFactory(
                Process.THREAD_PRIORITY_BACKGROUND, "FrescoLightWeightBackgroundExecutor", true));
    
    

    ThreadHandoffProducer的作用是把接下来的任务切换到后台线程。且都在一个后台线程中执行。NUM_LIGHTWEIGHT_BACKGROUND_THREADS的值为1

    BitmapMemoryCacheKeyMultiplexProducer

    这个类实际上继承自MultiplexProducer, 主要功能实现都在父类中,因此先主要看一下父类,它的主要作用是 : Producer for combining multiple identical requests into a single request.
    大致意思就是,这个类会把所有相同的图片请求组合成一个单一的请求,如果这个请求完成,那么所有的图片请求都获得这个请求结果并返回。如果所有相同的图片请求都被取消,那么这个请求才算被取消。即多路复用

    首先对于所有同一个图片请求的request是使用 Multiplexer来维护的。

    Multiplexer

    对于相同请求,是这样组织的:

    private final K mKey;
    private final CopyOnWriteArraySet<Pair<Consumer<T>, ProducerContext>> mConsumerContextPairs;
    

    即mKey为这些同一个请求的key,使用一个Set来维护每个请求的ConsumerProducerContext

    • addNewConsumer
     public boolean addNewConsumer(final Consumer<T> consumer,final ProducerContext producerContext) {
          final Pair<Consumer<T>, ProducerContext> consumerContextPair = Pair.create(consumer, producerContext);
          //...
          synchronized (Multiplexer.this) {
            if (getExistingMultiplexer(mKey) != this) {
              return false;
            }
            mConsumerContextPairs.add(consumerContextPair);
            prefetchCallbacks = updateIsPrefetch();
            priorityCallbacks = updatePriority();
            //...
          }
    
          BaseProducerContext.callOnIsPrefetchChanged(prefetchCallbacks);
          BaseProducerContext.callOnPriorityChanged(priorityCallbacks);
          BaseProducerContext.callOnIsIntermediateResultExpectedChanged(intermediateResultsCallbacks);
    
          synchronized (consumerContextPair) {
            // check if last result changed in the mean time. In such case we should not propagate it
            synchronized (Multiplexer.this) {
              if (lastIntermediateResult != mLastIntermediateResult) {
                lastIntermediateResult = null;
              } else if (lastIntermediateResult != null) {
                lastIntermediateResult = cloneOrNull(lastIntermediateResult);
              }
            }
    
            if (lastIntermediateResult != null) {
              if (lastProgress > 0) {
                consumer.onProgressUpdate(lastProgress);
              }
              consumer.onNewResult(lastIntermediateResult, lastStatus);
              closeSafely(lastIntermediateResult);
            }
          }
    
          addCallbacks(consumerContextPair, producerContext);
          return true;
        }
    
    1. 对于新的请求,会利用该请求的ConsumerProducerContext新建一个Pair添加到 Multiplexer中。
    2. 更新相关callback。只要有一个请求需要 Prefetch or Priority, 那么就为这个请求设置上相关回调,保证任意一个请求拿到返回结果,都可以让需要这个回调的请求得到回调。
    3. 如果在添加callback的过程中获得了 lastIntermediateResult, 则立即对当前添加的 Pair<Consumer,ProducerContext>回调consumer.onNewResult(lastIntermediateResult, lastStatus)
    • startInputProducerIfHasAttachedConsumers
        private void startInputProducerIfHasAttachedConsumers() {
          BaseProducerContext multiplexProducerContext;
          ForwardingConsumer forwardingConsumer;
          synchronized (Multiplexer.this) {
            Preconditions.checkArgument(mMultiplexProducerContext == null);
            Preconditions.checkArgument(mForwardingConsumer == null);
    
            // Cleanup if all consumers have been cancelled before this method was called
            if (mConsumerContextPairs.isEmpty()) {
              removeMultiplexer(mKey, this);
              return;
            }
    
            ProducerContext producerContext = mConsumerContextPairs.iterator().next().second;
            mMultiplexProducerContext = new BaseProducerContext(
                producerContext.getImageRequest(),
                producerContext.getId(),
                producerContext.getListener(),
                producerContext.getCallerContext(),
                producerContext.getLowestPermittedRequestLevel(),
                computeIsPrefetch(),
                computeIsIntermediateResultExpected(),
                computePriority());
    
            mForwardingConsumer = new ForwardingConsumer();
            multiplexProducerContext = mMultiplexProducerContext;
            forwardingConsumer = mForwardingConsumer;
          }
          mInputProducer.produceResults(
              forwardingConsumer,
              multiplexProducerContext);
        }
    

    这个方法很简单: 取出 mConsumerContextPairs中的第一个Pair构建mMultiplexProducerContext。构建ForwardingConsumer,然后将后续处理传递给下一个 inputProducer。

    好,到这里我们看完了Multiplexer的核心实现,下面看一下 Multiplexer是如何使用它的 produceResults

      @Override
      public void produceResults(Consumer<T> consumer, ProducerContext context) {
        K key = getKey(context); 
        Multiplexer multiplexer;
        boolean createdNewMultiplexer;
        // We do want to limit scope of this lock to guard only accesses to mMultiplexers map.
        // However what we would like to do here is to atomically lookup mMultiplexers, add new
        // consumer to consumers set associated with the map's entry and call consumer's callback with
        // last intermediate result. We should not do all of those things under this lock.
        do {
          createdNewMultiplexer = false;
          synchronized (this) {
            multiplexer = getExistingMultiplexer(key);
            if (multiplexer == null) {
              multiplexer = createAndPutNewMultiplexer(key);
              createdNewMultiplexer = true;
            }
          }
          // addNewConsumer may call consumer's onNewResult method immediately. For this reason
          // we release "this" lock. If multiplexer is removed from mMultiplexers in the meantime,
          // which is not very probable, then addNewConsumer will fail and we will be able to retry.
        } while (!multiplexer.addNewConsumer(consumer, context));
    
        if (createdNewMultiplexer) {
          multiplexer.startInputProducerIfHasAttachedConsumers();
        }
      }
    
    1. 调用实现类的getKey方法,即BitmapMemoryCacheKeyMultiplexProducer, 该类是根据一个请求的一系列参数,生成一个key。
    2. 根据这个可以确定相应的Multiplexer, 并调用 multiplexer.addNewConsumer(consumer, context)
    3. 如果是新创建的Multiplexer,则调用multiplexer.startInputProducerIfHasAttachedConsumers();保证后续流程可以继续。

    对于 MultiplexerMultiplexProducer的工作原理可以使用下图总结:

    image

    相关文章

      网友评论

        本文标题:Fresco-图片加载之线程切换与多路复用

        本文链接:https://www.haomeiwen.com/subject/hzzxjftx.html