[Storm中文文档]Trident API概述

作者: winwill2012 | 来源:发表于2015-11-30 13:01 被阅读1195次

    声明

    本文首发于个人技术博客,转载请注明出处,本文链接:http://qifuguang.me/2015/11/28/Storm中文文档-Trident-API概览/

    正文

    Trident中最核心的数据模型是stream,它以一系列batch的形式被处理。stream被划分成很多个分区分配到集群中的各个节点去,每个分区上的操作是并行执行的。

    在Trident中,有5种类型的操作:

    1. 本地分区操作,这种操作不需要网络传输的参与;
    2. 重新分区操作,重新分区一个流但是不改变其内容,需要网络传输;
    3. 聚合操作,需要网络传输的参与;
    4. 流分组操作;
    5. 合并和连接。

    本地分区操作(Partition-local operations)

    本地分区操作不需要网络传输,每一个Batch分区独立执行。

    函数(Functions)

    函数接收一些字段作为输入,发射零个或多个元组作为输出。输出元组的字段追加到输入元组的字段的后面。如果一个函数没有发射任何元组,原始的输入元组将会被过滤掉。否则,输入元组对于每个输出元组是重复的。假设MyFuction函数的定义如下:

    public class MyFunction extends BaseFunction {
        public void execute(TridentTuple tuple, TridentCollector collector) {
            for(int i=0; i < tuple.getInteger(0); i++) {
                collector.emit(new Values(i));
            }
        }
    }
    

    现在假设有一个输入流mysrteam,包含3个字段a,b,c,流中包含如下3个元组:

    [1, 2, 3]
    [4, 1, 6]
    [3, 0, 8]
    

    运行如下代码:

    mystream.each(new Fields("b"), new MyFunction(), new Fields("d")))
    

    将会输出如下包含a,b,c,d四个字段的元组:

    [1, 2, 3, 0]
    [1, 2, 3, 1]
    [4, 1, 6, 0]
    

    过滤器(Filters)

    过滤器把一个元组作为输入,并决定是否需要保留该元组。假设你定义了如下过滤器:

    public class MyFilter extends BaseFilter {
        public boolean isKeep(TridentTuple tuple) {
            return tuple.getInteger(0) == 1 && tuple.getInteger(1) == 2;
        }
    }
    

    现在假设你有包含a,b,c三个字段的如下元组:

    [1, 2, 3]
    [2, 1, 1]
    [2, 3, 4]
    

    运行下面的代码:

    mystream.each(new Fields("b", "a"), new MyFilter())
    

    将会输出如下元组:

    [2, 1, 1]
    

    分区聚合(partitionAggregate)

    分区聚合在一批元组的每一个分区上运行一个函数。与函数不同的是,分区聚合的输出元组会覆盖掉输入元组。请看如下示例:

    mystream.partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"))
    

    假设你有一个包含a,b两个字段的输入流,元组的分区情况如下:

    Partition 0:
    ["a", 1]
    ["b", 2]
    
    Partition 1:
    ["a", 3]
    ["c", 8]
    
    Partition 2:
    ["e", 1]
    ["d", 9]
    ["d", 10]
    

    运行上面的那一行代码将会输出如下的元组,这些元组只包含一个sum字段:

    Partition 0:
    [3]
    
    Partition 1:
    [11]
    
    Partition 2:
    [20]
    

    你可以实现三种不同的接口来自定义自己的聚合器:CombinerAggregator, ReducerAggregator, and Aggregator。

    下面是CombinerAggregator接口的定义:

    public interface CombinerAggregator<T> extends Serializable {
        T init(TridentTuple tuple);
        T combine(T val1, T val2);
        T zero();
    }
    

    CombinerAggregator返回只有一个字段的一个元组。CombinerAggregator在每个输入元组上运行init函数,然后通过combine函数聚合结果值直到只剩下一个元组。如果分区中没有任何元组,CombinerAggregator将返回zero函数中定义的元组。比如,下面是Count聚合器的实现:

    public class Count implements CombinerAggregator<Long> {
        public Long init(TridentTuple tuple) {
            return 1L;
        }
    
        public Long combine(Long val1, Long val2) {
            return val1 + val2;
        }
    
        public Long zero() {
            return 0L;
        }
    }
    

    CombinerAggregators的好处是可以使用聚合函数来替代分区聚合函数。在这种情况下,Trident通过在网路传输之前自动进行部分聚合达到优化计算的作用。

    ReducerAggregator接口的定义如下:

    public interface ReducerAggregator<T> extends Serializable {
        T init();
        T reduce(T curr, TridentTuple tuple);
    }
    

    ReducerAggregator通过init函数得到一个初始的值,然后对每个输入元组调用reduce方法计算值,产生一个元组作为输出。比如Count的ReducerAggregator实现如下:

    public class Count implements ReducerAggregator<Long> {
        public Long init() {
            return 0L;
        }
    
        public Long reduce(Long curr, TridentTuple tuple) {
            return curr + 1;
        }
    }
    

    ReducerAggregator也能用于持久聚合,稍后讲解。

    最常用的聚合器的接口是Aggregator,它的定义如下:

    public interface Aggregator<T> extends Operation {
        T init(Object batchId, TridentCollector collector);
        void aggregate(T state, TridentTuple tuple, TridentCollector collector);
        void complete(T state, TridentCollector collector);
    }
    

    Aggregators能够发射任意数量,任意字段的元组。并且可以在执行期间的任何时候发射元组,它的执行流程如下:

    1. 处理batch之前调用init方法,init函数的返回值是一个表示聚合状态的对象,该对象会传递到aggregate和complete函数;
    2. 每个在batch分区中的元组都会调用aggregate方法,该方法能够更新聚合状态并且发射元组;
    3. 当batch分区中的所有元组都被aggregate函数处理完时调用complete函数。

    下面是使用Aggregator接口实现的Count聚合器:

    public class CountAgg extends BaseAggregator<CountState> {
        static class CountState {
            long count = 0;
        }
    
        public CountState init(Object batchId, TridentCollector collector) {
            return new CountState();
        }
    
        public void aggregate(CountState state, TridentTuple tuple, TridentCollector collector) {
            state.count+=1;
        }
    
        public void complete(CountState state, TridentCollector collector) {
            collector.emit(new Values(state.count));
        }
    }
    

    有些时候,我们需要通知执行很多个聚合器,则可以使用如下的链式调用执行:

    mystream.chainedAgg()
            .partitionAggregate(new Count(), new Fields("count"))
            .partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"))
            .chainEnd()
    

    上面的代码将会在每一个分区执行Count和Sum聚合器,输出结果是包含count和sum两个字段的元组。

    状态查询与分区持久化(stateQuery and partitionPersist)

    状态查询用于查询状态源,分区持久化用于更新状态源,可以参阅这篇文章了解与状态相关的更多内容。

    投影(projection)

    流的投影操作只保留操作中指定的字段,如果对字段a,b,c,d运行如下的投影:

    mystream.project(new Fields("b", "d"))
    

    输出的六中将只包含b,d两个字段。

    重分区操作(Repartitioning operations)

    重分区操作通过运行一个函数改变元组在任务之间的分布,也可以调整分区的数量(比如重分区之后将并行度调大),重分区需要网络传输的参与。重分区函数包含以下这几个:

    1. shuffle:使用随机轮询算法在所有目标分区间均匀分配元组;
    2. broadcast:每个元组复制到所有的目标分区。这在DRPC中非常有用,例如,需要对每个分区的数据做一个stateQuery操作;
    3. partitionBy:接收一些输入字段,根据这些字段输入字段进行语义分区。通过对字段取hash值或者取模来选择目标分区。partitionBy保证相同的字段一定被分配到相同的目标分区;
    4. global:所有的元组分配到相同的分区,该分区是流种所有batch决定的;
    5. batchGlobal:同一个batch中的元组被分配到相同的目标分区,不同batch的元组有可能被分配到不同的目标分区;
    6. partition:接收一个自定义的分区函数,自定义分区函数需要实现backtype.storm.grouping.CustomStreamGrouping接口。

    聚合操作(Aggregation operations)

    Trident提供aggregate和persistentAggregate方法对流进行聚合操作。aggregate独立运行在每个batch的流中,而persistentAggregate将聚合所有batch的流的元组,并将结果保存在一个状态源中。

    假设在流中运行聚合器做全局聚合操作。当使用ReducerAggregator或者Aggregator时,流将会首先重新分区为一个单分区,然后再改分区上运行聚合函数。当使用CombinerAggregator聚合器时,Trident首先会计算每个分区的部分聚合,然后重新分区到单分区,在网络传输后完成聚合。推荐使用CombinerAggregator,因为它更有效。

    下面是通过聚合得到一个全局计数的例子:

    mystream.aggregate(new Count(), new Fields("count"))
    

    就像partitionAggregate一样,aggregators也可以进行多重聚合。但是如果将一个CombinerAggregator和一个非CombinerAggregator一起使用,Trident将无法进行部分聚合以达到优化的效果。

    流分组操作(Operations on grouped streams)

    流分组操作通过对流的某些字段调用partitionBy来将流进行重新分区,字段相同的元组将会被分配到相同的分区。例如:


    如果在一个流分组中运行聚合函数,聚合函数会在每一个组内运行,而不是对整个batch处理。persistentAggregate也可以运行在分组的流中,在这种情况下,结果将保存在一个以分组字段作为key的MapState中。阅读这篇文章了解更多关于persistentAggregate的知识。

    和普通流一样,流分组的聚合器也可以执行链式操作。

    合并和连接(Merges and joins)

    API的最后一部分是关于怎么将不同的流连接在一起。最简单的方式就是将不同的流合并成一个流,在Trident中,你可以使用merge函数合并流:

    topology.merge(stream1, stream2, stream3);
    

    Trident将会重命名新的输出字段,合并的流以第一个流的输出字段来命名。

    另一个合并流的方法是使用join函数,类似于SQL那样的join操作,它要求输入是有限的。所以join对于无限输入的流式毫无意义的,Trident中的join操作只适用于来自Spout的每一个小batch之间。

    下面的例子中,stream1流包含key,val1,val2三个字段,stream2流包含x,val1两个字段:

    topology.join(stream1, new Fields("key"), stream2, new Fields("x"), new Fields("key", "a", "b", "c"));
    

    stream1流的key字段与stream2流的x字段组join操作,另外,Trident要求所有新流的输出字段被重命名,因为输入流可能包含相同的字段名称。连接流发射的元组将会包含:

    1. 连接字段的列表。在上面的例子中,字段key对应stream1的key,stream2的x;
    2. 来自所有流的所有非连接字段的列表,按照传递到连接方法的顺序排序。在上面的例子中,字段a与字段b对应stream1的val1和val2,c对应于stream2的val1.

    当连接的流来自不同的Spout时,这些Spout会同步发射batch,也就是说batch处理会包括每个Spout的元组。

    原英文文档链接: http://storm.apache.org/documentation/Trident-API-Overview.html
    如果你喜欢我的文章,请关注我的微信订阅号:“机智的程序猿”,更多精彩,尽在其中:

    相关文章

      网友评论

      本文标题:[Storm中文文档]Trident API概述

      本文链接:https://www.haomeiwen.com/subject/uyrjhttx.html