美文网首页
storm Trident编程之Function

storm Trident编程之Function

作者: 先生_吕 | 来源:发表于2017-04-26 11:32 被阅读66次

    Trident编程实质与普通的storm源码API编程没有什么本质,其只是对topology出的逻辑执行代码进行了封装,使得编码处理更一体化,功能类要继承BaseFunction类,重写execute一个方法,不在想原始API编程那样重写很多方法,造成代码冗余。
    直接上代码

    /**
     * 
     * @author mis
     *
     * Trident的作用就是简化了bolt
     */
    public class TridentFunction {
        
        public static class SumFunction extends BaseFunction {
    
            public void execute(TridentTuple tuple, TridentCollector collector) {
                System.out.println("传入进来的内容为:" + tuple);
                //获取a、b两个域值
                int a = tuple.getInteger(0);//第0个元素
                int b = tuple.getInteger(1);//第1个元素
                int sum = a + b ; //sum计算前两个元素之和
                //发射数据
                collector.emit(new Values(sum));
            }   
        }
        
        public static class Result extends BaseFunction {
    
            @Override
            public void execute(TridentTuple tuple, TridentCollector collector) {
                //获取tuple输入内容
                Integer a = tuple.getIntegerByField("a");
                Integer b = tuple.getIntegerByField("b");
                Integer c = tuple.getIntegerByField("c");
                Integer d = tuple.getIntegerByField("d");
                Integer sum = tuple.getIntegerByField("sum");
                System.out.println("a: " + a +", b: " + b + ", c: " + c + ", d: " +d + ", sum: " + sum);
            }   
        }
    
        public static StormTopology buildTopology() {
            TridentTopology topology = new TridentTopology();
            //设定数据源
            @SuppressWarnings("unchecked")
            FixedBatchSpout spout = new FixedBatchSpout(
                    new Fields("a","b","c","d"),//声明输入的域字段为"a","b","c","d"
                    4,//设置批处理大小
                    
                    //设置数据源内容(此处测试数据)
                    new Values(1,4,7,10),
                    new Values(1,1,3,11),
                    new Values(2,2,7,1),
                    new Values(2,5,7,2));
            
            //指定是否循环
            spout.setCycle(false);
            //指定输入员
            Stream inputStream = topology.newStream("spout", spout);
            /**
             * 要实现流spout - bolt的模式 在trident里是使用each来做到的
             * each方法参数:
             *      1,输入数据源参数名称:"a","b","c","d"
             *      2,需要流转执行的function对象(也就是bolt对象):new SumFunction()
             *      3,指定function对象里的输出参数名称:sum
             * */
            inputStream.each(new Fields("a","b","c","d"), new SumFunction(),new Fields("sum"))
                    /**
                     * 继续使用each调用下一个function(bolt)
                     *      1,参数一为:"a","b","c","d","sum"
                     *      2,参数二为:new Result() 也就是执行函数,第三个参数为没有输出(即这是最后一个bolt)
                     * */
                    .each(new Fields("a","b","c","d","sum"),new Result(),new Fields());
            
            return topology.build();//利用这种方式,我们返回一个StormTopology对象,进行提交
        }
        
        
        
        public static void main(String[] args) throws Exception {
            Config conf = new Config();
            conf.setNumWorkers(2);
            conf.setMaxSpoutPending(20);
            if(args.length == 0){//本地模式运行
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology("trident-function", conf, buildTopology());
                Thread.sleep(10000);
                cluster.shutdown();
            }else{//集群模式运行
                StormSubmitter.submitTopology(args[0], conf, buildTopology());
            }
        }
    }
    
    

    使用each()方法进行链式编程,数据所流向的bolt类直接以对象的形式传入

    结果:

    2017-04-26_113201.png

    相关文章

      网友评论

          本文标题:storm Trident编程之Function

          本文链接:https://www.haomeiwen.com/subject/oorizttx.html