学科归属&背景
- Java8新特性,能够避免多重循环和反复遍历,导致有许多中间变量,增加内存的消耗。适用于统计/变换等场景。
侧重点/目标
1.中间的操作结果存在哪里?内存的消耗情况怎么样?
- stream的实现不是对所有的数据依次进行单一操作,而是对一个元素遍历所有操作。内存消耗相比于迭代,大大降低。
2.中间操作的状态&短路是指什么
- 有状态,依赖上一个操作的结果,不能并行?如,sorted
@Override
public void end() {
list.sort(comparator);
downstream.begin(list.size());
if (!cancellationWasRequested) {
list.forEach(downstream::accept);
}
else {
for (T t : list) {
if (downstream.cancellationRequested()) break;
downstream.accept(t);//等中间缓存的排序结果都ok end了,对排序后的结果,重新发起后续的操作
}
}
downstream.end();
list = null;
}
- 无状态,操作之间无关联。
- 短路:执行到这个操作,这个元素有可能就被剔除,不能进行后续的操作了,类似continue。如,filter。=》合理布置stream操作顺序,可以减少计算量。
3.传入的抽象方法实现,是存在哪里?是由哪个类怎么执行的?如何短路执行(中间取消)?
- 惰性执行(调用终止方法时,才真正执行整个流操作)怎么实现的?Spliterator接口负责将数据转成流水线的输入,Sink接口是消费者,遍历消费操作。
- 定义数据源,开启流水线:Collection的stream(),实现了Spliterator接口,返回构造出ReferencePipeline.Head实例
default Stream<E> stream() {
return StreamSupport.stream(spliterator(), false);
}
public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
Objects.requireNonNull(spliterator);
return new ReferencePipeline.Head<>(spliterator,
StreamOpFlag.fromCharacteristics(spliterator),
parallel);
}
- 传入的抽象方法实现,会根据有无状态,构造出StatelessOp或 StatefulOp。一步步构成操作的双向链表,同时并没有实际执行操作。实际操作中,AbstractPipeline双头链表保存前后操作和头结点,方便同一元素进行操作。ChainedReference执行操作流的执行/停止。
@Override
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
Objects.requireNonNull(predicate);
return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SIZED) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
@Override
public void begin(long size) {
downstream.begin(-1);
}
@Override
public void accept(P_OUT u) {
if (predicate.test(u))
downstream.accept(u);
}
};
}
};
}
@Override
public final Stream<P_OUT> distinct() {
return DistinctOps.makeRef(this);
}
static <T> ReferencePipeline<T, T> makeRef(AbstractPipeline<?, T, ?> upstream) {
return new ReferencePipeline.StatefulOp<T, T>(upstream, StreamShape.REFERENCE,
StreamOpFlag.IS_DISTINCT | StreamOpFlag.NOT_SIZED) {
……
- 终止方法,构造出TerminalOp,同时发起ReferencePipeline.evaluate,开始惰性执行
@Override
public void forEach(Consumer<? super P_OUT> action) {
evaluate(ForEachOps.makeRef(action, false));
}
public static <T> TerminalOp<T, Void> makeRef(Consumer<? super T> action,
boolean ordered) {
Objects.requireNonNull(action);
return new ForEachOp.OfRef<>(action, ordered);
}
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
assert getOutputShape() == terminalOp.inputShape();
if (linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
linkedOrConsumed = true;
// **并行与否**
return isParallel()
? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))//ForkJoin框架,将原始数据不断拆分为更小的单元,对每一个单元做上述evaluateSequential类似的动作
: terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}
@Override
public <S> Void evaluateSequential(PipelineHelper<T> helper,
Spliterator<S> spliterator) {
return helper.wrapAndCopyInto(this, spliterator).get();
}
copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
// 从最后操作往前,包成新的Sink,复合combinedFlags
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
Objects.requireNonNull(sink);
for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
}
return (Sink<P_IN>) sink;
}
// 第二个操作的节点,判断combinedFlags,如果短路了,即停止,然后重新反向执行?
// 未短路,执行此次的新Sink反向流的begin,依次执行操作的,递归发起下一个元素的
@Override
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
Objects.requireNonNull(wrappedSink);
if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
wrappedSink.begin(spliterator.getExactSizeIfKnown());// 流水ok,你准备一下
spliterator.forEachRemaining(wrappedSink);// Collection实现spliterator,开始执行遍历元素,递归执行
wrappedSink.end();
}
else {
copyIntoWithCancel(wrappedSink, spliterator);
}
}
// ArrayList实现
public void forEachRemaining(Consumer<? super E> action) {
int i, hi, mc; // hoist accesses and checks from loop
ArrayList<E> lst; Object[] a;
if (action == null)
throw new NullPointerException();
if ((lst = list) != null && (a = lst.elementData) != null) {
if ((hi = fence) < 0) {
mc = lst.modCount;
hi = lst.size;
}
else
mc = expectedModCount;
if ((i = index) >= 0 && (index = hi) <= a.length) {
for (; i < hi; ++i) {
@SuppressWarnings("unchecked") E e = (E) a[i];
action.accept(e);//各个操作通过Sink接口accept方法依次向下传递执行。
}
if (lst.modCount == mc)
return;
}
}
throw new ConcurrentModificationException();
}
4.泛型的使用
自己套自己public interface OfPrimitive<T, T_CONS, T_SPLITR extends Spliterator.OfPrimitive<T, T_CONS, T_SPLITR>> extends Spliterator<T> {
interface Builder<T> extends Sink<T> {
/**
* Builds the node. Should be called after all elements have been
* pushed and signalled with an invocation of {@link Sink#end()}.
*
* @return the resulting {@code Node}
*/
Node<T> build();
/**
* Specialized @{code Node.Builder} for int elements
*/
interface OfInt extends Node.Builder<Integer>, Sink.OfInt {
@Override
Node.OfInt build();
}
Lists.<Person>newArrayList().stream()
.collect(Collectors.groupingBy(Person::getType, HashMap::new, Collectors.toList()));
5.操作码是如何标识的。位的应用。sourceOrOpFlags
6.设计模式
责任链模式,一个接一个处理事件。
7.典型使用
collector&Collectors
知识迁移
1.并行的ForkJoinPool与多线程的关系,分治的思想如何有序拼接数据,如并行排序?
REF
[原来你是这样的 Stream —— 浅析 Java Stream 实现原理](https://zhuanlan.zhihu.com/p/47478339)
网友评论