美文网首页
Strom之WordCount

Strom之WordCount

作者: 阿坤的博客 | 来源:发表于2018-04-04 15:51 被阅读78次

    Storm用来实时处理数据,特点:低延迟、高可用、分布式、可扩展、数据不丢失。提供简单容易理解的接口,便于开发。本文讲述在Strom平台开发一个WordCount的步骤

    主要内容:

    • 1.引入依赖
    • 2.编写代码
    • 3.提交测试

    1.引入依赖

    <!-- https://mvnrepository.com/artifact/org.apache.storm/storm-core -->
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>1.2.1</version>
    </dependency>
    

    2.编写代码

    2.1.ReadFileSpolt

    每隔500ms产生一条"i love you"发送给下一个bolt,其中指定键为"biaobai"

    public class ReadFileSpout extends BaseRichSpout {
        private SpoutOutputCollector collector;
    
        /**
         * conf 应用程序能够读取的配置文件
         * context 应用程序的上下文
         * collector Spout输出的数据丢给SpoutOutputCollector
         */
        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            //1、Kafka 连接  / MYSQL 连接  /Redis 连接
            //TODO
            //2、将SpoutOutputCollector复制给成员变量
            this.collector = collector;
        }
    
        /**
         * storm框架有个while循环,一直在nextTuple
         */
        @Override
        public void nextTuple() {
            // 发送数据,使用collector.emit方法
            collector.emit(new Values("i love u"));
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("biaobai"));
        }
    }
    

    2.2.SentenceSplitBolt.java

    接收键为"biaobai"的数据并切分单词,统计每个单词的数量发送给下一个bolt,其中键为"word"、"num"

    public class SentenceSplitBolt extends BaseRichBolt {
        private OutputCollector collector;
    
        /**
         * 初始化方法
         * Map stormConf 应用能够得到的配置文件
         * TopologyContext context 上下文 一般没有什么用
         * OutputCollector collector 数据收集器
         */
        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
            //todo 连接数据 连接redis 连接hdfs
        }
    
        /**
         * 有个while不停的调用execute方法,每次调用都会发一个数据进行来。
         */
        @Override
        public void execute(Tuple input) {
            // String sentence = input.getString(0);
            // 底层先通过 biaobai 这个字段在map中找到对应的index角标值,然后再valus中获取对应数据。
            String sentence = input.getStringByField("biaobai");
            // TODO 切割
            String[] strings = sentence.split(" ");
            for (String word : strings) {
                // TODO 输出数据
                collector.emit(new Values(word, 1));
            }
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // 声明 输出的是什么字段
            declarer.declare(new Fields("word", "num"));
        }
    }
    

    2.3.WordCountBolt

    利用HashMap统计缓存单词的总数量

    public class WordCountBolt extends BaseRichBolt {
        private OutputCollector collector;
        private Map<String, Integer> wordCountMap;
    
        /**
         * 初始化方法
         * Map stormConf 应用能够得到的配置文件
         * TopologyContext context 上下文 一般没有什么用
         * OutputCollector collector 数据收集器
         */
        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
            //todo 连接数据 连接redis 连接hdfs
            wordCountMap = new HashMap<String, Integer>();
        }
    
        /**
         * 有个while不停的调用execute方法,每次调用都会发一个数据进行来。
         */
        @Override
        public void execute(Tuple input) {
            String word = input.getStringByField("word");
            Integer num = input.getIntegerByField("num");
            // 先判断这个单词是否出现过
            if (wordCountMap.containsKey(word)) {
                Integer oldNum = wordCountMap.get(word);
                wordCountMap.put(word, oldNum + num);
            } else {
                wordCountMap.put(word, num);
            }
            System.out.println("=======================");
            System.out.println(wordCountMap);
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // 声明 输出的是什么字段
            declarer.declare(new Fields("fenshou"));
        }
    }
    

    2.4.WordCountTopology

    将Spolt、Bolt组装成Topology

    public class WordCountTopology {
        public static void main(String[] args) throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
            //1、创建一个job(topology)
            TopologyBuilder topologyBuilder = new TopologyBuilder();
            //2、设置job的详细内容
            topologyBuilder.setSpout("ReadFileSpout",new ReadFileSpout(),1);
            topologyBuilder.setBolt("SentenceSplitBolt",new SentenceSplitBolt(),1).shuffleGrouping("ReadFileSpout");
            topologyBuilder.setBolt("WordCountBolt",new WordCountBolt(),1).shuffleGrouping("SentenceSplitBolt");
            //准备配置项
            Config config = new Config();
            config.setDebug(false);
            //3、提交job
            //提交由两种方式:一种本地运行模式、一种集群运行模式。
            if (args != null && args.length > 0) {
                //运行集群模式
                config.setNumWorkers(1);
                StormSubmitter.submitTopology(args[0],config,topologyBuilder.createTopology());
            } else {
                LocalCluster localCluster = new LocalCluster();
                localCluster.submitTopology("wordcount", config, topologyBuilder.createTopology());
            }
        }
    }
    

    在本地测试直接run这个Topology就可以看到结果了


    本地结果

    3.提交到集群测试

    storm jar storm-1.0-SNAPSHOT-jar-with-dependencies.jar me.jinkun.storm.wc.WordCountTopology wordcount
    
    集群中的wordcount

    参考

    https://www.jianshu.com/p/f645eb7944b0

    相关文章

      网友评论

          本文标题:Strom之WordCount

          本文链接:https://www.haomeiwen.com/subject/gxjicftx.html