storm自定义实现wordcount

作者: 心_的方向 | 来源:发表于2016-12-14 17:35 被阅读598次

    storm中的任务

    1. storm中的任务的结构是Topology(拓扑图),这个拓扑图是一个有向无环图(DAG),DAG能够清楚的表达链式的任务,每一个节点都是一个任务,边的方向代表着数据流的方向。如下图


      Paste_Image.png
    2. storm任务中数据流的数据结构是一个个tuple,tuple元组是任意数据结构类型的键值对组合。例如:(k1:v1, k2:v2, k3:v3, ····)
    3. Spout是数据采集器,从数据源采集数据,转成tuple发射到后面的bolt处理
    4. Bolt是数据处理器,可执行数据过滤,分析等操作。

    开发流程

    1. 设计Topology图


      Paste_Image.png
    2. 按照Topology图,创建maven项目后,依次写各个任务节点。首先写SentenceSpout节点。
    package strom.strom;
    import java.util.Map;
    import java.util.Random;
    import backtype.storm.spout.SpoutOutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichSpout;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Values;
    public class SentenceSpout extends BaseRichSpout {
    
        // tuple发射器
        private SpoutOutputCollector collector;
    
        private static final String[] SENTENCES = { "hadoop yarn mapreduce spark", "flume hadoop hive spark",
                "oozie yarn spark storm", "storm yarn mapreduce error", "error flume storm spark" };
    
        /*
         * 用于指定只针对本组件的一些特殊配置
         */
        @Override
        public Map<String, Object> getComponentConfiguration() {
            return null;
        }
    
        /*
         * spout组件的初始化方法 创建这个sentenceSpout组件实例时调用一次
         */
        @Override
        public void open(Map arg0, TopologyContext arg1, SpoutOutputCollector arg2) {
            // 用实例变量接收发射器
            this.collector = arg2;
        }
    
        /*
         * 声明向后面的组件发送tuple的key是什么
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer arg0) {
            arg0.declare(new Fields("sentence"));
        }
    
        /*
         * 1)指定tuple的value值,封装tuple后,并将其发射给后面的组件, 2) 会迭代式的循环调用这个方法
         */
        @Override
        public void nextTuple() {
            // 从数组中随意获取一个值
            String sentence = SENTENCES[new Random().nextInt(SENTENCES.length)];
            // 指定value值并封装为tuple后,把tuple发射给后面的组件
            this.collector.emit(new Values(sentence));
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    1. 写splitbolt组件
    package strom.strom;
    import java.util.Map;
    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.IRichBolt;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;
    
    
    public class SplitBolt implements IRichBolt {
    
        // bolt组件中的发射器
        private OutputCollector collector;
    
        @Override
        public void cleanup() {
    
        }
    
        /*
         * 设置key名称
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer arg0) {
            arg0.declare(new Fields("word"));
        }
    
        /*
         * 每次接受到前面组件发送的tuple调用一次 ,封装好tuple后发射
         */
        @Override
        public void execute(Tuple input) {
            // 获取key value对后,取出value值
            String values = input.getStringByField("sentence");
            if (values != null && !"".equals(values)) {
                // 按空格分割value
                String[] valuelist = values.split(" ");
                for (String value : valuelist) {
                    // 向后面的组件发射封装好的tuple
                    this.collector.emit(new Values(value));
                }
            }
        }
    
        /*
         * bolt组件初始化方法,只会调用一次
         */
        @Override
        public void prepare(Map arg0, TopologyContext arg1, OutputCollector arg2) {
            this.collector = arg2;
        }
    
        @Override
        public Map<String, Object> getComponentConfiguration() {
            return null;
        }
    
    }
    

    4.CountBolt组件实现计数逻辑

    package strom.strom;
    //
    
    import java.util.HashMap;
    import java.util.Map;
    
    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;
    
    public class CountBolt extends BaseRichBolt {
    
        // 发射器
        private OutputCollector collector;
        // 为了计数
        private Map<String, Integer> counts;
    
        @Override
        public void prepare(Map arg0, TopologyContext arg1, OutputCollector arg2) {
            this.collector = arg2;
            this.counts = new HashMap<String, Integer>();
        }
    
        /*
         * 声明key名称,可以同时声明多个
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer arg0) {
            arg0.declare(new Fields("word", "count"));
        }
    
        /*
         * 统计单词
         */
        @Override
        public void execute(Tuple input) {
            String word = input.getStringByField("word");
    
            int count = 1;
            // 如果这个单词已经存在,则取出count再加一
            if (counts.containsKey(word)) {
                count = counts.get(word) + 1;
            }
            counts.put(word, count);
            this.collector.emit(new Values(word, count));
        }
    }
    

    5 . PrintBolt组件

    package strom.strom;
    //
    
    import java.util.Map;
    
    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Tuple;
    
    public class PrintBolt extends BaseRichBolt {
    
        @Override
        public void prepare(Map arg0, TopologyContext arg1, OutputCollector arg2) {
    
        }
    
        /*
         * 打印到控制台
         */
        @Override
        public void execute(Tuple input) {
            String word = input.getStringByField("word");
            int count = input.getIntegerByField("count");
            System.out.println(word + "---->" + count);
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer arg0) {
    
        }
    
    }
    

    6 . WordCountTopology类用来连接这些组件

    package strom.strom;
    
    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.StormSubmitter;
    import backtype.storm.generated.AlreadyAliveException;
    import backtype.storm.generated.InvalidTopologyException;
    import backtype.storm.topology.TopologyBuilder;
    import backtype.storm.tuple.Fields;
    
    public class WordCountTopology {
    
        private static final String SPOUT_ID = "sentenceSpout";
        private static final String SPLIT_BOLT = "splitBolt";
        private static final String COUNT_BOLT = "countBolt";
        private static final String PRINT_BOLT = "printBolt";
    
        public static void main(String[] args) {
            // 构造Topology
            TopologyBuilder builder = new TopologyBuilder();
            // 指定spout
            builder.setSpout(SPOUT_ID, new SentenceSpout());
            // 指定bolt,并指定当有有多个bolt时,数据流发射的分组策略
            builder.setBolt(SPLIT_BOLT, new SplitBolt()).shuffleGrouping(SPOUT_ID);
            // 因为要保证正确的单词计数,同一个单词一定要划分到同一个CountBolt上,所以按照字段值分组
            builder.setBolt(COUNT_BOLT, new CountBolt()).fieldsGrouping(SPLIT_BOLT, new Fields("word"));
            // 全局分组,所有tuple发射到一个printbolt,一般是id最小的那一个
            builder.setBolt(PRINT_BOLT, new PrintBolt()).globalGrouping(COUNT_BOLT);
    
            Config conf = new Config();
    
            if (args == null || args.length == 0) {
                // 本地执行
                LocalCluster localCluster = new LocalCluster();
                localCluster.submitTopology("wordcount", conf, builder.createTopology());
            } else {
                // 提交到集群上执行
                // 指定使用多少个进程来执行该Topology
                conf.setNumWorkers(1);
                try {
                    StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
                } catch (AlreadyAliveException e) {
                    e.printStackTrace();
                } catch (InvalidTopologyException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    
    1. 本地执行测试


      Paste_Image.png
    2. 打成jar包后上传到storm集群测试


      Paste_Image.png

      下面的jar包包含着依赖的包,上面的jar包中没有包括,所以我们选择使用下面这个jar包。
      上传到集群上然后执行

    $ bin/storm jar storm-1.0-SNAPSHOT-jar-with-dependencies.jar strom.strom.WordCountTopology wordcount
    

    在UI中查看运行情况


    Paste_Image.png

    查看运行日志


    Paste_Image.png
    Paste_Image.png
    Paste_Image.png

    查看拓扑图


    Paste_Image.png
    Paste_Image.png
    Paste_Image.png

    相关文章

      网友评论

        本文标题:storm自定义实现wordcount

        本文链接:https://www.haomeiwen.com/subject/lakimttx.html