Storm Trident 之 Trident API介绍

作者: 零度沸腾_yjz | 来源:发表于2019-02-27 20:32 被阅读0次

    Trident是什么

    Trident是Storm上的高层次抽象,它能够在提供高吞吐量的能力同时(每秒几百万消息),也提供了有状态的流式处理和低延迟分布式查询的能力。它类似Pig这种高级批处理工具,Trident提供了joins、aggregations、grouping、functions以及filters等功能函数。

    Trident主要提供了以下功能:

    • 常见的流分析操作,比如join、aggregation等。具体就是Trident提供的API操作。
    • 一次性处理语义(exactly-once)。
    • 事务数据存储(transaction)。

    Trident核心数据模型是一系列批处理(Batch)的流,也就是说虽然Storm Trident处理的是Stream,但是处理过程中Trident将Stream分隔成Batch来进行处理。

    Batch

    所以Stream会被切分成一个个Batch分布到集群中,所有应用在Stream上的函数都会具体应用到每个节点的Batch上中,来实现并行计算。

    为什么使用Trident

    Storm Topology适合一些无统计、不需要Transaction(事务)的应用,比如过滤、清洗数据等场景。Topology在开启Ack的情况下,能够保证数据不丢失但可能重复。
    而Trident适合需要严格不丢不重复消息的场景,比如交易额统计。Trident通过事务来实现eactly-once,保证数据不丢不重复。但同时,使用Trident会使其性能有所下降。

    Triden API

    Trident API可以分为Spout操作和Bolt操作,对于Bolt操作提供常见的流数据分析操作。
    Bolt Trident提供了五种类型的操作:

    • 本地分区操作(Partition-local operations),操作应用到本地每个分区上,这部分操作不会产生网络传输。
    • 重分区操作,对数据流进行重新分区,但是不会改变数据内容,这部分操作会有网络传输。
    • 聚合操作,这部分操作会有网络传输。
    • 流分组操作。
    • 合并(meger)和连接(join)操作。

    Trident Spout

    Trident与Storm topology一样也是使用Spout作为Trident拓扑的数据源。Trident Spout提供了更复杂的API,因为它既可以获取事务数据源,也可以获取非事务数据源。

    对于非事务的Spout,可以使用普通的Storm IRichSpout接口:

    TridentTopology topology = new TridentTopology();
    topology.newStream("myspoutid",new RichSpout());
    

    Trident拓扑中,所有Spout都需要指定一个唯一的流的标识,比如这里的“myspoutid”(整个集群级别的唯一标识)。Trident使用该唯一标识存储Spout元数据,比如txId(事务ID)以及其它Spout相关的信息。
    可以通过如下配置,来配置Zookeeper保存Spout的元数据。

    transactional.zookeeper.servers:Zookeeper主机列表
    transactional.zookeeper.port:Zookeeper集群端口
    transactional.zookeeper.root:在Zookeeper存储元数据的根目录
    

    下面是Trident Spout一些类型:

    • ITridentSpout:最通用API接口,可以支持事务和不透明事务语义。一般会用这个API分区的特性,而不是直接使用该接口。
    • IBatchSpout:非事务Spout,每次发射一个Batch的元组。
    • IPartitionedTridentSpout:事务Spout,从分区数据源读取数据,比如Kafka集群。
    • IOpaquePartitionedTridentSpout:不透明事务Spout,从分区数据源中读取数据。

    本地分区操作

    本地分区操作不会产生网络传输,并且会独立的应用到batch的每个分区上。

    函数操作(Functions)

    函数用于接受一个tuple,并且指定接收这个tuple的哪些field,它会发射(emit)0个或多个tuple。输出的tuple feild会被追加到原始tuple的后面,如果不输出tuple就意味着这个tuple被过滤掉了。比如下面的实例:

    class MyFunction extends BaseFunction {
            /**
             * 在每个元组上面执行该逻辑函数,并且发射0个或多个元组
             *
             * @param tuple     传入的元组
             * @param collector 用于发射元组的收集器实例
             */
            @Override
            public void execute(TridentTuple tuple, TridentCollector collector) {
                //tuple.getInteger(0)接收第一个Field
                for(int i=0; i< tuple.getInteger(0); i++) {
                    collector.emit(new Values(i));
                }
            }
        }
    

    假设我们有一个“mystream”流,其中每个tuple包含以下filed ['a','b','c'],比如有以下元组。

    [1,2,3]
    [2,1,2]
    [10,0,1]
    

    我们将每个元组都经过以下MyFunction操作:

    //将每个tuple的字段"b"应用于MyFunction,并且产生新字段"d"追加到原tuple的字段中
    mystream.each(new Fields("b"),new MyFunction(),new Fields('d'))
    

    得到数据为:

    //[1,2,3]的emit,其中0和1是新字段“d”
    [1,2,3,0]
    [1,2,3,1]
    //[2,1,2]的emit,其中0是新字段“d”
    [2,1,2,0]
    //[10,0,1]不满足需求,过滤掉了
    

    过滤操作(Filters)

    接收一个tuple,并决定这个tuple是否应该被保留。比如我们有以下Filter操作:

    class MyFilter extends BaseFilter {
            /**
             * 确定是否应该从流中过滤元组
             *
             * @param tuple 被评估的元组
             * @return 返回"false"则该元组被抛弃,返回"true"则该元组被保留
             */
            @Override
            public boolean isKeep(TridentTuple tuple) {
                //每个tuple中的第一个Field
                return tuple.getInteger(0) > 1;
            }
        }
    

    同样以Function中的实例进行操作:

    mystream.filter(new MyFilter())。
    

    输出数据:

    #[1,2,3]中的第一个字段不大于1,所以被过滤掉
    [2,1,2]
    [10,0,1]
    

    map和flatMap操作

    map接收一个tuple,将其作用在map函数上,并且返回经map函数处理过的tuple字段值。
    比如下面的实例:

    class UpperMap implements MapFunction {
            /**
             * 流中的每个trident元组调用
             *
             * @param input 接受trident tuple
             * @return 返回转换之后的值
             */
            @Override
            public Values execute(TridentTuple input) {
                //只返回原tuple中第一个Filed的大写字符串(其它filed被丢弃了)
                return new Values(input.getString(0).toUpperCase());
            }
        }
    

    flatMap类似map,但是它会分两步执行:执行flat将所有元素展开,然后每个元素使用map函数。比如[[1,2],3,4]经过flat操作后得到元素集合为[1,2,3,4]。比如我们有以下实例:

    class SplitFlatMap implements FlatMapFunction {
            /**
             * 流中的每个trident元组调用
             *
             * @param input 接收的trident tuple
             * @return 一个可迭代的结果集
             */
            @Override
            public Iterable<Values> execute(TridentTuple input) {
                List<Values> resultValues = new ArrayList<>();
                //获取一个Filed并将其以空格作为切割
                for(String word : input.getString(0).split(" ")){
                    resultValues.add(new Values(word));
                }
                return resultValues;
            }
        }
    

    我们通过上面的flatMap和Map就可以得到一个流的所有大写词组流了。

    mystream.flatMap(new SplitFlatMap()).map(new UpperMap())。
    

    通常我们也可以将map或flatMap的输出结果命名一个新字段:

    mystream.flatMap(new SplitFlatMap(),new Fields("word"))
    

    peek操作

    peek操作一般用来debug,比如查看上一步的操作结果。假如我们有以下peek操作。

    class PrintPeek implements Consumer {
            /**
             * 对于输入的每个trident元组应用以下操作
             *
             * @param input 接收的trident 元组
             */
            @Override
            public void accept(TridentTuple input) {
                System.out.println(input.getString(0));
            }
        }
    

    以下处理操作,能把转换大写之后的tuple打印打出来:

    mystream.flatMap(new SplitFlatMap()).map(new UpperMap()).peek(new PrintPeek());
    

    min和minBy操作

    返回一批(Batch)元组中的每个分区的最小值。
    比如一批(Batch)元组有以下三个partition,它们对应的Field为['device-id','count']。

    Partiton 0:
    [213,15]
    [125,21]
    [100,10]
    
    Partition 1:
    [123,20]
    [215,32]
    [183,25]
    

    针对以上数据统计count最小的device-id:

    mystream.minBy(new Fields("count"));
    

    返回结果:

    Partition 0:
    [100,10]
    
    Partition 1:
    [123,20]
    

    除了以上使用方式,我们还可以通过传入比较器来使用min和minBy:

    public <T> Stream minBy(String inputFieldName, Comparator<T> comparator) 
    public Stream min(Comparator<TridentTuple> comparator)
    
    max和maxBy操作

    max和maxBy操作同min/minBy操作,只不过返回最大值。

    mystream.maxBy(new Fields("count"));
    

    上面实例输出结果为:

    Partition 0:
    [125,21]
    Partition 1:
    [215,32]
    

    max和maxBy也提供了自定义比较器的方法:

    public <T> Stream maxBy(String inputFieldName, Comparator<T> comparator) 
    public Stream max(Comparator<TridentTuple> comparator) 
    

    窗口操作(Window)

    Trident流能够处理具有相同窗口的元素,对它们进行聚合操作,然后将聚合结果向下发送。Storm支持两种窗口操作:翻滚窗口(Tumbing window)和滑动窗口(Sliding window)。

    Tumbing window

    元组根据处理时间或计数分组到一个窗口中,任何元组只属于其中一个窗口。

    //返回一个元组流的聚合结果,它是滚动窗口内每windowCount个数的聚合结果
    public Stream tumblingWindow(int windowCount, WindowsStoreFactory windowStoreFactory, Fields inputFields, Aggregator aggregator, Fields functionFields);
    //返回一个元组流的聚合结果,这些元组是一个窗口的聚合结果,该窗口在windowDuration的持续时间内滚动
    public Stream tumblingWindow(BaseWindowedBolt.Duration windowDuration, WindowsStoreFactory windowStoreFactory, Fields inputFields, Aggregator aggregator, Fields functionFields);
    
    Sliding window

    元组在每个滑动间隔的窗口内分组,一个元组可能属于多个窗口。

    //返回一个元组流的聚合结果,它是滑动窗口每windowCount个元组树的聚合结果,并在slideCount之后滑动窗口
    public Stream slidingWindow(int windowCount, int slideCount, WindowsStoreFactory windowStoreFactory, Fields inputFields, Aggregator aggregator, Fields functionFields);
    //返回一个元组流的聚合结果,该窗口在slidingInterval持续滑动,并在windowDuration处完成一个窗口
    public Stream slidingWindow(BaseWindowedBolt.Duration windowDuration, BaseWindowedBolt.Duration slidingInterval, WindowsStoreFactory windowStoreFactory, Fields inputFields, Aggregator aggregator, Fields functionFields);
    
    Common window

    除了上面提供的滚动窗口api和滑动窗口api,Trident还提供了公用窗口api,通过windowConfig可以支持任意窗口。

    public Stream window(WindowConfig windowConfig, WindowsStoreFactory windowStoreFactory, Fields inputFields, Aggregator aggregator, Fields functionFields)
    

    Trident window api需要使用WindowsStoreFactor存储接收到的元组和聚合值。目前,Trident提供了HBaseWindowsStoreFactor的HBase实现。

    partitionAggregate操作

    partitionAggregate对一批(batch)元组的每个分区进行聚合,与前面Function在元组后面追加不同Field不同,partitionAggregate会使用发射出去的元组替换接收进来的元组。比如以下实例:
    比如有以下数据,对应的Field分别为["a","b"]:

    Partition 0:
    ["a":1]
    ["b":2]
    
    Partition 1:
    ["c":2]
    ["d":2]
    

    使用partitionAggregate进行求和:

    mystream.partitionAggregate(new Fields("b"),new Sum(),new Fields("sum"))
    

    经过partitionAggregate函数之后的结果为:

    Partition 0:
    ["sum":3]
    
    Partition 1:
    ["sum":4]
    

    Trident API提供了三个聚合器接口:CombinerAggregator、ReducerAggregator和Aggregator。

    CombinerAggregator操作

    CombinerAggregator只返回单个tuple,并且这个tuple只包含一个Field。每个元组首先都经过init函数进行预处理,然后在执行combine函数来计算接受到的tuple,直到最后一个tuple到达。如果分区内没有tuple,则会通过zero函数发射结果。

    public interface CombinerAggregator<T> extends Serializable {
        T init(TridentTuple tuple);
        T combine(T val1, T val2);
        T zero();
    }
    

    比如以下实例:

    class Count implements CombinerAggregator<Long> {
            @Override
            public Long init(TridentTuple tuple) {
                //计数,每个tuple代表一个数
                return 1L;
            }
            @Override
            public Long combine(Long val1, Long val2) {
                return val1 + val2;
            }
            @Override
            public Long zero() {
                return 0L;
            }
        }
    
    ReducerAggregator操作

    ReducerAggregator通过init方法提供一个初始值,然后每个输入的tuple迭代这个值,最后产生一个唯一的tuple输出。

    public interface ReducerAggregator<T> extends Serializable {
        T init();
        T reduce(T curr, TridentTuple tuple);
    }
    

    比如同样使用RecuerAggregator来实现计数器:

     class  Count implements ReducerAggregator<Long> {
            @Override
            public Long init() {
                return 0L;
            }
    
            @Override
            public Long reduce(Long curr, TridentTuple tuple) {
                return curr + 1;
            }
        }
    
    Aggregator

    执行聚合操作最通用的接口就是Aggregator了,它能够发射任意数量的元组,每个元组可以包含任意数量的字段。

    public interface Aggregator<T> extends Operation {
        T init(Object batchId, TridentCollector collector);
        void aggregate(T state, TridentTuple tuple, TridentCollector collector);
        void complete(T state, TridentCollector collector);
    }
    

    它的执行流程是:

    1. 在处理Batch之前调用init方法,它返回一个聚合的状态值,传递给aggregate和complete方法。
    2. 为批处理分区中的每个tuple调用aggregate方法,此方法可以更新状态值,也可以发射元组。
    3. 当aggregator处理完Batch分区的所有元组后调用complete方法。

    使用Aggregator来实现计数器:

    class CountAgg extends BaseAggregator<CountAgg.CountState> {
            class CountState{
                long count = 0;
            }
    
            @Override
            public CountState init(Object batchId, TridentCollector collector) {
                return new CountState();
            }
    
            @Override
            public void aggregate(CountState state, TridentTuple tuple, TridentCollector collector) {
                state.count += 1;
            }
    
            @Override
            public void complete(CountState state, TridentCollector collector) {
                collector.emit(new Values(state.count));
            }
        }
    

    状态查询(stateQuery)和分区持久化(partitionPersist)

    stateQuery用于查询状态源,partitionPersist用于更新状态源。具体使用方式可查看:http://storm.apache.org/releases/1.2.2/Trident-state.html

    投影(projection)操作

    projection操作用于只保留指定的字段,比如元组有字段["a","b","c","d"],通过以下投影操作,输出流只会包含["c","d"]。

    mystream.projection(new Fields("c","d"));
    

    重分区操作

    Repartition操作运行一个函数来改变元组在任务之间的分布,调整分区数也可能会导致Repartition操作。重分区操作会引发网络传输。下面是重分区的相关函数:

    • shuffle:使用随机算法来均衡tuple到每个分区。
    • broadcast:每个tuple被广播到所有分区上,使用DRPC时使用这种方法比较多,比如每个分区上做stateQuery。
    • global:所有tuple都发送到一个分区上,这个分区用来处理stream。
    • batchGlobal:一个batch中的所有tuple会发送到一个分区中,不同batch的元组会被发送到不同分区上。
    • partition:通过一个自定义的分区函数来进行分区,这个自定义函数需要实现org.apache.storm.grouping.CustomStreamGrouping

    聚合操作

    Trident提供了aggregate和persistentAggregate方法,aggregate运行在每个batch中,而persistentAggregate将聚合所有Batch,并将结果保存在一个状态源上。
    我们前面讲的aggregate、CombinerAggregator和ReducerAggregator运行在patitionAggregation上是本地分区操作。如果直接作用于流上,则是对全局进行聚合。
    在对全局流进行聚合时,Aggregator和ReducerAggregator会首先重分区到一个单分区,然后在该分区上执行聚合函数。而CombinerAggregator则会首先聚合每个分区,然后重分区到单个分区,在网络传输中完成聚合操作。所以我们应该尽量用CombinerAggregator,因为它更加高效。

    mystream.aggregate(new Count(),new Fields("count"));
    

    流分组操作

    groupBy操作会重新分区流,对指定字段执行partitionBy操作,指定字段相同的元组被划分到相同的分区。goupBy操作如下图:

    Group

    如果在流分组中运行聚合器,聚合会在每个group中运行,而不是对整个Batch操作。

    合并和连接

    Trident可以允许我们将不同流组合在一起,通过TridentTopology.merge()方法操作。

    //合并流会以第一个流的输出字段来命名
    topology.mege(stream1,stream2,stream3);
    

    另一种合并流的方式是连接,类似于SQL那样的连接,要求输入是有限的。所以Trident的join只适用于来自Spout的每个小Bath之间。
    比如有一个流包含["key1","val1","val2"],另一个流包含["key2","val1","val2"],通过以下连接操作:

    //Trident需要join之后的流重新命名,因为输入流可能存在重复 字段。
    mystream.join(stream1,new Fields("key1"),stream2,new Fields("key2"),new Fields("key","a","b","c","d"))
    

    关注我

    欢迎关注我的公众号,会定期推送优质技术文章,让我们一起进步、一起成长!
    公众号搜索:data_tc
    或直接扫码:🔽


    欢迎关注我

    相关文章

      网友评论

        本文标题:Storm Trident 之 Trident API介绍

        本文链接:https://www.haomeiwen.com/subject/pzkquqtx.html