什么是流
与文件的输入/输出流无关,流(Stream)是一种类似集合的概念。我们能够通过流来定义对集合“做什么”来实现我们的目标。
如何操作流
使用流来达成目的需要经过三步:
- 创建流
- 转换流(即将初始流通过我们定义的“做什么”将其转换成其他流)
- 终结操作(terminal peration,从最终的流中获得我们需要的数据)
下面是一个例子,该例子创建了一个Integer
类型的流,并对流中的所有元素调用x=x+1
的方法,最后使用reduce
终止操作获得整个流的和。流的操作是尽可能惰性执行的,直到需要结果时才会执行,即应用了终止操作后,之前的惰性操作会强制执行,流就失去了作用。
Stream<Integer> s=Stream.of(1,2,3,4);
int sum=s.map(x->x+1).reduce(0,(x,y)->{x+y});
创建流
//创建一个元素的流
public static<T> Stream<T> of(T t)
//从多个元素中创建流,参数可以是数组也可以是多个单值
public static<T> Stream<T> of(T... values)
//创建一个空流
public static<T> Stream<T> empty()
//创建一个无限流,元素分别为seed,f(seed),f(f(seed)),....
//例子:Stream<Integer> s=Stream.iterate(1,x->x+1);
public static<T> Stream<T> iterate(final T seed, final UnaryOperator<T> f)
//通过Supplier创建无限流
//例子:Stream<Integer> s=Stream.generate(Math::random)
public static<T> Stream<T> generate(Supplier<T> s)
Stream
类中提供了如上方法创建一个流。除此之外,Arrays
、集合类、Files
也都提供了创建流的方法。
//从一个数组的给定范围创建流
public static <T> Stream<T> stream(T[] array, int startInclusive, int endExclusive)
//从集合创建流
default Stream<E> stream()
//从文件创建流,元素为文件中的一行
public static Stream<String> lines(Path path, Charset cs) throws IOException
转换流
Stream
提供了多种方法来进行转换流。
Stream<T> filter(Predicate<? super T> predicate);
<R> Stream<R> map(Function<? super T, ? extends R> mapper);
<R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper);
这是三种主要的流转换方法。
- filter
filter
的引元是Predicate<T>,即从T到boolean的函数,可以使用此方法获得满足条件的流,比如通过Stream s=Stream.of(1,2,3,4,5).filter(x->x>3)
我们就可以获得大于3的流。 - map
map
对Stream
中的每个元素调用mapper
方法,比如Stream s=Stream.of(1,2,3,4,5).map(x->x=x+1)
可以得到每个元素加一的流。 - flatmap
flatmap
可以处理后得到的Stream<Stream<T>>
平摊为Stream<T>
的流。比如Stream.of("apple","bit").map(x->letters(x))
将得到[["a","p","p","l","e"],["b","i","t"]]
,而Stream.of("apple","bit").flatmap(x->letters(x))
将得到["a","p","p","l","e","b","i","t"]
。
除了以上3种方法,Stream
类还提供了获取子流、排序等方法。
//获取前maxSize个元素的流
Stream<T> limit(long maxSize);
//跳过前n个元素的流
Stream<T> skip(long n);
//将两个流合并成一个流
public static <T> Stream<T> concat(Stream<? extends T> a, Stream<? extends T> b) ;
//对流进行排序
Stream<T> sorted();
Stream<T> sorted(Comparator<? super T> comparator);
//移除流中重复的元素
Stream<T> distinct();
//每个元素被使用时都会先传递给action方法,这个方法主要被用来debug
//例子:Stream.of(1,2,3,4).peek(x->System.out.print(x)).map(x->x=x+1)
Stream<T> peek(Consumer<? super T> action);
终结操作
定义了流并进行转换后,我们终于可以得到我们想要的数据。Stream
提供了如下的终结操作。
Optional<T> min(Comparator<? super T> comparator);
Optional<T> max(Comparator<? super T> comparator);
long count();
boolean anyMatch(Predicate<? super T> predicate);
boolean allMatch(Predicate<? super T> predicate);
boolean noneMatch(Predicate<? super T> predicate);
void forEach(Consumer<? super T> action);
void forEachOrdered(Consumer<? super T> action);
Optional<T> findFirst();
Optional<T> findAny();
//可以通过reduce操作对流中的数据进行处理。如果定义了简约操作(accumulator)op
//那么返回的值为 v1 op v2 op v3 ...
//可以通过定义identity,当流为空时返回该值
Optional<T> reduce(BinaryOperator<T> accumulator);
T reduce(T identity, BinaryOperator<T> accumulator);
<U> U reduce(U identity,BiFunction<U, ? super T, U> accumulator,BinaryOperator<U> combiner);
当然,转换后的流并不一定非要进行上述的终结操作,我们也可以将得到的结果存入集合中来使用。
//获取流的iterator,遍历方法与集合的iterator相同
Iterator<T> iterator();
Object[] toArray();
<A> A[] toArray(IntFunction<A[]> generator);
<R, A> R collect(Collector<? super T, A, R> collector);
<R> R collect(Supplier<R> supplier,BiConsumer<R, ? super T> accumulator,BiConsumer<R, R> combiner)
对于其中的collect
方法,可以通过定义不同收集器来获得不同的集合类型,比如Stream.of(1,2,3,4).collect(Collectors.toSet())
来获得Set
。收集器Collectors
有如下这些方法(部分),方法的说明在注释部分。
public static <T> Collector<T, ?, List<T>> toList();
public static <T> Collector<T, ?, Set<T>> toSet();
//通过keyMapper生成key,通过valueMapper生成value,通过mergeFunction合并key相同的对象
//例子:Map<integer,String> idName=people.collect(Collectors.toMap(Person::getId,Person::getName,
// (existValue,newValue)->existValue))
public static <T, K, U> Collector<T, ?, Map<K,U>> toMap(Function<? super T, ? extends K> keyMapper
,Function<? super T, ? extends U> valueMapper);
public static <T, K, U> Collector<T, ?, Map<K,U>> toMap(Function<? super T, ? extends K> keyMapper
,Function<? super T, ? extends U> valueMapper,BinaryOperator<U> mergeFunction)
//将元素收集到指定的集合中
//例子:person.collect(Collectors.toCollection(TreeSet::new))
public static <T, C extends Collection<T>>
Collector<T, ?, C> toCollection(Supplier<C> collectionFactory);
//合并字符串,可以自定义分隔符、第一个元素的前缀和最后一个元素的后缀
public static Collector<CharSequence, ?, String> joining();
public static Collector<CharSequence, ?, String> joining(CharSequence delimiter);
public static Collector<CharSequence, ?, String> joining(CharSequence delimiter,
CharSequence prefix,
CharSequence suffix);
//生成(Int|Long|Double)SummaryStatistics
/* (Int|Long|Double)SummaryStatistics可以通过
* getCount()产生元素个数
* getSum()求和
* getAverage()求平均数
* getMax()、getMin() 获得最大/小值,如果没有元素时返回(MAX|MIN)_VALUE
* 例子:Stream.of(1,2,3,4).collect(Collectors.summaringzingInt(x->x)).getCount();
*/
public static <T>
Collector<T, ?, IntSummaryStatistics> summarizingInt(ToIntFunction<? super T> mapper);
public static <T>
Collector<T, ?, LongSummaryStatistics> summarizingLong(ToLongFunction<? super T> mapper);
public static <T>
Collector<T, ?, DoubleSummaryStatistics> summarizingDouble(ToDoubleFunction<? super T> mapper)
//对映射表中的元素进行分组
/*
* classifier为分类的标准,比如Persion::getName以名字分组
* mapFactory为映射表的工厂类,比如TreeMap::new
* downstream为对分类后的数据进行的操作,有counting()、summingInt()、maxBy(Cmparator)、minBy()
* mapping(mapper,downstream)
*/
//例子:people.collect(
// Collectors.groupingBy(
// Person::getName,
// Collectors.mapping(Person::getId,Collectors.counting())))
public static <T, K> Collector<T, ?, Map<K, List<T>>>
groupingBy(Function<? super T, ? extends K> classifier)
public static <T, K, A, D>
Collector<T, ?, Map<K, D>> groupingBy(Function<? super T, ? extends K> classifier,
Collector<? super T, A, D> downstream)
public static <T, K, D, A, M extends Map<K, D>>
Collector<T, ?, M> groupingBy(Function<? super T, ? extends K> classifier,
Supplier<M> mapFactory,Collector<? super T, A, D> downstream)
并行流
我们可以创建并行流来加快流的处理速度。获取流的方法有两种,一是使用Strea.parallel()
,二是使用Collection.parallelStreamm()
从集合中获取并行流。只要终结方法执行时,流处于并行状态,所有的中间操作都将并行执行。
网友评论