Several operators:
partitionAggregate
A batch is split across several partitions, and the aggregation runs on the tuples within each partition.
As the Trident docs put it: "partitionAggregate runs a function on each partition of a batch of tuples." In other words, one batch is spread over multiple partitions and each partition is aggregated independently (see the sketch below).
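A minimal sketch of a standalone partitionAggregate call, modeled on the Trident docs (mystream and the field name "b" are placeholders; Sum is the built-in aggregator from org.apache.storm.trident.operation.builtin):

// emit one "sum" tuple per partition, summing the "b" field of the tuples in that partition
mystream.partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"));

Note that the output contains only the "sum" field: partitionAggregate replaces the input tuples with the tuples it emits.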
aggregate
Aggregates over a single batch.
If it is preceded by a groupBy, it aggregates all tuples belonging to each group within a batch.
aggregate can also be used without a preceding groupBy; in that case each batch of the stream is aggregated as a whole and one result is emitted per batch (see the sketch below).
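A minimal sketch of the groupBy + aggregate pattern, assuming the built-in Count aggregator and a stream that already carries a "word" field (names are illustrative):

// within each batch, group tuples by "word" and count the tuples of each group;
// the result is re-emitted per batch as ["word", "count"] pairs
stream.groupBy(new Fields("word"))
      .aggregate(new Count(), new Fields("count"));

Dropping the groupBy would instead collapse every batch into a single global count tuple.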
persistentAggregate
Aggregates across all batches and stores the result in a State.
If it is preceded by a groupBy, it aggregates the tuples of all batches within each group and persists the result into a MapState, with the key being the grouping fields (see the sketch below).
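A minimal sketch of the running word count from the Trident tutorial, assuming the in-memory MemoryMapState from org.apache.storm.trident.testing (any MapState-backed factory, e.g. one backed by memcached or HBase, could be swapped in):

// aggregate across all batches: group by "word", count per group, and keep the
// running counts in a MapState keyed by the grouping field "word"
TridentState wordCounts = stream
        .groupBy(new Fields("word"))
        .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

The returned TridentState can later be queried from another stream via stateQuery.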
These operators accept several kinds of custom aggregator interfaces: CombinerAggregator, ReducerAggregator, and Aggregator.
A CombinerAggregator is comparatively efficient when paired with persistentAggregate, because partial aggregation can be done before tuples are transferred over the network (an example implementation follows).
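For reference, the built-in Count is essentially the following CombinerAggregator, as shown in the Trident docs (the interface lives in org.apache.storm.trident.operation in recent Storm releases):

// a CombinerAggregator turns each tuple into a value (init), merges two values
// (combine), and provides an identity value for empty partitions (zero)
public class Count implements CombinerAggregator<Long> {
    @Override
    public Long init(TridentTuple tuple) {
        return 1L;
    }

    @Override
    public Long combine(Long val1, Long val2) {
        return val1 + val2;
    }

    @Override
    public Long zero() {
        return 0L;
    }
}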
These operators can also be chained together, for example:
mystream.chainedAgg()
        .partitionAggregate(new Count(), new Fields("count"))
        .partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"))
        .chainEnd()
Aggregating over all batches inside a window operator:
All batches falling within a window are processed together by a single aggregation operation.
For example, the code below uses the window operator to do a word count over the data of one window and then outputs each key-value pair to an HBase table:
For the same key appearing in different windows, does HBase keep updating the value, or does the key carry a window identifier??? Judging from the example below, it looks like HBase is simply updated again and again...
// window-state table should already be created with cf:tuples column
HBaseWindowsStoreFactory windowStoreFactory = new HBaseWindowsStoreFactory(new HashMap<String, Object>(),
        "window-state", "cf".getBytes("UTF-8"), "tuples".getBytes("UTF-8"));

FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
        new Values("the cow jumped over the moon"),
        new Values("the man went to the store and bought some candy"),
        new Values("four score and seven years ago"),
        new Values("how many apples can you eat"),
        new Values("to be or not to be the person"));
spout.setCycle(true);

TridentTopology topology = new TridentTopology();
Stream stream = topology.newStream("spout1", spout).parallelismHint(16)
        .each(new Fields("sentence"), new Split(), new Fields("word"))
        .window(TumblingCountWindow.of(1000), windowStoreFactory,
                new Fields("word"), new CountAsAggregator(), new Fields("count"))
        .peek(new Consumer() {
            @Override
            public void accept(TridentTuple input) {
                LOG.info("Received tuple: [{}]", input);
            }
        });

StormTopology stormTopology = topology.build();