声明
本文首发于个人技术博客,转载请注明出处,本文链接:http://qifuguang.me/2015/11/28/Storm中文文档-Trident-API概览/
正文
Trident中最核心的数据模型是stream,它以一系列batch的形式被处理。stream被划分成很多个分区分配到集群中的各个节点去,每个分区上的操作是并行执行的。
在Trident中,有5种类型的操作:
- 本地分区操作,这种操作不需要网络传输的参与;
- 重新分区操作,重新分区一个流但是不改变其内容,需要网络传输;
- 聚合操作,需要网络传输的参与;
- 流分组操作;
- 合并和连接。
本地分区操作(Partition-local operations)
本地分区操作不需要网络传输,每一个Batch分区独立执行。
函数(Functions)
函数接收一些字段作为输入,发射零个或多个元组作为输出。输出元组的字段追加到输入元组的字段的后面。如果一个函数没有发射任何元组,原始的输入元组将会被过滤掉。否则,输入元组对于每个输出元组是重复的。假设MyFuction函数的定义如下:
public class MyFunction extends BaseFunction {
public void execute(TridentTuple tuple, TridentCollector collector) {
for(int i=0; i < tuple.getInteger(0); i++) {
collector.emit(new Values(i));
}
}
}
现在假设有一个输入流mysrteam,包含3个字段a,b,c,流中包含如下3个元组:
[1, 2, 3]
[4, 1, 6]
[3, 0, 8]
运行如下代码:
mystream.each(new Fields("b"), new MyFunction(), new Fields("d")))
将会输出如下包含a,b,c,d四个字段的元组:
[1, 2, 3, 0]
[1, 2, 3, 1]
[4, 1, 6, 0]
过滤器(Filters)
过滤器把一个元组作为输入,并决定是否需要保留该元组。假设你定义了如下过滤器:
public class MyFilter extends BaseFilter {
public boolean isKeep(TridentTuple tuple) {
return tuple.getInteger(0) == 1 && tuple.getInteger(1) == 2;
}
}
现在假设你有包含a,b,c三个字段的如下元组:
[1, 2, 3]
[2, 1, 1]
[2, 3, 4]
运行下面的代码:
mystream.each(new Fields("b", "a"), new MyFilter())
将会输出如下元组:
[2, 1, 1]
分区聚合(partitionAggregate)
分区聚合在一批元组的每一个分区上运行一个函数。与函数不同的是,分区聚合的输出元组会覆盖掉输入元组。请看如下示例:
mystream.partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"))
假设你有一个包含a,b两个字段的输入流,元组的分区情况如下:
Partition 0:
["a", 1]
["b", 2]
Partition 1:
["a", 3]
["c", 8]
Partition 2:
["e", 1]
["d", 9]
["d", 10]
运行上面的那一行代码将会输出如下的元组,这些元组只包含一个sum字段:
Partition 0:
[3]
Partition 1:
[11]
Partition 2:
[20]
你可以实现三种不同的接口来自定义自己的聚合器:CombinerAggregator, ReducerAggregator, and Aggregator。
下面是CombinerAggregator接口的定义:
public interface CombinerAggregator<T> extends Serializable {
T init(TridentTuple tuple);
T combine(T val1, T val2);
T zero();
}
CombinerAggregator返回只有一个字段的一个元组。CombinerAggregator在每个输入元组上运行init函数,然后通过combine函数聚合结果值直到只剩下一个元组。如果分区中没有任何元组,CombinerAggregator将返回zero函数中定义的元组。比如,下面是Count聚合器的实现:
public class Count implements CombinerAggregator<Long> {
public Long init(TridentTuple tuple) {
return 1L;
}
public Long combine(Long val1, Long val2) {
return val1 + val2;
}
public Long zero() {
return 0L;
}
}
CombinerAggregators的好处是可以使用聚合函数来替代分区聚合函数。在这种情况下,Trident通过在网路传输之前自动进行部分聚合达到优化计算的作用。
ReducerAggregator接口的定义如下:
public interface ReducerAggregator<T> extends Serializable {
T init();
T reduce(T curr, TridentTuple tuple);
}
ReducerAggregator通过init函数得到一个初始的值,然后对每个输入元组调用reduce方法计算值,产生一个元组作为输出。比如Count的ReducerAggregator实现如下:
public class Count implements ReducerAggregator<Long> {
public Long init() {
return 0L;
}
public Long reduce(Long curr, TridentTuple tuple) {
return curr + 1;
}
}
ReducerAggregator也能用于持久聚合,稍后讲解。
最常用的聚合器的接口是Aggregator,它的定义如下:
public interface Aggregator<T> extends Operation {
T init(Object batchId, TridentCollector collector);
void aggregate(T state, TridentTuple tuple, TridentCollector collector);
void complete(T state, TridentCollector collector);
}
Aggregators能够发射任意数量,任意字段的元组。并且可以在执行期间的任何时候发射元组,它的执行流程如下:
- 处理batch之前调用init方法,init函数的返回值是一个表示聚合状态的对象,该对象会传递到aggregate和complete函数;
- 每个在batch分区中的元组都会调用aggregate方法,该方法能够更新聚合状态并且发射元组;
- 当batch分区中的所有元组都被aggregate函数处理完时调用complete函数。
下面是使用Aggregator接口实现的Count聚合器:
public class CountAgg extends BaseAggregator<CountState> {
static class CountState {
long count = 0;
}
public CountState init(Object batchId, TridentCollector collector) {
return new CountState();
}
public void aggregate(CountState state, TridentTuple tuple, TridentCollector collector) {
state.count+=1;
}
public void complete(CountState state, TridentCollector collector) {
collector.emit(new Values(state.count));
}
}
有些时候,我们需要通知执行很多个聚合器,则可以使用如下的链式调用执行:
mystream.chainedAgg()
.partitionAggregate(new Count(), new Fields("count"))
.partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"))
.chainEnd()
上面的代码将会在每一个分区执行Count和Sum聚合器,输出结果是包含count和sum两个字段的元组。
状态查询与分区持久化(stateQuery and partitionPersist)
状态查询用于查询状态源,分区持久化用于更新状态源,可以参阅这篇文章了解与状态相关的更多内容。
投影(projection)
流的投影操作只保留操作中指定的字段,如果对字段a,b,c,d运行如下的投影:
mystream.project(new Fields("b", "d"))
输出的六中将只包含b,d两个字段。
重分区操作(Repartitioning operations)
重分区操作通过运行一个函数改变元组在任务之间的分布,也可以调整分区的数量(比如重分区之后将并行度调大),重分区需要网络传输的参与。重分区函数包含以下这几个:
- shuffle:使用随机轮询算法在所有目标分区间均匀分配元组;
- broadcast:每个元组复制到所有的目标分区。这在DRPC中非常有用,例如,需要对每个分区的数据做一个stateQuery操作;
- partitionBy:接收一些输入字段,根据这些字段输入字段进行语义分区。通过对字段取hash值或者取模来选择目标分区。partitionBy保证相同的字段一定被分配到相同的目标分区;
- global:所有的元组分配到相同的分区,该分区是流种所有batch决定的;
- batchGlobal:同一个batch中的元组被分配到相同的目标分区,不同batch的元组有可能被分配到不同的目标分区;
-
partition:接收一个自定义的分区函数,自定义分区函数需要实现
backtype.storm.grouping.CustomStreamGrouping
接口。
聚合操作(Aggregation operations)
Trident提供aggregate和persistentAggregate方法对流进行聚合操作。aggregate独立运行在每个batch的流中,而persistentAggregate将聚合所有batch的流的元组,并将结果保存在一个状态源中。
假设在流中运行聚合器做全局聚合操作。当使用ReducerAggregator或者Aggregator时,流将会首先重新分区为一个单分区,然后再改分区上运行聚合函数。当使用CombinerAggregator聚合器时,Trident首先会计算每个分区的部分聚合,然后重新分区到单分区,在网络传输后完成聚合。推荐使用CombinerAggregator,因为它更有效。
下面是通过聚合得到一个全局计数的例子:
mystream.aggregate(new Count(), new Fields("count"))
就像partitionAggregate一样,aggregators也可以进行多重聚合。但是如果将一个CombinerAggregator和一个非CombinerAggregator一起使用,Trident将无法进行部分聚合以达到优化的效果。
流分组操作(Operations on grouped streams)
流分组操作通过对流的某些字段调用partitionBy来将流进行重新分区,字段相同的元组将会被分配到相同的分区。例如:
如果在一个流分组中运行聚合函数,聚合函数会在每一个组内运行,而不是对整个batch处理。persistentAggregate也可以运行在分组的流中,在这种情况下,结果将保存在一个以分组字段作为key的MapState中。阅读这篇文章了解更多关于persistentAggregate的知识。
和普通流一样,流分组的聚合器也可以执行链式操作。
合并和连接(Merges and joins)
API的最后一部分是关于怎么将不同的流连接在一起。最简单的方式就是将不同的流合并成一个流,在Trident中,你可以使用merge函数合并流:
topology.merge(stream1, stream2, stream3);
Trident将会重命名新的输出字段,合并的流以第一个流的输出字段来命名。
另一个合并流的方法是使用join函数,类似于SQL那样的join操作,它要求输入是有限的。所以join对于无限输入的流式毫无意义的,Trident中的join操作只适用于来自Spout的每一个小batch之间。
下面的例子中,stream1流包含key,val1,val2三个字段,stream2流包含x,val1两个字段:
topology.join(stream1, new Fields("key"), stream2, new Fields("x"), new Fields("key", "a", "b", "c"));
stream1流的key字段与stream2流的x字段组join操作,另外,Trident要求所有新流的输出字段被重命名,因为输入流可能包含相同的字段名称。连接流发射的元组将会包含:
- 连接字段的列表。在上面的例子中,字段key对应stream1的key,stream2的x;
- 来自所有流的所有非连接字段的列表,按照传递到连接方法的顺序排序。在上面的例子中,字段a与字段b对应stream1的val1和val2,c对应于stream2的val1.
当连接的流来自不同的Spout时,这些Spout会同步发射batch,也就是说batch处理会包括每个Spout的元组。
原英文文档链接: http://storm.apache.org/documentation/Trident-API-Overview.html
如果你喜欢我的文章,请关注我的微信订阅号:“机智的程序猿”,更多精彩,尽在其中:
网友评论