一 概述
Collector是收集器接口,自定义的收集器需要实现此接口,实现其系列方法。该接口的方法有supplier(),accumulator(),combiner(),finisher()和characteristics()。
Supplier< A > supplier():A类型是收集器中间容器的类型,该方法用于创建并返回一个新的中间结果容器。
BiConsumer<A, T> accumulator():BiConsumer接受两个参数,不返回结果。A类型是收集器中间容器的类型,T类型是流中元素的类型。该方法将一个元素添加至中间容器中。
BinaryOperator< A > combiner():BinaryOperator接受两个参数,返回一个结果。A类型是中间容器的类型。该方法将俩个参数的中间容器合并,该方法合并的方法有两种:第一种将两个参数的状态合并至其中一个并返回,比如listA和listB,将listB的元素添加至listA中,并将listA返回。第二种创建一个新的中间容器。比如listA和listB,该方案创建listC,listC中包含了listA和listB的所有元素,并将listC返回。
Function<A, R> finisher():Function接受一个参数,返回一个结果。A类型是中间容器的类型,R类型就是声明的结果容器类型。该方法执行一个A类型至R类型的转换。假如该实现设置了IDENTITY_TRANSFORM,将会执行一个A到R的强制类型转换。
Set<Characteristics> characteristics():该方法设置自定义收集器的特点。Characteristics是一个枚举类,包含的特征有CONCURRENT、UNORDERED、IDENTITY_FINISH。
IDENTITY_FINISH:表明finisher()是恒等的,如果设置了此特征,finisher()的lambda体将不会执行。
CONCURRENT:表明收集器是并行的,多线程并行的执行accumulator(),执行的对象是同一个中间容器。如果设置了此特征只会有一个中间结果容器。
UNORDERED:表明收集是无序的。
二 实例
将元素收集至Set中,中间容器和结果容器都是Set
public class MyCollector<T> implements Collector<T, Set<T>, Set<T>> {
@Override
public Supplier<Set<T>> supplier() {
System.out.println("supplier invoke");
return TreeSet::new;//中间容器为TreeSet
}
@Override
public BiConsumer<Set<T>, T> accumulator() {
System.out.println("accumulator invoke");
return Set::add;//收集的方式是添加元素
}
@Override
public BinaryOperator<Set<T>> combiner() {
System.out.println("combiner invoke");
return (left, right) -> {//中间容器合并的方式为状态叠加
left.addAll(right);
return left;
};
}
@Override
public Function<Set<T>, Set<T>> finisher() {
System.out.println("finisher invoke");
return Function.identity();//将收集结果直接输出
}
@Override
public Set<Characteristics> characteristics() {
System.out.println("characteristics invoke");
return Collections.unmodifiableSet(EnumSet.of(Characteristics.IDENTITY_FINISH));
IDENTITY_FINISH:表明finisher()是恒等的,如果设置了此特征,finisher()的lambda体将不会执行。
CONCURRENT:表明收集器是并行的,多线程并行的执行accumulator(),执行的对象是同一个中间容器。如果设置了此特征只会有一个中间结果容器。
UNORDERED:表明收集是无序的。
}
}
public class Test {
public static void main(String[] args) {
List<String> stringList = Arrays.asList("Hello", "World", "Hello World", "Hello", "a", "ab");
Set<String> stringSet=new TreeSet<>();
stringSet.addAll(stringList);
stringSet.stream()
.collect(new MyCollector<>());
}
}
//执行结果是
supplier invoke
accumulator invoke
combiner invoke
characteristics invoke
characteristics invoke
//我们发现 finisher的lambda并未执行
//如果特征值不包含 IDENTITY_FINISH 在看执行结果
supplier invoke
accumulator invoke
combiner invoke
characteristics invoke
characteristics invoke
finisher invoke //调用了lambda
finisher lambda
源码执行流程如下:
public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {
A container; //中间结果容器的类型
if (isParallel()
&& (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))
&& (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) {
container = collector.supplier().get();
BiConsumer<A, ? super P_OUT> accumulator = collector.accumulator();
forEach(u -> accumulator.accept(container, u));
}
else {
container = evaluate(ReduceOps.makeRef(collector));
}
return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH) //如果包含了IDENTITY_FINISH特征
? (R) container //这里直接强制类型转换
: collector.finisher().apply(container); //如果不包含 则运行finisher的lambda
}
如果设置了特征值CONCURRENT,不管是串行流还是并行流结果容器都是一个,因此需要注意集合的遍历与修改异常
将元素收集至Map中,中间容器为Set
public class StringCollector<T> implements Collector<T, Set<T>, Map<T, T>> {
@Override
public Supplier<Set<T>> supplier() {
return HashSet::new;//中间容器
}
@Override
public BiConsumer<Set<T>, T> accumulator() {
return Set::add;
}
@Override
public BinaryOperator<Set<T>> combiner() {
return (left, right) -> {
left.addAll(right);
return left;
};
}
@Override
public Function<Set<T>, Map<T, T>> finisher() {
return (set) -> {//将中间容器转为结果Map
Map<T, T> map = new HashMap<>();
set.forEach(it -> map.put(it, it));
return map;
};
}
@Override
public Set<Characteristics> characteristics() {
return Collections.unmodifiableSet(EnumSet.of(Characteristics.UNORDERED));//如果将特征值改为IDENTITY_FINISH则会发生强制类型转换异常
}
}
网友评论