几个概念
需要了解Stream
、Pipeline
、Stage
、Sink
几个概念。
Stream
Stream
就是一系列元素。是怎样的一系列元素呢?是支持顺序和并行聚集操作的一系列元素。
Stream
定义了一些中间操作(Intermediate operations)和结束操作(Terminal operations),
中间操作包括无状态(Stateless)操作比如:filter
, map
, flatMap
等,有状态(Stateful)操作比如:distinct
, sorted
, limit
等;
结束操作(Terminal operations)包括非短路操作(short-circuiting)比如:forEach
, reduce
, collect
等和短路操作如:findFirst
, findAny
;
这些操作需要被按顺序记录下来,这就需要Pipeline
了。
Pipeline
Pipeline
就是管道的概念。
管道有一个基类PipelineHelper
,他是执行Stream
管道的一个helper
,将Stream
的所有信息收集到一个地方。
上面所说的操作其实都定义在PipelineHelper
的一个子类ReferencePipeline
中,包括Head
(Source stage of a ReferencePipeline)、StatelessOp
(Base class for a stateless intermediate stage of a Stream.)、StatefulOp
(Base class for a stateful intermediate stage of a Stream.)静态内部类。
Stage
Stream
中使用Stage
的概念来描述一个完整的操作,并用某种实例化后的PipelineHelper
来代表Stage
,将具有先后顺序的各个Stage
连到一起,就构成了整个流水线。
image.png简单理解就是每一个操作就会产生一个
stage
Sink
现在有了Stage
了,如何串联起这些stage
呢?通过Sink
。
拿java.util.stream.Sink.ChainedReference
来看,里面有一个downstream
的实例成员,只要上游的sink调用下游的accept()
方法,即可保证串联起来执行:即每一个上游的sink
执行完自己的accept()
逻辑以后,调用下游sink
的accept()
方法(downstream.accept()
)。
类图1通过
stage
构建起来的双向链表只是给所有stage
关联在一起,每个stage
都知道自己前后有没有stage
存在,却没法知道后面stage
到底执行了哪种操作,以及回调函数是哪种形式。这就好像一个网络环境,根据iptables
我知道了我下游的路由器在哪里,那我怎么把数据传递给它呢?通过协议,TCP协议!这么看的话,Sink
更像是一个协议,让每一个stage
执行完以后,知道调用某个方法来将数据传递给下一个stage
,也就是Sink#accept()
。
通过代码来讲讲:
@Test
public void testFilter() {
Stream.of(1, 2, 3, 4, 5)
.filter(item -> item > 3)
.forEach(System.out::println);// 打印结果:4,5
}
其实这段代码等价于:
@Test
public void testFilter() {
Stream<Integer> head = Stream.of(1, 2, 3, 4, 5);
Stream<Integer> filterStream = head.filter(i -> i > 3);
filterStream.forEach(System.out::println);
filterStream.count();
}
可以看到,前面每一步都会返回一个新的Stream
,严格的说,其实是Stream
的子类,更严格说,是ReferencePipeline
的子类,参见上图类图1。具体如下:
// java.util.Arrays#stream(T[], int, int)
// 创建一个Stream实例
public static <T> Stream<T> stream(T[] array, int startInclusive, int endExclusive) {
// spliterator方法创建一个ArraySpliterator,后面会关注到ArraySpliterator#forEachRemaining
return StreamSupport.stream(spliterator(array, startInclusive, endExclusive), false);
}
// java.util.stream.StreamSupport#stream(java.util.Spliterator<T>, boolean)
public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
Objects.requireNonNull(spliterator);
// 创建一个管道(pipeline)的头节点
return new ReferencePipeline.Head<>(spliterator,
StreamOpFlag.fromCharacteristics(spliterator),
parallel);
}
通过上面的代码可以看到,真正返回的是ReferencePipeline.Head
的实例,当然,就是Stream
的子类。
接着看filter()
方法:
// java.util.stream.ReferencePipeline#filter
@Override
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
Objects.requireNonNull(predicate);
// filter是一个无状态操作(StatelessOp),StatelessOp是管道的一个节点,也是Stream的子类
// 这段代码重点关注返回的StatelessOp实例,this是当前Stream实例,即上面代码Stream#of方法产生的实例,也就是ReferencePipeline.Head实例
// 通过这个方法串联起新建节点StatelessOp与ReferencePipeline.Head的关系,即StatelessOp#previousStage=ReferencePipeline.Head
return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SIZED) {
// 这个方法先不关注,后面还会说到
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
@Override
public void begin(long size) {
downstream.begin(-1);
}
@Override
public void accept(P_OUT u) {
if (predicate.test(u))
downstream.accept(u);
}
};
}
};
}
filter
方法的默认实现是在ReferencePipeline
中实现的。上面这段代码我们需要重点关注的是,返回了一个StatelessOp
对象,也是ReferencePipeline
的子类。关注一下构造StatelessOp
的过程:
// java.util.stream.ReferencePipeline.StatelessOp#StatelessOp
StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,
StreamShape inputShape,
int opFlags) {
super(upstream, opFlags);
assert upstream.getOutputShape() == inputShape;
}
// java.util.stream.ReferencePipeline#ReferencePipeline(java.util.stream.AbstractPipeline<?,P_IN,?>, int)
ReferencePipeline(AbstractPipeline<?, P_IN, ?> upstream, int opFlags) {
super(upstream, opFlags);
}
// java.util.stream.AbstractPipeline#AbstractPipeline(java.util.stream.AbstractPipeline<?,E_IN,?>, int)
AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
if (previousStage.linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
previousStage.linkedOrConsumed = true;
// 这里是关联stage
previousStage.nextStage = this;
this.previousStage = previousStage;
this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
this.sourceStage = previousStage.sourceStage;// sourceStage是ReferencePipeline.Head
if (opIsStateful())
sourceStage.sourceAnyStateful = true;
this.depth = previousStage.depth + 1;
}
通过上面的操作,形成了这样的双向链表:
ReferencePipeline.Head <-> StatelessOp
一般后面在有其他操作,例如map()
、sort()
、limit()
等,都是返回一个ReferencePipeline
实例(StatefulOp
/StatelessOp
),在构造方法里面将双向链表串联起来。
最后来看forEach()
方法:
// java.util.stream.ReferencePipeline#forEach
@Override
public void forEach(Consumer<? super P_OUT> action) {
// 通过工厂ForEachOps返回一个ForEachOp实例(下面会提到),本质是一个TerminalOp,即:构造了一个终止操作实例
evaluate(ForEachOps.makeRef(action, false));
}
evaluate()
方法就是一个执行Stream
的操作。
// java.util.stream.AbstractPipeline#evaluate(java.util.stream.TerminalOp<E_OUT,R>)
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
assert getOutputShape() == terminalOp.inputShape();
if (linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
linkedOrConsumed = true;
return isParallel()
? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
// 关注顺序执行这里
// this是由filter方法返回的Stream,严格说是PipelineHelper,terminalOp是上面提到的ForEachOp实例
: terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}
现在我们只关注顺序执行这一段:
// java.util.stream.ForEachOps.ForEachOp#evaluateSequential
@Override
public <S> Void evaluateSequential(PipelineHelper<T> helper,
Spliterator<S> spliterator) {
// this是上面提到的ForEachOp实例,helper是filter方法返回的PipelineHelper
return helper.wrapAndCopyInto(this, spliterator).get();
}
wrapAndCopyInto
方法主要干了两件事:
- 封装
sink
链; - 将封装好的
sink
链从头到尾执行。
// 串联sink并执行
// 前面提到反向串联,这里是顺序串联
// 这个是helper的实例方法,也就是filter()方法构造出来的实例对象
// java.util.stream.AbstractPipeline#wrapAndCopyInto
@Override
final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
// 这里的sink是上面提到的ForEachOp实例
copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
return sink;
}
先看看对sink
链的封装:
// java.util.stream.AbstractPipeline#wrapSink
// sink是ForEachOp实例
@Override
@SuppressWarnings("unchecked")
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
// 这里的sink是上面提到的ForEachOp实例
Objects.requireNonNull(sink);
// 不断的将p设置为前一个stage节点,然后调用p#opWrapSink构造出上一个stage对应的sink实例,这样p就知道了它的downstream是sink了
// 初始化的时候,AbstractPipeline.this对应的是filter()方法返回的实例对象,sink是TerminalOp实例
// p.depth > 0意味着sourceStage,即ReferencePipeline.Head实例不会包含到sink链中,因为sourceStage.depth = 0
for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
// 关联好后,sink变成前一个stage
sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
}
return (Sink<P_IN>) sink;// 这个sink已经是sink链的链头了
}
image.png
回看前面提到的
filter()
方法,重点关注opWrapSink()
方法:
// java.util.stream.ReferencePipeline#filter
@Override
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
Objects.requireNonNull(predicate);
return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SIZED) {
// 重点关注这里
// opWrapSink的作用是创建当前节点的Sink实例。
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
// 将当前的stage封装成Sink实例,实例的downstream就是参数sink
return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
@Override
public void begin(long size) {
downstream.begin(-1);
}
@Override
public void accept(P_OUT u) {
// 这里先不关注,待会还要再提到
if (predicate.test(u))
downstream.accept(u);
}
};
}
};
}
// java.util.stream.Sink.ChainedReference#ChainedReference
// 构造方法,创建sink实例,同时告知downstream,这样当前sink实例和downstream实例就串联起来了
public ChainedReference(Sink<? super E_OUT> downstream) {
this.downstream = Objects.requireNonNull(downstream);
}
可以看到上面方法就是构造filter()
方法这一步stage
对应的Sink
实例,并且串联起当前sink
实例与downstream
。这样在执行的时候就知道了,当前sink
执行完了,就调用downstream
的方法执行。
构造好的sink
链如下:
sink
链构造好了,下面就是执行的过程:
// java.util.stream.AbstractPipeline#copyInto
@Override
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
// 此时的wrappedSink已经是链表头了
Objects.requireNonNull(wrappedSink);
// 没有短路操作,执行这里
if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
wrappedSink.begin(spliterator.getExactSizeIfKnown());// 执行Sink#begin
// 这里的spliterator是ArraySpliterator,前面提到过
// 对spliterator实例里面的元素逐个执行
spliterator.forEachRemaining(wrappedSink);
wrappedSink.end();
}
else {
copyIntoWithCancel(wrappedSink, spliterator);
}
}
// java.util.Spliterators.ArraySpliterator#forEachRemaining
@SuppressWarnings("unchecked")
@Override
public void forEachRemaining(Consumer<? super T> action) {
Object[] a; int i, hi; // hoist accesses and checks from loop
if (action == null)
throw new NullPointerException();
if ((a = array).length >= (hi = fence) &&
(i = index) >= 0 && i < (index = hi)) {
// 对于串行执行的,就是对数组里的每个元素执行action.accept
// 并行因为调用了trySplit,index和fence会不同,不是这里关注的重点。
// 这里的action就是filter方法里的Sink.ChainedReference
do { action.accept((T)a[i]); } while (++i < hi);
}
}
上面的方法中调用了sink
链的accept()
方法,回看前面提到的filter()
方法,重点关注accept()
方法:
// java.util.stream.ReferencePipeline#filter
@Override
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
Objects.requireNonNull(predicate);
return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SIZED) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
@Override
public void begin(long size) {
downstream.begin(-1);
}
@Override
public void accept(P_OUT u) {
// 重点关注这里
if (predicate.test(u))
// 调用downstream的accept,就串联执行了
downstream.accept(u);
}
};
}
};
}
// java.util.stream.ForEachOps.ForEachOp.OfRef#accept
@Override
public void accept(T t) {
// comsumer就是forEach方法里的lambda表达式
consumer.accept(t);
}
至此,一个完整的Stream
流程执行完毕。
附上Sink
对应的部分类图:
这里
XXXOps
都是对应XXXOp
的工厂,如ReduceOps
是ReduceOp
的工厂,用于生产ReduceOp
实例。
总结
总结一下:
-
Stream
是支持顺序和并行聚集操作的一系列元素。这里着重描述的是:Stream
=操作
+元素
; -
Pipeline
是管道,PipelineHelper
是将Stream
的信息收集到一个地方; -
Stage
就是一个完整的操作,每一个PipelineHelper
(或者说ReferencePipeline
)的实例就是一个Stage
,关联起所有Stage
将形成一个双向链表。 -
Sink
链的主要作用是控制Stream
中的每一个元素通过Stage
形成的管道。 -
Stream
的逻辑是:
- 构造
stage
双向链表 - 利用
stage
双向链表逆序构造Sink
链 - 通过
Spliterator#forEachRemaining
逐个处理元素,代码类似于:
for (T element : allElements) {// 逐个元素处理
Sink consumer = sinkHead;// sink链头
While (consumer != null) {
consumer.accept(element);// 消费元素
consumer = consumer.downstream;// 传递给下游处理
}
}
关于Stream带来好处的思考
首先是编程思维的转变,原来是面向对象,现在是面向数据。你需要考虑如何通过对手里的数据进行操作得到预期的结果。这个有点想SQL
。
其次是代码解决性、可读性,这个感觉更多是函数式编程带来的好处
最后,也是最重要的,依托java.util.concurrent
包,将并行化能力进行封装,也就是说,开发者可以通过最简单的方式实现并行化数据处理,不在需要控制线程(新建、启动、通信、异常处理)。
一些参考
深入理解Java Stream流水线
Java 8 Stream探秘
JAVA8中的stream原理解析——1(串行)
java8 Stream的实现原理 (从零开始实现一个stream流)
Stream源码分析
jdk8中Spliterator的作用
java8 Stream Pipelines 浅析
Side-effect是只一个方法不只是返回一个结果,还会改变对象的状态。
网友评论