在Java 8中,流有一个非常大的局限性,使用时,对它操作一次仅能得到 一个处理结果。如果试图多次遍历同一个流,将会遭遇下面这样的异常: java.lang.IllegalStateException: stream has already been operated upon or closed
流的设计就是如此,但我们在处理流时经常希望能同时获取多个结果。比如用一个流来解析日志文件,而不是在某个单一步骤中收集多个数据。
希望一次性向流中传递多个Lambda表达式。为达到这一目标,需要一个fork类型的方法,对每个复制的流应用不同的函数。更理想的情况是能以并发的方式执行这些操作,用不同的线程执行各自的运算得到对应的结果。但这些特性目前还没有在Java 8的流实现中提供。另外一种方式是利用一个通用API:Spliterator,尤其是它的延迟绑定能力,结合BlockingQueues和Futures来实现。
要达到在一个流上并发执行多个操作的效果,需要创建一个StreamForker,这个StreamForker会对原始的流进行封装,在此基础之上继续定义希望执行的各种操作。
public class StreamForker<T> {
private final Stream<T> stream;
private final Map<Object, Function<Stream<T>, ?>> forks = new HashMap<>();
public StreamForker(Stream<T> stream) {this.stream = stream;}
public StreamForker<T> fork(Object key, Function<Stream<T>, ?> f) {
forks.put(key, f); //使用一个键对流上的函数进行索引
return this; //返回this从而保证多次流畅地调用fork方法
}
public Results getResults() {
// To be implemented
}
}
fork方法接受两个参数:
- key参数:通过它可以取得操作结果,并将这些键/函数对累积到一个内部的Map中。
- Function参数:它对流进行处理,将流转变为代表这些操作结果的任何类型。
fork方法返回StreamForker自身,因此,可以通过复制多个操作构造一个流水线。
StreamForker详解
这里定义了希望在流上执行的三种操作,这三种操作通过三个键索引标识。StreamForker会遍历原始的流,并创建它的三个副本。这时就可以并行地在复制的流上执行这三种操作,这些函数运行的结果由对应的键进行索引,最终会填入到结果的Map。
所有由fork方法添加的操作的执行都是通过getResults方法调用触发的,该方法返回一个Results接口的实现,具体的定义如下:
public static interface Results {
public <R> R get(Object key);
}
该接口只有一个方法,可以将fork方法中使用的key对象作为参数传入,方法会返回该键对应的操作结果。
1、使用ForkingStreamConsumer实现Results接口
public Results getResults() {
ForkingStreamConsumer<T> consumer = build();
try {
stream.sequential().forEach(consumer);
} finally {
consumer.finish();
}
return consumer;
}
ForkingStreamConsumer同时实现了前面定义的Results接口和Consumer接口。它主要的任务就是处理流中的元素,将它们分发到多个BlockingQueues中处理,BlockingQueues的数量和通过fork方法提交的操作数是一致的。如果你在一个并发流上执行forEach方法,它的元素可能就不是顺序地被插入到队列中了。finish方法会在队列的末尾插入特殊元素表明该队列已经没有更多需要处理的元素了。build方法主要用于创建ForkingStreamConsumer。
使用build方法创建ForkingStreamConsumer:
private ForkingStreamConsumer<T> build() {
List<BlockingQueue<T>> queues = new ArrayList<>();//创建由队列组成的列表,每个队列对应一个操作
Map<Object, Future<?>> actions = forks.entrySet().stream().reduce(
new HashMap<Object, Future<?>>(),
(map, e) -> {
map.put(e.getKey(), getOperationResult(queues, e.getValue()));
return map;
},
(m1, m2) -> {
m1.putAll(m2);
return m1;
});
return new ForkingStreamConsumer<>(queues, actions);
}
首先创建了由BlockingQueues组成的列表,接着创建了一个Map,Map的键就是在流中用于标识不同操作的键,值包含在Future中,Future中包含了这些操作对应的处理结果。BlockingQueues的列表和Future组成的Map会被传递给 ForkingStreamConsumer的构造函数。每个Future都是通过getOperationResult方法创建的,代码如下:
private Future<?> getOperationResult(List<BlockingQueue<T>> queues, Function<Stream<T>, ?> f) {
BlockingQueue<T> queue = new LinkedBlockingQueue<>();
queues.add(queue);//创建一个队列,并将其添加到队列的列表中
Spliterator<T> spliterator = new BlockingQueueSpliterator<>(queue);//创建一个 Spliterator,遍历队列中的元素
Stream<T> source = StreamSupport.stream(spliterator, false);//创建一个流,将 Spliterator作为数据源
return CompletableFuture.supplyAsync(() -> f.apply(source));//创建一个Future对象,以异步方式计算在流上执行特定函数的结果
}
getOperationResult方法会创建一个新的BlockingQueue,并将其添加到队列的列表。这个队列会被传递给一个新的BlockingQueueSpliterator对象,后者是一个延迟绑定的Spliterator,它会遍历读取队列中的每个元素;接下来创建了一个顺序流对该Spliterator进行遍历,最终会创建一个Future在流上执行某个希望的操作并收集其结果。这里的Future使用CompletableFuture类的一个静态工厂方法创建,CompletableFuture实现了Future接口。
2、开发ForkingStreamConsumer和BlockingQueueSpliterator
实现ForkingStreamConsumer类,为其添加处理多个队列的流元素:
static class ForkingStreamConsumer<T> implements Consumer<T>, Results {
static final Object END_OF_STREAM = new Object();
private final List<BlockingQueue<T>> queues;
private final Map<Object, Future<?>> actions;
ForkingStreamConsumer(List<BlockingQueue<T>> queues, Map<Object, Future<?>> actions) {
this.queues = queues;
this.actions = actions;
}
@Override
public void accept(T t) {
queues.forEach(q -> q.add(t));//将流中遍历的元素添加到所有的队列中
}
void finish() {
accept((T) END_OF_STREAM);//将最后一个元素添加到队列中,表明该流已经结束
}
@Override
public <R> R get(Object key) {
try {
return ((Future<R>) actions.get(key)).get();//等待Future完成相关的计算,返 回由特定键标识的处理结果
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
这个类同时实现了Consumer和Results接口,并持有两个引用,一个指向由BlockingQueues组成的列表,另一个指向了由Future构成的Map结构,它们表示的是即将在流上执行的各种操作。
Consumer接口要求实现accept方法。每当ForkingStreamConsumer接受流中的一 个元素,它就会将该元素添加到所有的BlockingQueues中。另外,当原始流中的所有元素都添加到所有队列后,finish方法会将最后一个元素添加所有队列。BlockingQueueSpliterators碰到最后这个元素时会知道队列中不再有需要处理的元素了。
Results接口需要实现get方法。一旦处理结束,get方法会获得Map中由键索引的Future,解析处理的结果并返回。
最后,流上要进行的每个操作都会对应一个BlockingQueueSpliterator。每个BlockingQueueSpliterator都持有一个指向BlockingQueues的引用,这个BlockingQueues是由ForkingStreamConsumer生成的。
遍历BlockingQueue并读取其中元素的Spliterator:
class BlockingQueueSpliterator<T> implements Spliterator<T> {
private final BlockingQueue<T> q;
BlockingQueueSpliterator(BlockingQueue<T> q) {this.q = q;}
@Override
public boolean tryAdvance(Consumer<? super T> action) {
T t;
while (true) {
try {
t = q.take();
break;
} catch (InterruptedException e) {}
}
if (t != ForkingStreamConsumer.END_OF_STREAM) {
action.accept(t);
return true;
}
return false;
}
@Override
public Spliterator<T> trySplit() {return null;}
@Override
public long estimateSize() {return 0;}
@Override
public int characteristics() {return 0;}
}
这段代码实现了一个Spliterator,它并未定义如何切分流的策略,仅仅利用了流的 延迟绑定能力。由于这个原因,它也没有实现trySplit方法。
由于无法预测能从队列中取得多少个元素,所以estimatedSize方法也无法返回任何有意义的值。
图中左上角的StreamForker中包含一个Map结构,以方法的形式定义了流上要执行 的操作,这些方法分别有对应的键索引。右边的ForkingStreamConsumer为每一种操作的对象维护了一个队列,原始流中的所有元素会被分发到这些队列中。
图的下半部分,每一个队列都有一个BlockingQueueSpliterator从队列中提取元素作为各个流处理的源头。最后,由原始流复制创建的每个流,都会被作为参数传递给某个处理函数,执行对应的操作。
3、将StreamForker用于实战
使用StreamForker,通过复制原始的菜肴(dish)流,以并发的方式执行四种不同的操作。生成一份由逗号分隔的菜肴名列表,计算菜单的总热量,找出热量最高的菜肴,并按照菜的类型对这些菜进行分类。
Stream<Dish> menuStream = menu.stream();
StreamForker.Results results = new StreamForker<Dish>(menuStream)
.fork("shortMenu", s -> s.map(Dish::getName).collect(joining(", ")))
.fork("totalCalories", s -> s.mapToInt(Dish::getCalories).sum())
.fork("mostCaloricDish", s -> s.collect(reducing((d1, d2) -> d1.getCalories() > d2.getCalories() ? d1 : d2)).get())
.fork("dishesByType", s -> s.collect(groupingBy(Dish::getType)))
.getResults();
String shortMenu = results.get("shortMenu");
int totalCalories = results.get("totalCalories");
Dish mostCaloricDish = results.get("mostCaloricDish");
Map<Dish.Type, List<Dish>> dishesByType = results.get("dishesByType");
System.out.println("Short menu: " + shortMenu);
System.out.println("Total calories: " + totalCalories);
System.out.println("Most caloric dish: " + mostCaloricDish);
System.out.println("Dishes by type: " + dishesByType);
StreamForker提供了一种使用简便、结构流畅的API,它能够复制流,并对每个复制的流施加不同的操作。这些应用在流上以函数的形式表示,可以用任何对象的方式标识。
如果你没有更多的流需要添加,可以调用StreamForker的getResults方法,触发所有定义的操作开始执行,并取得StreamForker.Results。这些操作的内部实现是异步的,getResults方法调用后会立刻返回,不会等待所有的操作完成拿到所有的执行结果才返回。
可以通过向StreamForker.Results接口传递标识特定操作的键取得某个操作的结果。如果该时刻操作已经完成,get方法会返回对应的结果;否则,该方法会阻塞,直到计算结束,取得对应的操作结果。
--参考文献《Java8实战》
网友评论