Let's first look at the source code.
AbstractPipeline.class
/**
* Evaluate the pipeline with a terminal operation to produce a result.
* Apply the terminal operation terminalOp to this stream pipeline; during evaluation the stages are wired together from the last stage back to the source to form the processing pipeline.
*
* @param <R> the type of result
* @param terminalOp the terminal operation to be applied to the pipeline.
* @return the result
*/
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
// The output shape of this stage must equal the terminal operation's input shape
assert getOutputShape() == terminalOp.inputShape();
// A stream must not be linked or consumed more than once
if (linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
// Mark the pipeline as consumed
linkedOrConsumed = true;
// Evaluate this pipeline with the terminal operation, either in parallel or sequentially
return isParallel()
? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
: terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}
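Nothing in the pipeline runs until a terminal operation reaches evaluate; intermediate operations only build stages. Below is a small illustrative sketch using only the public Stream API (class and variable names are my own, not JDK-internal code) showing the sequential/parallel branch and the linkedOrConsumed guard in action:
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class EvaluateDemo {
    public static void main(String[] args) {
        // Intermediate operations (filter/map) only build pipeline stages; nothing runs yet
        Stream<Integer> pipeline = Stream.of(1, 2, 3, 4, 5)
                .filter(n -> n % 2 == 1)
                .map(n -> n * n);

        // The terminal operation triggers evaluate(terminalOp); this stream is
        // sequential, so the evaluateSequential branch is taken
        List<Integer> result = pipeline.collect(Collectors.toList());
        System.out.println(result);        // [1, 9, 25]

        // A second terminal operation on the same pipeline hits the linkedOrConsumed check
        try {
            pipeline.count();
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // stream has already been operated upon or closed
        }

        // A parallel stream takes the evaluateParallel branch instead
        long evens = Stream.of(1, 2, 3, 4, 5).parallel().filter(n -> n % 2 == 0).count();
        System.out.println(evens);         // 2
    }
}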
/**
* Get the source spliterator (the data source) of this pipeline stage.
*/
@SuppressWarnings("unchecked")
private Spliterator<?> sourceSpliterator(int terminalFlags) {
Spliterator<?> spliterator = null;
// 1) The source spliterator was supplied directly and is not null
if (sourceStage.sourceSpliterator != null) {
// Read it
spliterator = sourceStage.sourceSpliterator;
// Clear the reference after use
sourceStage.sourceSpliterator = null;
}
// 2) Otherwise the spliterator is produced by sourceSupplier
else if (sourceStage.sourceSupplier != null) {
spliterator = (Spliterator<?>) sourceStage.sourceSupplier.get();
sourceStage.sourceSupplier = null;
}
else {
throw new IllegalStateException(MSG_CONSUMED);
}
// The stream is parallel and the pipeline contains at least one stateful operation
if (isParallel() && sourceStage.sourceAnyStateful) {
// Adapt the source spliterator, evaluating each stateful op in the pipeline up to and including this pipeline stage.
// The depth and flags of each pipeline stage are adjusted accordingly.
int depth = 1;
/**
 * Walk the pipeline from the source stage onward, adjusting each stage up to and including this one
 */
for (@SuppressWarnings("rawtypes") AbstractPipeline u = sourceStage, p = sourceStage.nextStage, e = this;
u != e;
u = p, p = p.nextStage) {
int thisOpFlags = p.sourceOrOpFlags;
// The current stage is a stateful operation
if (p.opIsStateful()) {
depth = 0;
// The current operation is short-circuiting; drop its short-circuit flag here
if (StreamOpFlag.SHORT_CIRCUIT.isKnown(thisOpFlags)) {
thisOpFlags = thisOpFlags & ~StreamOpFlag.IS_SHORT_CIRCUIT;
}
spliterator = p.opEvaluateParallelLazy(u, spliterator);
// Inject or clear SIZED on the source pipeline stage based on the stage's spliterator
thisOpFlags = spliterator.hasCharacteristics(Spliterator.SIZED)
? thisOpFlags & ~StreamOpFlag.NOT_SIZED | StreamOpFlag.IS_SIZED
: thisOpFlags & ~StreamOpFlag.IS_SIZED | StreamOpFlag.NOT_SIZED;
}
p.depth = depth++;
p.combinedFlags = StreamOpFlag.combineOpFlags(thisOpFlags, u.combinedFlags);
}
}
// The terminal operation contributes flags of its own
if (terminalFlags != 0) {
// Combine the terminal operation's flags into the last stage's combined flags
combinedFlags = StreamOpFlag.combineOpFlags(terminalFlags, combinedFlags);
}
return spliterator;
}
Parallel stream processing
Parallel evaluation is driven by the terminal operation's evaluateParallel(PipelineHelper<T> helper, Spliterator<S> spliterator) method, which creates and executes the parallel computation task.
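By default the forked subtasks run in ForkJoinPool.commonPool(). A commonly used trick, which relies on implementation behaviour rather than a documented guarantee, is to submit the terminal operation from inside a custom ForkJoinPool so that the subtasks created by evaluateParallel stay in that pool:
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.LongStream;

public class CustomPoolDemo {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ForkJoinPool pool = new ForkJoinPool(4);
        try {
            // The terminal operation runs inside the custom pool, so the subtasks
            // are forked into this pool rather than the common pool
            long sum = pool.submit(() -> LongStream.rangeClosed(1, 1_000_000).parallel().sum())
                           .get();
            System.out.println(sum);   // 500000500000
        } finally {
            pool.shutdown();
        }
    }
}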
Next, let's look at AbstractTask.class in the java.util.stream package
to see how it splits and processes tasks.
/**
* Decides whether or not to split a task further or compute it
* directly. If computing directly, calls {@code doLeaf} and pass
* the result to {@code setRawResult}. Otherwise splits off
* subtasks, forking one and continuing as the other.
*
* <p> The method is structured to conserve resources across a
* range of uses. The loop continues with one of the child tasks
* when split, to avoid deep recursion. To cope with spliterators
* that may be systematically biased toward left-heavy or
* right-heavy splits, we alternate which child is forked versus
* continued in the loop.
*/
@Override
public void compute() {
Spliterator<P_IN> rs = spliterator, ls; // right, left spliterators
// Read the spliterator's estimated number of remaining elements
long sizeEstimate = rs.estimateSize();
// The target size: the element threshold below which a task is no longer split
final long sizeThreshold = getTargetSize(sizeEstimate);
boolean forkRight = false;
@SuppressWarnings("unchecked") K task = (K) this;
// While the estimated size exceeds the threshold and the spliterator can still be split in two
while (sizeEstimate > sizeThreshold && (ls = rs.trySplit()) != null) {
K leftChild, rightChild, taskToFork;
// Create a child task from the left spliterator and store it in leftChild
task.leftChild = leftChild = task.makeChild(ls);
// Create a child task from the right spliterator and store it in rightChild
task.rightChild = rightChild = task.makeChild(rs);
// Set this task's pending count to 1 (one child will be forked into the pool for parallel processing)
task.setPendingCount(1);
// Alternate which child is forked: the left child is forked first, then the right, and so on
if (forkRight) {
forkRight = false;
// Keep splitting the left spliterator in this thread
rs = ls;
// The task this thread continues to process
task = leftChild;
// The task to fork into the pool for parallel processing
taskToFork = rightChild;
}
else {
forkRight = true;
// The task this thread continues to process
task = rightChild;
// The task to fork into the pool for parallel processing
taskToFork = leftChild;
}
taskToFork.fork();
// Re-read the estimated size of the spliterator that is still being split
sizeEstimate = rs.estimateSize();
}
// This task is now a leaf task; run the leaf computation
task.setLocalResult(task.doLeaf());
// Once this leaf is done, try to complete the parent task(s)
task.tryComplete();
}
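The real class is a CountedCompleter and propagates leaf results through setLocalResult/tryComplete, but the splitting pattern itself is easy to reproduce in user code. Below is a simplified, hypothetical sketch (SumTask and THRESHOLD are made-up names; join() stands in for the pending-count mechanism) of a RecursiveTask that splits in a loop and alternates which half it forks, mirroring the loop above:
import java.util.ArrayDeque;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical example, not JDK code: sums the long values in [lo, hi) using the
// same pattern as AbstractTask.compute(): split in a loop, fork one half, keep
// the other, and alternate which side is forked
public class SumTask extends RecursiveTask<Long> {
    private static final long THRESHOLD = 10_000;   // plays the role of sizeThreshold
    private final long lo, hi;

    SumTask(long lo, long hi) { this.lo = lo; this.hi = hi; }

    @Override
    protected Long compute() {
        long lo = this.lo, hi = this.hi;
        boolean forkRight = false;
        ArrayDeque<SumTask> forked = new ArrayDeque<>();
        while (hi - lo > THRESHOLD) {
            long mid = (lo + hi) >>> 1;
            SumTask toFork;
            if (forkRight) {             // fork the right half, keep splitting the left
                toFork = new SumTask(mid, hi);
                hi = mid;
            } else {                     // fork the left half, keep splitting the right
                toFork = new SumTask(lo, mid);
                lo = mid;
            }
            forkRight = !forkRight;
            toFork.fork();
            forked.push(toFork);
        }
        long sum = 0;                    // leaf computation, analogous to doLeaf()
        for (long i = lo; i < hi; i++) sum += i;
        while (!forked.isEmpty()) sum += forked.pop().join();   // instead of tryComplete()
        return sum;
    }

    public static void main(String[] args) {
        long result = new ForkJoinPool().invoke(new SumTask(1, 1_000_001));
        System.out.println(result);      // 500000500000
    }
}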
Fork/Join flow