源码是如何实现的
我们随便其中一个方法,比如filter,然后在ReferencePipeline类中找到方法的实现:
@Override
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
Objects.requireNonNull(predicate);
return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SIZED) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
@Override
public void begin(long size) {
downstream.begin(-1);
}
@Override
public void accept(P_OUT u) {
if (predicate.test(u))
downstream.accept(u);
}
};
}
};
}
返回Stream这是链式操作的关键,大致分为2层StateLessOp可以看为一个ReferencePipeline,OpWrapSink可以理解为一个sink。
ReferencePipeline有一些StreamOpFlag和StreamShape的属性,还有StatelessOp特性。
sink理解为水槽,水槽里有我们要执行的操作,分为begin,accept,end方法,accept里放的就是我们实现的业务逻辑。
继续一层层跟进我们到了AbstractPipiline的构造方法:
![](https://img.haomeiwen.com/i15379026/9de178568660147e.png)
这里把ReferencePipeline形成了一个双向链表:
![](https://img.haomeiwen.com/i15379026/4f1f6a8efed2cbed.png)
如何运行的
在运行时打断点发现,关键代码在AbstractPipeline的copyInto方法。
![](https://img.haomeiwen.com/i15379026/d33b06e5221c6f8b.png)
![](https://img.haomeiwen.com/i15379026/f35a512b7bed4c41.png)
分析可得,假如我们的流是stream.limit(2).filter().map().count()
中间操作的执行流程就是
limit->begin()->filter->begin()->map->begin()
for(list中的元素1,2,3){
1limit->accept()->filter->accept()->map->accept()
2limit->accept()->filter->accept()->map->accept()
3limit->accept()没有通过}
limit->end()->filter->end()->map->end()
这就是操作组合后的执行过程也是实现流只需一次循环。
如何判断短路操作的
![](https://img.haomeiwen.com/i15379026/966d83367fd85fad.png)
如图中的combinedFlags成员变量,会根据前后的OpFlags来判断是否是短路操作。
![](https://img.haomeiwen.com/i15379026/f35a512b7bed4c41.png)
又是上面的一张图,此时会走else逻辑:
![](https://img.haomeiwen.com/i15379026/780e3427f2086571.png)
forEachWithCancel就是短路操作的关键。
一些技巧
- 强转类型
arrayList.stream().map(x -> (JSONObject) x).filter(x -> {
//本来就是JSONObject类型却不能在filter中强转获取,可以加一层强转map
}).findAny().get();
网友评论