美文网首页
使用Storm实现累加求和操作

使用Storm实现累加求和操作

作者: 留歌_36 | 来源:发表于2019-01-13 11:24 被阅读0次
package com.csylh;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.util.Map;

/**
 * Description:使用Storm实现累加求和操作
 *
 * @author: 留歌36
 * Date:2018/9/3 16:50
 */
public class LocalSumStormTopology {
    /**
     * Spout需要继承BaseRichSpout
     * 数据源需要产生数据并发射到Bolt
     */
    public static class DataSourceSpout extends BaseRichSpout{
        //定义一个发射器
        private SpoutOutputCollector collector;

        /**
         * 初始化方法 只是会被调用一次
         * @param conf     配置参数
         * @param context 上下文:相当于一个框 可以从里面获取许多东西
         * @param collector 数据发射器
         */
        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            //将传入的collector发射器 对私有变量 进行赋初值
            this.collector = collector;
        }
        int number = 0;
        /**
         *  用于产生数据
         *  生产中肯定是从消息队列中获取数据
         *  这个方法是一个死循环
         */
        @Override
        public void nextTuple() {
            //发送方式,调用上面定义的数据发射器
            this.collector.emit(new Values(number++));

            System.out.println("Spout==》发送的数据:" + number);
            //每隔1s中发射一次,防止数据产生太快
            Utils.sleep(1000);

        }
        /**
         * 声明输出字段
         * @param declarer
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("num"));
        }
    }

    /**
     * Bolt需要继承BaseRichBolt
     * 用于接收数据并对数据进行处理
     */
    public static class SumBolt extends BaseRichBolt{
        /**
         *  初始化方法 ,会被执行一次
         * @param stormConf
         * @param context
         * @param collector  这里的数据发射器,由于业务逻辑中没有没有必要进行放下发的操作,所以就是没有必要进行new一个
         */
        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {

        }
        int sum = 0;
        /**
         * 也是一个死循环 ,职责: 获取Spout发射过来的数据
         * @param input
         */
        @Override
        public void execute(Tuple input) {

           //Bolt中获取值可以通过index获取
            // 也可以根据上一个环节中定义的filed的名称获取(***推荐)
          Integer value = input.getIntegerByField("num");
          sum += value;

          System.out.println("Bolt :Sum = ["+ sum + "]");
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {

        }
    }

    public static void main(String[] args) {

        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("DataSourceSpout",new DataSourceSpout());
        builder.setBolt("SumBolt",new SumBolt()).shuffleGrouping("DataSourceSpout");

        //创建一个本地的Storm集群 ,本地模式运行,不需要搭建Storm集群
        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("LocalSumStormTopology",new Config(),builder.createTopology());

    }

}

相关文章

  • 使用Storm实现累加求和操作

  • Storm基础开发案例-数字累加

    需求 使用Storm实现累加求和的操作 开发过程 1. Spout 定义 首先 Spout需要继承BaseRich...

  • 11.10

    累加求和 累加求和(二) 累加求和(三) 输入求和

  • 作业17-11-10

    累加求和 累加求和(二) 累加求和(三) 输入求和 看结果写程序

  • 17-11-10

    累加求和 累加求和(二) 累加求和(三) 输入求和 看结果写程序

  • 11·10

    作业一:累加求和 作业二:累加求和(二) 作业三:累加求和(三)

  • C# Reflection and Linq

    Linq Aggregate 聚合 字符串替换 累加求和 1 - 100 Tips: 对于累加可以使用等差数列求和...

  • 17-11-10

    作业一:累加求和 作业二:累加求和(二) 作业三:累加求和(三) 作业四:输入求和

  • 11.10

    题目一:累加求和 题目二:累加求和(二)

  • 17-11-10

    题目一:累加求和 题目二:累加求和(2) 题目三:累加求和(3) 题目四:输入求和 题目五:看结果写程序

网友评论

      本文标题:使用Storm实现累加求和操作

      本文链接:https://www.haomeiwen.com/subject/mkmkdqtx.html