美文网首页
storm求和案例

storm求和案例

作者: piziyang12138 | 来源:发表于2018-10-04 16:30 被阅读0次
    /**
     * 实现简单的本地求和功能
     */
    public class LocalSumTopology {
    
        /**
         * Spout组件
         * 产生数据并且发送
         */
        public static class SumSpout extends BaseRichSpout{
    
            private SpoutOutputCollector collector;
    
            /**
             * 初始化操作
             * @param conf 初始化配置项
             * @param context 上下文
             * @param collector 数据发射器
             */
            @Override
            public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
                this.collector=collector;
            }
    
            int number=0;
            /**
             * 发射数据
             * 该方法是一个死循环
             */
            @Override
            public void nextTuple() {
                //Values类实现了ArrayList
                collector.emit(new Values(++number));
    
                System.out.println("Spout number: "+number);
                //防止数据产生太快,睡眠一秒
                Utils.sleep(1000);
    
            }
    
            /**
             * 定义输出端字段
             * @param declarer
             */
            @Override
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                //与上面的number变量对应
                declarer.declare(new Fields("num"));
            }
        }
    
        /**
         * Bolt组件
         * 实现业务的逻辑处理,这里求和
         */
        public static class SumBolt extends BaseRichBolt{
            /**
             * 因为 这里接收数据之后不需要再发送给下一个Bolt,因此再初始化collector发射器
             * @param stormConf
             * @param context
             * @param collector
             */
            @Override
            public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    
            }
    
            int sum=0;
            /**
             * 执行业务逻辑的处理
             * 该方法也是一个死循环
             * @param input
             */
            @Override
            public void execute(Tuple input) {
                //可以通过字段名或者下标索引获取
                Integer value=input.getIntegerByField("num");
                sum+=value;
                System.out.println("Bolt sum: "+sum);
            }
    
            @Override
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
    
            }
        }
    
        public static void main(String[] args) {
            //使用TopologyBuilder设置Spout和Bolt,并且将其关联 在一起
            //创建Topology
            TopologyBuilder builder=new TopologyBuilder();
            builder.setSpout("SumSpout",new SumSpout());
            builder.setBolt("SumBolt",new SumBolt()).shuffleGrouping("SumSpout");
            //使用本地模式
            LocalCluster localCluster=new LocalCluster();
            localCluster.submitTopology("LocalSumTopology",new Config(),builder.createTopology());
        }
    
    }
    
    

    提交到集群运行

     public static void main (String[] args){
    
            //TopologyBuilder根据spout和bolt来构建Topology
            //storm中任何一个作业都是通过Topology方式进行提交的
            //Topology中需要指定spout和bolt的执行顺序
            TopologyBuilder tb = new TopologyBuilder();
            tb.setSpout("SumSpout", new SumSpout());
            //SumBolt以随机分组的方式从DataSourceSpout中接收数据
            tb.setBolt("SumBolt", new SumBolt()).shuffleGrouping("SumSpout");
    
            //代码提交到storm集群上运行
            try {
                StormSubmitter.submitTopology("ClusterSumStormTopology", new Config(), tb.createTopology());
            } catch (AlreadyAliveException e) {
                e.printStackTrace();
            } catch (InvalidTopologyException e) {
                e.printStackTrace();
            }
    
        }
    
    

    运行命令

    ./storm jar /root/untitled5.jar com.neusoft.LocalSumTopology
    
    

    查看输出

    image.png

    相关文章

      网友评论

          本文标题:storm求和案例

          本文链接:https://www.haomeiwen.com/subject/ilbxaftx.html