美文网首页初见
风控系统五-storm单词统计案例

风控系统五-storm单词统计案例

作者: 源码互助空间站 | 来源:发表于2020-05-14 14:40 被阅读0次

    git地址:https://gitee.com/wangxinqiao/droolDemo.git
    单词统计是storm经典案例
    设计一个topology,来实现对文档里面的单词出现的频率进行统计。
    整个topology分为三个部分:

    • MySpout(对应经典案例中的 RandomSentenceSpout):数据源。本例做了简化,每次固定发送同一条英文句子 "i am lilei love hanmeimei",而不是在已知句子中随机选取。
    • MySplitBolt(对应 SplitSentenceBolt):负责将单行文本记录(句子)切分成单词
    • MyCountBolt(对应 WordCountBolt):负责对单词的频率进行累加

    pom.xml

    <dependencies>
            <dependency>
                <groupId>org.apache.storm</groupId>
                <artifactId>storm-core</artifactId>
                <version>0.9.5</version>
                <!--本地调试时需要注释掉下面的 scope;在服务器上启动时需要放开-->
                <!--<scope>provided</scope>-->
            </dependency>
        </dependencies>
    

    MySpout

    import backtype.storm.spout.SpoutOutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichSpout;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Values;
    
    import java.util.Map;
    
    /**
     * Demo data source for the word-count topology.
     *
     * <p>Every call to {@link #nextTuple()} emits the same hard-coded sentence as a
     * one-field tuple. No message id is passed to {@code emit}.
     */
    public class MySpout extends BaseRichSpout {
        /** The single sentence this spout emits on every call. */
        private static final String SENTENCE = "i am lilei love hanmeimei";

        /**
         * Name of the only output field. NOTE(review): the field carries the whole
         * sentence, so "love" looks like a placeholder name; downstream code in this
         * demo reads the value by position, not by name.
         */
        private static final String OUTPUT_FIELD = "love";

        /** Collector handed over in {@link #open}; used to emit tuples. */
        SpoutOutputCollector collector;

        /**
         * Declares a single output field for this spout's stream.
         *
         * @param declarer receives the output schema
         */
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            Fields schema = new Fields(OUTPUT_FIELD);
            declarer.declare(schema);
        }

        /**
         * Stores the collector for later use by {@link #nextTuple()}.
         *
         * @param conf      topology configuration (unused)
         * @param context   task context (unused)
         * @param collector collector used to emit tuples
         */
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
        }

        /** Emits the fixed sentence as a single-value tuple. */
        public void nextTuple() {
            Values sentenceTuple = new Values(SENTENCE);
            collector.emit(sentenceTuple);
        }
    }
    

    MySplitBolt

    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;
    
    import java.util.Map;
    
    /**
     * Splits each incoming sentence into words.
     *
     * <p>Reads the sentence from position 0 of the input tuple, splits it on single
     * spaces and emits one {@code ("word", "num")} tuple per non-empty word, with
     * {@code num} fixed at 1. Every input tuple is acked after it has been handled.
     */
    public class MySplitBolt extends BaseRichBolt {
        /** Collector handed over in {@link #prepare}; used to emit and ack. */
        OutputCollector collector;

        /**
         * Stores the collector for later use by {@link #execute(Tuple)}.
         *
         * @param stormConf topology configuration (unused)
         * @param context   task context (unused)
         * @param collector collector used to emit and ack tuples
         */
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        /**
         * Emits one (word, 1) tuple for each non-empty space-separated token of the
         * input sentence, then acks the input.
         *
         * @param input tuple whose first value is the sentence to split
         */
        public void execute(Tuple input) {
            String line = input.getString(0);
            // A null sentence has nothing to split; it is still acked below.
            if (line != null) {
                for (String word : line.split(" ")) {
                    // Consecutive spaces produce empty tokens; they are not words.
                    if (word.isEmpty()) {
                        continue;
                    }
                    // Anchor the output to the input so the word tuples belong to the
                    // input's tuple tree when the spout emits with message ids.
                    collector.emit(input, new Values(word, 1));
                }
            }
            // BaseRichBolt does not ack automatically; ack explicitly so the input is
            // never left pending.
            collector.ack(input);
        }

        /**
         * Declares the two output fields: the word and its count increment.
         *
         * @param declarer receives the output schema
         */
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word","num"));
        }
    }
    

    MyCountBolt

    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Tuple;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * Accumulates per-word counts.
     *
     * <p>Reads the word (position 0) and its increment (position 1) from each input
     * tuple, adds the increment to a running total kept in an in-memory
     * {@link HashMap} owned by this bolt instance, and prints the thread id, the word
     * and the whole map to standard output. Emits nothing downstream.
     */
    public class MyCountBolt extends BaseRichBolt {
        /** Collector handed over in {@link #prepare}; used to ack inputs. */
        OutputCollector collector;
        /** Running totals seen by this bolt instance, keyed by word. */
        Map<String, Integer> map = new HashMap<String, Integer>();

        /**
         * Stores the collector for later use by {@link #execute(Tuple)}.
         *
         * @param stormConf topology configuration (unused)
         * @param context   task context (unused)
         * @param collector collector used to ack tuples
         */
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        /**
         * Adds the tuple's increment to the word's running total, logs the state and
         * acks the input.
         *
         * @param input tuple carrying the word at position 0 and its increment at position 1
         */
        public void execute(Tuple input) {
            String word = input.getString(0);
            Integer num = input.getInteger(1);
            System.out.println(Thread.currentThread().getId() + "    word:"+word);
            // Single lookup: a missing (or null) total means this is the first count.
            Integer previous = map.get(word);
            if (previous == null) {
                map.put(word, num);
            } else {
                map.put(word, previous + num);
            }
            System.out.println("count:"+map);
            // BaseRichBolt does not ack automatically; ack explicitly so the input is
            // never left pending.
            collector.ack(input);
        }

        /**
         * Declares no output fields; this bolt is the end of the stream.
         *
         * @param declarer unused
         */
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
        }
    }
    

    WordCountTopologMain

    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.generated.AlreadyAliveException;
    import backtype.storm.generated.InvalidTopologyException;
    import backtype.storm.topology.TopologyBuilder;
    import backtype.storm.tuple.Fields;
    
    /**
     * Entry point of the word-count demo: wires the topology and submits it to an
     * in-process {@link LocalCluster}.
     *
     * <p>Wiring (parallelism hint in parentheses):
     * {@code mySpout (2) -> shuffle -> mybolt1 (2) -> fields("word") -> mybolt2 (4)}.
     */
    public class WordCountTopologMain {
        private static final String SPOUT_ID = "mySpout";
        private static final String SPLIT_BOLT_ID = "mybolt1";
        private static final String COUNT_BOLT_ID = "mybolt2";
        private static final String TOPOLOGY_NAME = "mywordcount";

        /**
         * Builds the topology, configures two workers and submits it locally.
         *
         * @param args command-line arguments (unused)
         * @throws AlreadyAliveException    declared by the original entry point
         * @throws InvalidTopologyException declared by the original entry point
         */
        public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
            TopologyBuilder wiring = buildWiring();

            Config settings = new Config();
            settings.setNumWorkers(2);

            // Local debugging only. The original author kept
            // StormSubmitter.submitTopology(...) as the commented-out alternative for
            // a real cluster.
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(TOPOLOGY_NAME, settings, wiring.createTopology());
        }

        /** Registers the spout and the two bolts together with their stream groupings. */
        private static TopologyBuilder buildWiring() {
            TopologyBuilder wiring = new TopologyBuilder();

            wiring.setSpout(SPOUT_ID, new MySpout(), 2);

            // Sentences are distributed to the splitter tasks with shuffle grouping.
            wiring.setBolt(SPLIT_BOLT_ID, new MySplitBolt(), 2)
                    .shuffleGrouping(SPOUT_ID);

            // Words ("i", "am", "lilei", "love", "hanmeimei") are grouped by the "word"
            // field, which per Storm's fields-grouping semantics keeps each word on one
            // counter task. The original author's commented-out alternative used
            // shuffleGrouping here.
            wiring.setBolt(COUNT_BOLT_ID, new MyCountBolt(), 4)
                    .fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));

            return wiring;
        }
    }
    

    执行main结果

    count:{love=268027}
    95    word:love
    count:{love=268028}
    95    word:love
    count:{love=268029}
    count:{am=265528}
    97    word:am
    

    相关文章

      网友评论

        本文标题:风控系统五-storm单词统计案例

        本文链接:https://www.haomeiwen.com/subject/eaetohtx.html