接上节,看一下我们没有看的两个Producer:
ThreadHandoffProducer
与BitmapMemoryCacheKeyMultiplexProducer
/**
* Bitmap cache get -> thread hand off -> multiplex -> bitmap cache
* @param inputProducer producer providing the input to the bitmap cache
* @return bitmap cache get to bitmap cache sequence
*/
private Producer<CloseableReference<CloseableImage>> newBitmapCacheGetToBitmapCacheSequence(
Producer<CloseableReference<CloseableImage>> inputProducer) {
BitmapMemoryCacheProducer bitmapMemoryCacheProducer =
mProducerFactory.newBitmapMemoryCacheProducer(inputProducer);
BitmapMemoryCacheKeyMultiplexProducer bitmapKeyMultiplexProducer =
mProducerFactory.newBitmapMemoryCacheKeyMultiplexProducer(bitmapMemoryCacheProducer);
ThreadHandoffProducer<CloseableReference<CloseableImage>> threadHandoffProducer =
mProducerFactory.newBackgroundThreadHandoffProducer(
bitmapKeyMultiplexProducer,
mThreadHandoffProducerQueue);
return mProducerFactory.newBitmapMemoryCacheGetProducer(threadHandoffProducer);
}
ThreadHandoffProducer
官方是这样解释它的功能的:Uses ExecutorService to move further computation to different threa。可以这样理解它的功能,它可以把接下来的 inputProducer要做的事情切换到另一个线程。它的核心实现类是:ThreadHandoffProducerQueue
。
public class ThreadHandoffProducerQueue {
//...
private final Deque<Runnable> mRunnableList;
private final Executor mExecutor;
}
从成员变量大致可以看出,它内部利用队列维护一个Runnable的List,Executor执行这个队列中的runnable。
看一下ThreadHandoffProducer
的produceResults
@Override
public void produceResults(final Consumer<T> consumer, final ProducerContext context) {
final ProducerListener producerListener = context.getListener();
final String requestId = context.getId();
final StatefulProducerRunnable<T> statefulRunnable = new StatefulProducerRunnable<T>(
consumer,
producerListener,
PRODUCER_NAME,
requestId) {
@Override
protected void onSuccess(T ignored) {
producerListener.onProducerFinishWithSuccess(requestId, PRODUCER_NAME, null);
mInputProducer.produceResults(consumer, context);
}
//....
};
context.addCallbacks(
new BaseProducerContextCallbacks() {
@Override
public void onCancellationRequested() {
statefulRunnable.cancel();
mThreadHandoffProducerQueue.remove(statefulRunnable);
}
});
mThreadHandoffProducerQueue.addToQueueOrExecute(statefulRunnable);
}
onSuccess()
方法是在runnable被添加到Queue中后,运行在Runnable的run方法中的,因此是运行中指定的新线程中。
那么ThreadHandoffProducer
在这里切换到哪个线程呢?其实决定线程切换的是ThreadHandoffProducerQueue
,因此,只需要知道ThreadHandoffProducerQueue
的Executor是什么就好了,追踪源码:
public ImagePipelineFactory(ImagePipelineConfig config) {
mConfig = Preconditions.checkNotNull(config);
mThreadHandoffProducerQueue = new ThreadHandoffProducerQueue(
config.getExecutorSupplier().forLightweightBackgroundTasks());
}
mLightWeightBackgroundExecutor =
Executors.newFixedThreadPool(
NUM_LIGHTWEIGHT_BACKGROUND_THREADS,
new PriorityThreadFactory(
Process.THREAD_PRIORITY_BACKGROUND, "FrescoLightWeightBackgroundExecutor", true));
即ThreadHandoffProducer
的作用是把接下来的任务切换到后台线程。且都在一个后台线程中执行。NUM_LIGHTWEIGHT_BACKGROUND_THREADS
的值为1
BitmapMemoryCacheKeyMultiplexProducer
这个类实际上继承自MultiplexProducer
, 主要功能实现都在父类中,因此先主要看一下父类,它的主要作用是 : Producer for combining multiple identical requests into a single request.
大致意思就是,这个类会把所有相同的图片请求组合成一个单一的请求,如果这个请求完成,那么所有的图片请求都获得这个请求结果并返回。如果所有相同的图片请求都被取消,那么这个请求才算被取消。即多路复用
。
首先对于所有同一个图片请求的request是使用
Multiplexer
来维护的。
Multiplexer
对于相同请求,是这样组织的:
private final K mKey;
private final CopyOnWriteArraySet<Pair<Consumer<T>, ProducerContext>> mConsumerContextPairs;
即mKey为这些同一个请求的key,使用一个Set来维护每个请求的Consumer
和ProducerContext
。
- addNewConsumer
public boolean addNewConsumer(final Consumer<T> consumer,final ProducerContext producerContext) {
final Pair<Consumer<T>, ProducerContext> consumerContextPair = Pair.create(consumer, producerContext);
//...
synchronized (Multiplexer.this) {
if (getExistingMultiplexer(mKey) != this) {
return false;
}
mConsumerContextPairs.add(consumerContextPair);
prefetchCallbacks = updateIsPrefetch();
priorityCallbacks = updatePriority();
//...
}
BaseProducerContext.callOnIsPrefetchChanged(prefetchCallbacks);
BaseProducerContext.callOnPriorityChanged(priorityCallbacks);
BaseProducerContext.callOnIsIntermediateResultExpectedChanged(intermediateResultsCallbacks);
synchronized (consumerContextPair) {
// check if last result changed in the mean time. In such case we should not propagate it
synchronized (Multiplexer.this) {
if (lastIntermediateResult != mLastIntermediateResult) {
lastIntermediateResult = null;
} else if (lastIntermediateResult != null) {
lastIntermediateResult = cloneOrNull(lastIntermediateResult);
}
}
if (lastIntermediateResult != null) {
if (lastProgress > 0) {
consumer.onProgressUpdate(lastProgress);
}
consumer.onNewResult(lastIntermediateResult, lastStatus);
closeSafely(lastIntermediateResult);
}
}
addCallbacks(consumerContextPair, producerContext);
return true;
}
- 对于新的请求,会利用该请求的
Consumer
和ProducerContext
新建一个Pair添加到Multiplexer
中。 - 更新相关callback。只要有一个请求需要
Prefetch
orPriority
, 那么就为这个请求设置上相关回调,保证任意一个请求拿到返回结果,都可以让需要这个回调的请求得到回调。 - 如果在添加callback的过程中获得了
lastIntermediateResult
, 则立即对当前添加的 Pair<Consumer,ProducerContext>回调consumer.onNewResult(lastIntermediateResult, lastStatus)
- startInputProducerIfHasAttachedConsumers
private void startInputProducerIfHasAttachedConsumers() {
BaseProducerContext multiplexProducerContext;
ForwardingConsumer forwardingConsumer;
synchronized (Multiplexer.this) {
Preconditions.checkArgument(mMultiplexProducerContext == null);
Preconditions.checkArgument(mForwardingConsumer == null);
// Cleanup if all consumers have been cancelled before this method was called
if (mConsumerContextPairs.isEmpty()) {
removeMultiplexer(mKey, this);
return;
}
ProducerContext producerContext = mConsumerContextPairs.iterator().next().second;
mMultiplexProducerContext = new BaseProducerContext(
producerContext.getImageRequest(),
producerContext.getId(),
producerContext.getListener(),
producerContext.getCallerContext(),
producerContext.getLowestPermittedRequestLevel(),
computeIsPrefetch(),
computeIsIntermediateResultExpected(),
computePriority());
mForwardingConsumer = new ForwardingConsumer();
multiplexProducerContext = mMultiplexProducerContext;
forwardingConsumer = mForwardingConsumer;
}
mInputProducer.produceResults(
forwardingConsumer,
multiplexProducerContext);
}
这个方法很简单: 取出 mConsumerContextPairs
中的第一个Pair
构建mMultiplexProducerContext
。构建ForwardingConsumer
,然后将后续处理传递给下一个 inputProducer。
好,到这里我们看完了Multiplexer
的核心实现,下面看一下 Multiplexer
是如何使用它的 produceResults
:
@Override
public void produceResults(Consumer<T> consumer, ProducerContext context) {
K key = getKey(context);
Multiplexer multiplexer;
boolean createdNewMultiplexer;
// We do want to limit scope of this lock to guard only accesses to mMultiplexers map.
// However what we would like to do here is to atomically lookup mMultiplexers, add new
// consumer to consumers set associated with the map's entry and call consumer's callback with
// last intermediate result. We should not do all of those things under this lock.
do {
createdNewMultiplexer = false;
synchronized (this) {
multiplexer = getExistingMultiplexer(key);
if (multiplexer == null) {
multiplexer = createAndPutNewMultiplexer(key);
createdNewMultiplexer = true;
}
}
// addNewConsumer may call consumer's onNewResult method immediately. For this reason
// we release "this" lock. If multiplexer is removed from mMultiplexers in the meantime,
// which is not very probable, then addNewConsumer will fail and we will be able to retry.
} while (!multiplexer.addNewConsumer(consumer, context));
if (createdNewMultiplexer) {
multiplexer.startInputProducerIfHasAttachedConsumers();
}
}
- 调用实现类的getKey方法,即
BitmapMemoryCacheKeyMultiplexProducer
, 该类是根据一个请求的一系列参数,生成一个key。 - 根据这个可以确定相应的
Multiplexer
, 并调用multiplexer.addNewConsumer(consumer, context)
- 如果是新创建的Multiplexer,则调用
multiplexer.startInputProducerIfHasAttachedConsumers();
保证后续流程可以继续。
对于 Multiplexer
和 MultiplexProducer
的工作原理可以使用下图总结:
网友评论