美文网首页
Java8 Lambda源码解析

Java8 Lambda源码解析

作者: 高19 | 来源:发表于2023-01-11 17:56 被阅读0次

    原文地址:https://mp.weixin.qq.com/s/d8BQ1-lEbeLuhZbn-t68yQ

    代码示例

    static class A{
    @Getter
    private String a;
    @Getter 
    private Integer b;
    
    public A(String a,Integer b){
      this.a = a;
      this.b = b;
      }
    }
    public static void main(String[] args){
        List<Integer> ret = Lists.newArrayList(new A("a",1),new A("b",2),new A("c",3))
        .stream()
        .map(A::getB)
        .filter(b -> b >= 2)
        .collect(Collectors.toList());
        System.out.println(ret); 
    }
    

    上述示例代码中,主要执行方法

    1. ArrayList.stream
    2. map
    3. filter
    4. collect

    1.1 ArrayList.stream 实际调用的是 Collector.stream 方法:

    default Stream(E) stream(){
       return StreamSupport.stream(spliterator(),false);
    }
    

    spliterator()方法生成的是IteratorSpliterator对象,spliterator的意思就是可以split的iterator,这个主要是用于lambda中的parallelStream中的并行操作,上面的例子中由于调用的是stream,所以parallel = false

    StreamSupport.steam最后生成的是一个 ReferencePipeline.Head 对象:

    public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel){
      Objects.requireNonNull(spliterator);
      return new ReferencePipeline.Head<>(spliterator,StreamOpFlag.fromCharacteristics(spliterator),parallel);
    }
    

    Head类是从ReferencePipeline派生的,表示lambda的pipeline中的头节点.
    有了这个Head对象之后,在它之上调用.map,实际上就是调用了基类ReferencePipeline.map方法:

    public final <R> Stream<R> map(Function<? super P_OUT,? extends R> mapper){
        Objects.requireNonNull(mapper);
        return new StatelessOp<P_OUT,R>(this,StreamShape.REFERENCE,StreamOpFlag.NOT_SORTED |     
         StreamOpFlag.NOT_DISTINCT){
            Sink<P_OUT> opWrapSink(int flags,Sink<R> sink){
                 return new Sink.ChainedReference<P_OUT,R>(sink){
                      public void accept(P_OUT u){
                          downstream.accept(mapper.apply(u));
                }
            };
          }
      };
    }
    

    返回的是一个StatelessOp,表示一个无状态的算子,这个类也是ReferencePipeline的子类,可以看到它的构造函数,第一个参数this,表示把Head对象作为StatelessOp对象的upstream,也就是它的上游.
    接着调用StatelessOp.filter方法. 也还是会回到ReferencePipeline.filter方法:

    public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate){
      Objects.requireNonNull(predicate);
      return new StatelessOp<P_OUT,P_OUT>(this, StreamShape.REFERENCE, StreamOpFlag.NOT_SIZED){
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink){
             return new Sink.ChainedReference<P_OUT,P_OUT>(sink) {
               @Override
                        public void begin(long size) {
                            downstream.begin(-1);
                        }
    
                        @Override
                        public void accept(P_OUT u) {
                            if (predicate.test(u))
                                downstream.accept(u);
                        }
                    };
                }
            };
        }
    

    可以看到,仍然生成的是一个StatelessOp对象,只是它的upstream变了而已.
    最后调用StatelessOp.collect,继续回到ReferencePipeline.collect方法:

    public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {
            A container;
            if (isParallel()
                    && (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))
                    && (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) {
                container = collector.supplier().get();
                BiConsumer<A, ? super P_OUT> accumulator = collector.accumulator();
                forEach(u -> accumulator.accept(container, u));
            }
            else {
                container = evaluate(ReduceOps.makeRef(collector));
            }
            return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
                   ? (R) container
                   : collector.finisher().apply(container);
        }
    

    在前面几步, .map, .filter方法其实都只是创建StatelessOp对象,但是到collect就不一样了, 了解spark/flink的就知道,collect其实是个action/sink,调用了collect,就会真实地触发这个stream上各个operator的执行.这也就是我们经常听到的lazy execution,所有的操作,只有碰到action的算子才会开始执行.

    之前讲到这个stream的parallel=false, 所以上面的实际执行逻辑是:

    A container = evaluate(ReduceOps.makeRef(collector));
    return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
                   ? (R) container
                   : collector.finisher().apply(container);
        }
    

    在进入evaluate方法之前,先看一下ReduceOps.makeRef(collector), 它实际上就是基于Collectors.toList生成的CollectorImpl实际包装了一层,返回了一个TerminalOp对象(实际是ReduceOp).

    public static <T, I> TerminalOp<T, I>
        makeRef(Collector<? super T, I, ?> collector) {
            Supplier<I> supplier = Objects.requireNonNull(collector).supplier();
            BiConsumer<I, ? super T> accumulator = collector.accumulator();
            BinaryOperator<I> combiner = collector.combiner();
            class ReducingSink extends Box<I>
                    implements AccumulatingSink<T, I, ReducingSink> {
                @Override
                public void begin(long size) {
                    state = supplier.get();
                }
    
                @Override
                public void accept(T t) {
                    accumulator.accept(state, t);
                }
    
                @Override
                public void combine(ReducingSink other) {
                    state = combiner.apply(state, other.state);
                }
            }
            return new ReduceOp<T, I, ReducingSink>(StreamShape.REFERENCE) {
                @Override
                public ReducingSink makeSink() {
                    return new ReducingSink();
                }
    
                @Override
                public int getOpFlags() {
                    return collector.characteristics().contains(Collector.Characteristics.UNORDERED)
                           ? StreamOpFlag.NOT_ORDERED
                           : 0;
                }
            };
        }
    

    从上面的代码可以看到,基本也就是直接调用了collector的实现,稍微需要注意的是,ReducingSink从Box派生,Box的意思就是盒子,它里面有个state成员,表示一个计算的状态. ReducingSink就是通过这个state,进行combine,accumulate操作(实际就是一个List)
    回到evaluate方法,它实际调用了:

    terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
    

    这里this就是最后阶段的ReferencePipeline,即StatelessOp,这里我们称它为ReferencePipeline$2,即经过两个算子操作的pipeline.
    sourceSpliterator则会取到sourceStage的spliterator,即最上面Head的spliterator.
    ReduceOp.evaluateSequential

    public <P_IN> R evaluateSequential(PipelineHelper<T> helper,
                                               Spliterator<P_IN> spliterator) {
                return helper.wrapAndCopyInto(makeSink(), spliterator).get();
            }
    

    helper即ReferencePipeline$2,这里makeSink即上面返回的ReducingSink重载的方法.
    ReferencePipeline.wrapAndCopyInto,在其父类AbstractPipeline中实现

      copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
            return sink;
    

    wrapSink代码:

     final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
            Objects.requireNonNull(sink);
    
            for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
                sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
            }
            return (Sink<P_IN>) sink;
        }
    

    可以看到,这里就是将pipeline从后至前,分别调用每个pipeline的opWrapSink方法,就是一个责任链的模式。opWrapSink可以看上面map的opWrapSink的filter的opWrapSink实现,map的很简单,直接调用mapper.apply,实际上就是A::getB方法,filter的也很简单,调用的是 predicate.test 方法。
    接下来到copyInto方法,到这里才会有真正的执行逻辑:

    final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
            Objects.requireNonNull(wrappedSink);
    
            if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
                wrappedSink.begin(spliterator.getExactSizeIfKnown());
                spliterator.forEachRemaining(wrappedSink);
                wrappedSink.end();
            }
            else {
                copyIntoWithCancel(wrappedSink, spliterator);
            }
        }
    

    它会走入到这部分的逻辑中:

    wrappedSink.begin(spliterator.getExactSizeIfKnown());
    spliterator.forEachRemaining(wrappedSink);
    wrappedSink.end();
    

    这里面最重要的就是中间这行了,由于spliterator持有的Collection引用,是ArrayList,因此它会调用ArrayList.forEachRemaining方法:

    public void forEachRemaining(Consumer<? super E> action) {
        // ...
        if ((i = index) >= 0 && (index = hi) <= a.length) {
           for (; i < hi; ++i) {
               @SuppressWarnings("unchecked") E e = (E) a[i];
               action.accept(e);
           }
           if (lst.modCount == mc)
               return;
       }
        // ...
    

    这里的action参数,就是上面经过责任链封装的Sink(它也是Consumer的子类)
    而这里调用action.accept,就会通过责任链来一层层调用每个算子的accept,我们从map的accept开始:

    @Override
    Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
        return new Sink.ChainedReference<P_OUT, R>(sink) {
            @Override
            public void accept(P_OUT u) {
                downstream.accept(mapper.apply(u));
            }
        };
    }
    

    可以看到,它先调用mapper.apply,然后把结果直接传给downstream.accept,也就是调用filter的accept,接着来到ReducingSink.accept,也就是往state中添加一个结果元素,这样forEach执行完之后,结果自然就有了
    看完上面的流程,接下来看一下lambda里面部分类设计,首先来看一下Stream,它的基类是BaseStream,提供以下接口:

    public interface BaseStream<T, S extends BaseStream<T, S>>
            extends AutoCloseable {
        /**
         * 返回stream中元素的迭代器
            */
        Iterator<T> iterator();
    
        /**
         * 返回stream中元素的spliterator,用于并行执行
         */
        Spliterator<T> spliterator();
    
        /**
         * 是否并行
         */
        boolean isParallel();
    
        /**
         * 返回串行的stream,即强制parallel=false
         */
        S sequential();
    
        /**
         * 返回并行的stream,即强制parallel=true
         */
        S parallel();
    
        // ...
    }
    

    直接继承此接口的,是如IntStream,LongStream,DoubleStream等,这些是在BaseStream基础上,提供了filter,map,mapToObj,distinct等算子的接口,但是这些算子,是限定类型的,如IntStream.filter,它接受的就是IntPredicate,而不是常规的Predicate; map方法也是,接受的是IntUnaryOperator
    IntStream,LongStream这些都是接口,也就是仅仅用来描述算子的.它们的实现都是基于Pipeline的,基类为AbstractPipeline,它的几个关键成员变量:

    /**
          * 最顶上的pipeline,即Head
          */
        private final AbstractPipeline sourceStage;
    
        /**
         * 直接上游pipeline
         */
        private final AbstractPipeline previousStage;
    
        /**
         * 直接下游pipeline
         */
        @SuppressWarnings("rawtypes")
        private AbstractPipeline nextStage;
    
        /**
         * pipeline深度
         */
        private int depth;
        
        /**
         * head的spliterator
         */
        private Spliterator<?> sourceSpliterator;
    
         // ...
    

    这个基类还提供了pipeline的基础实现,以及对BaseStream和PipelineHelper接口的实现,如evaluate,sourceStageSpliterator,wrapAndCopyInto,wrapSink等
    类似地,从AbstractPipeline派生的子类有:IntPipeline,LongPipeline,DoublePipeline,ReferencePipeline等. 前面三种比较容易理解,提供的是基于原始类型的lambda操作(且都实现了对应的XXstream接口),而ReferencePipeline提供的是基于对象的lambda操作.
    类层次如下:


    ReferencePipeline类层次

    注意这些子类,也都是abstract的,每一种pipeline下面,都有Head, StatelessOp, StatefulOp三个子类。分别用于描述pipeline的头节点,无状态中间算子,有状态中间算子。
    Head是非抽象类,StatelessOp也是抽象类,它在map、filter、mapToObj等算子中,会动态创建它的匿名子类,并实现opWrapSink方法。
    通过这种设计,除了collect之外,所有算子的返回结果都是Stream的子类,在IntPipeline中,map, flatMap, filter等都返回IntStream,即使它们的实现可能是StatelessOp, Head等,都对外提供了统一的接口。同时由于lambda中每个算子的实现是动态的,如最上面例子中A::getB, b -> b>=2等,那就通过每个算子重载 opWrapSink 方法来动态封装这些逻辑。
    同时,通过将XXStream和XXPipeline分开的设计,可以保持Stream接口的简洁(对用户透出的接口)。否则如果将BaseStream做成抽象类,将AbstractPipeline相关的逻辑移到这里面,会导致Stream变得非常臃肿,在API层面用户使用的时候也会很困惑。
    创建Pipeline的地方,则统一收口到了StreamSupport类中,这是一个大的工厂类。虽然ArrayList, Arrays等类中都提供了stream的方法,但是最后都统一调用了StreamSupport里来创建Pipeline的实例,通常也就是创建 XXPipeline.Head对象,然后通过这个对象进行其他lambda算子的添加。

    相关文章

      网友评论

          本文标题:Java8 Lambda源码解析

          本文链接:https://www.haomeiwen.com/subject/npawfdtx.html