Stream

作者: ttyttytty | 来源:发表于2022-03-15 20:44 被阅读0次

    学科归属&背景

    • Java8新特性,能够避免多重循环和反复遍历,导致有许多中间变量,增加内存的消耗。适用于统计/变换等场景。

    侧重点/目标

    1.中间的操作结果存在哪里?内存的消耗情况怎么样?

    • stream的实现不是对所有的数据依次进行单一操作,而是对一个元素遍历所有操作。内存消耗相比于迭代,大大降低。

    2.中间操作的状态&短路是指什么

    • 有状态,依赖上一个操作的结果,不能并行?如,sorted
    @Override
     public void end() {
         list.sort(comparator);
         downstream.begin(list.size());
         if (!cancellationWasRequested) {
             list.forEach(downstream::accept);
         }
         else {
             for (T t : list) {
                 if (downstream.cancellationRequested()) break;
                 downstream.accept(t);//等中间缓存的排序结果都ok end了,对排序后的结果,重新发起后续的操作
             }
         }
         downstream.end();
         list = null;
     }
    
    • 无状态,操作之间无关联。
    • 短路:执行到这个操作,这个元素有可能就被剔除,不能进行后续的操作了,类似continue。如,filter。=》合理布置stream操作顺序,可以减少计算量。

    3.传入的抽象方法实现,是存在哪里?是由哪个类怎么执行的?如何短路执行(中间取消)?

    • 惰性执行(调用终止方法时,才真正执行整个流操作)怎么实现的?Spliterator接口负责将数据转成流水线的输入,Sink接口是消费者,遍历消费操作。
    • 定义数据源,开启流水线:Collection的stream(),实现了Spliterator接口,返回构造出ReferencePipeline.Head实例
        default Stream<E> stream() {
            return StreamSupport.stream(spliterator(), false);
        }
    
        public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
            Objects.requireNonNull(spliterator);
            return new ReferencePipeline.Head<>(spliterator,
                                                StreamOpFlag.fromCharacteristics(spliterator),
                                                parallel);
        }
    
    • 传入的抽象方法实现,会根据有无状态,构造出StatelessOp或 StatefulOp。一步步构成操作的双向链表,同时并没有实际执行操作。实际操作中,AbstractPipeline双头链表保存前后操作和头结点,方便同一元素进行操作。ChainedReference执行操作流的执行/停止。
        @Override
        public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
            Objects.requireNonNull(predicate);
            return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
                                         StreamOpFlag.NOT_SIZED) {
                @Override
                Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
                    return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                        @Override
                        public void begin(long size) {
                            downstream.begin(-1);
                        }
    
                        @Override
                        public void accept(P_OUT u) {
                            if (predicate.test(u))
                                downstream.accept(u);
                        }
                    };
                }
            };
        }
    
       @Override
       public final Stream<P_OUT> distinct() {
           return DistinctOps.makeRef(this);
       }
    
    static <T> ReferencePipeline<T, T> makeRef(AbstractPipeline<?, T, ?> upstream) {
           return new ReferencePipeline.StatefulOp<T, T>(upstream, StreamShape.REFERENCE,
                                                         StreamOpFlag.IS_DISTINCT | StreamOpFlag.NOT_SIZED) {
    ……
    
    • 终止方法,构造出TerminalOp,同时发起ReferencePipeline.evaluate,开始惰性执行
        @Override
        public void forEach(Consumer<? super P_OUT> action) {
            evaluate(ForEachOps.makeRef(action, false));
        }
    
        public static <T> TerminalOp<T, Void> makeRef(Consumer<? super T> action,
                                                      boolean ordered) {
            Objects.requireNonNull(action);
            return new ForEachOp.OfRef<>(action, ordered);
        }
    
        final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
            assert getOutputShape() == terminalOp.inputShape();
            if (linkedOrConsumed)
                throw new IllegalStateException(MSG_STREAM_LINKED);
            linkedOrConsumed = true;
    // **并行与否**
            return isParallel()
                   ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))//ForkJoin框架,将原始数据不断拆分为更小的单元,对每一个单元做上述evaluateSequential类似的动作
                   : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
        }
    
           @Override
           public <S> Void evaluateSequential(PipelineHelper<T> helper,
                                              Spliterator<S> spliterator) {
               return helper.wrapAndCopyInto(this, spliterator).get();
           }
    copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
    // 从最后操作往前,包成新的Sink,复合combinedFlags
       final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
           Objects.requireNonNull(sink);
    
           for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
               sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
           }
           return (Sink<P_IN>) sink;
       }
    
    
    // 第二个操作的节点,判断combinedFlags,如果短路了,即停止,然后重新反向执行?
    // 未短路,执行此次的新Sink反向流的begin,依次执行操作的,递归发起下一个元素的
       @Override
       final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
           Objects.requireNonNull(wrappedSink);
    
           if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
               wrappedSink.begin(spliterator.getExactSizeIfKnown());// 流水ok,你准备一下
               spliterator.forEachRemaining(wrappedSink);// Collection实现spliterator,开始执行遍历元素,递归执行
               wrappedSink.end();
           }
           else {
               copyIntoWithCancel(wrappedSink, spliterator);
           }
       }
    
    // ArrayList实现
    public void forEachRemaining(Consumer<? super E> action) {
               int i, hi, mc; // hoist accesses and checks from loop
               ArrayList<E> lst; Object[] a;
               if (action == null)
                   throw new NullPointerException();
               if ((lst = list) != null && (a = lst.elementData) != null) {
                   if ((hi = fence) < 0) {
                       mc = lst.modCount;
                       hi = lst.size;
                   }
                   else
                       mc = expectedModCount;
                   if ((i = index) >= 0 && (index = hi) <= a.length) {
                       for (; i < hi; ++i) {
                           @SuppressWarnings("unchecked") E e = (E) a[i];
                           action.accept(e);//各个操作通过Sink接口accept方法依次向下传递执行。
                       }
                       if (lst.modCount == mc)
                           return;
                   }
               }
               throw new ConcurrentModificationException();
           }
    

    4.泛型的使用

    • 自己套自己public interface OfPrimitive<T, T_CONS, T_SPLITR extends Spliterator.OfPrimitive<T, T_CONS, T_SPLITR>>            extends Spliterator<T> {
      
    interface Builder<T> extends Sink<T> {
    
            /**
             * Builds the node.  Should be called after all elements have been
             * pushed and signalled with an invocation of {@link Sink#end()}.
             *
             * @return the resulting {@code Node}
             */
            Node<T> build();
    
            /**
             * Specialized @{code Node.Builder} for int elements
             */
            interface OfInt extends Node.Builder<Integer>, Sink.OfInt {
                @Override
                Node.OfInt build();
            }
    
    Lists.<Person>newArrayList().stream()
            .collect(Collectors.groupingBy(Person::getType, HashMap::new, Collectors.toList()));
    

    5.操作码是如何标识的。位的应用。sourceOrOpFlags

    6.设计模式

    责任链模式,一个接一个处理事件。
    

    7.典型使用
    collector&Collectors

    知识迁移

    1.并行的ForkJoinPool与多线程的关系,分治的思想如何有序拼接数据,如并行排序?

    REF


    [原来你是这样的 Stream —— 浅析 Java Stream 实现原理](https://zhuanlan.zhihu.com/p/47478339

    相关文章

      网友评论

          本文标题:Stream

          本文链接:https://www.haomeiwen.com/subject/ggoydrtx.html