美文网首页
Storm从入门到精通8:Storm实现WordCount程序

Storm从入门到精通8:Storm实现WordCount程序

作者: 金字塔下的小蜗牛 | 来源:发表于2020-04-03 09:17 被阅读0次

    1.Storm流式数据处理流程

    Storm处理流式数据的一般架构如下图所示:

    image
    1. Flume用来获取数据。
    2. Kafka用来临时保存数据。
    3. Storm用来计算数据。
    4. Redis是个内存数据库,用来保存数据。

    2.Storm实现WordCount程序

    新建Java工程StormWordCount,将$STORM_HOME/lib下面的所有Jar包加到工程的buildpath中,环境就搭建好了。

    2.1创建Spout(WordCountSpout)组件采集数据,作为整个Topology的数据源

    package demo.wordcount;
    import java.util.Map;
    import java.util.Random;
    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;
    import org.apache.storm.utils.Utils;
    //采集数据spout组件
    public class WordCountSpout extends BaseRichSpout {
    // 模拟产生一些数据
    private String[] data = { "I love Beijing", "I love China", "Beijing is the capital of China" };
    // 定义spout的输出流
    private SpoutOutputCollector collector;
    @Override
    public void nextTuple() {
    // 每隔3秒钟采集一次数据
    Utils.sleep(3000);
    // 由storm框架调用,每次调用进行数据采集
    // 随机产生一个字符串
    int random = (new Random()).nextInt(3);
    // 打印
    System.out.println("采集的数据是:" + data[random]);
    // 将采集到的数据发送给下一个组件进行处理 
    this.collector.emit(new Values(data[random]));
    }
    @Override
    public void open(Map arg0, TopologyContext arg1, SpoutOutputCollector collector) {
    // 初始化spout组件时调用
    this.collector = collector;
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    // 声明输出的Tuple的格式
    declarer.declare(new Fields("sentence"));
    
     }
    

    2.2创建Bolt(WordCountSplitBolt)组件进行分词操作

    package demo.wordcount;
    import java.util.Map;
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    /**
     * Bolt that splits each incoming sentence into words and emits one
     * (word, 1) tuple per word for the downstream counting component.
     */
    public class WordCountSplitBolt extends BaseRichBolt {
        // Output stream of this bolt; assigned in prepare().
        private OutputCollector collector;

        /**
         * Processes one tuple sent by the previous component.
         *
         * @param tuple input tuple; its "sentence" field is read as a String
         */
        @Override
        public void execute(Tuple tuple) {
            String sentence = tuple.getStringByField("sentence");
            if (sentence == null) {
                // Nothing to split; avoid a NullPointerException on a missing value.
                return;
            }
            // Split on runs of whitespace. A plain split(" ") would produce
            // empty-string "words" for repeated or leading spaces, and those
            // would then be counted downstream as if they were real words.
            for (String word : sentence.trim().split("\\s+")) {
                if (word.isEmpty()) {
                    // Only happens when the sentence is empty or all whitespace.
                    continue;
                }
                // Emit the word in (word, 1) form to the next component.
                this.collector.emit(new Values(word, 1));
            }
        }

        /**
         * Called at initialization time.
         *
         * @param conf      topology configuration (unused here)
         * @param context   topology context (unused here)
         * @param collector output stream of this bolt component
         */
        @Override
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        /**
         * Declares the format of the emitted tuples: fields "word" and "count".
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }
    

    2.3创建Bolt(WordCountTotalBolt)组件进行单词计数操作

    package demo.wordcount;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    /**
     * Bolt that keeps a running total per word and emits the updated
     * (word, total) pair every time a (word, count) tuple arrives.
     */
    public class WordCountTotalBolt extends BaseRichBolt {
        // Output stream of this bolt; assigned in prepare().
        private OutputCollector collector;
        // Running totals, keyed by word.
        private Map<String, Integer> totals = new HashMap<>();

        /**
         * Adds the incoming count to the word's running total, prints the
         * whole table, and emits the word together with its new total.
         *
         * @param tuple input tuple with a String field "word" and an Integer field "count"
         */
        @Override
        public void execute(Tuple tuple) {
            // Read the data out of the tuple.
            String word = tuple.getStringByField("word");
            int increment = tuple.getIntegerByField("count");

            // A word that has not been seen yet starts from the incoming count.
            Integer previous = totals.get(word);
            int updated = (previous == null) ? increment : previous + increment;
            totals.put(word, updated);

            System.out.println("单词统计的结果:" + totals);

            // Emit the word with its updated total.
            this.collector.emit(new Values(word, totals.get(word)));
        }

        /**
         * Called at initialization time.
         *
         * @param conf      topology configuration (unused here)
         * @param context   topology context (unused here)
         * @param collector output stream of this bolt component
         */
        @Override
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        /**
         * Declares the format of the emitted tuples: fields "word" and "total".
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "total"));
        }
    }
    

    2.4创建主程序Topology(WordCountTopology)

    package demo.wordcount;
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.generated.StormTopology;
    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.tuple.Fields;
    /**
     * Main program: wires the spout and the two bolts into a topology and
     * submits it under the name "MyStormWordCount" to an in-process LocalCluster.
     */
    public class WordCountTopology {
        // Component ids used when wiring the topology.
        private static final String SPOUT_ID = "wordcount_spout";
        private static final String SPLIT_BOLT_ID = "wordcount_split_bolt";
        private static final String COUNT_BOLT_ID = "wordcount_count_bolt";

        /**
         * Builds the topology and submits it.
         *
         * @param args command-line arguments; only used by the commented-out
         *             cluster-mode call, which takes the topology name from args[0]
         * @throws Exception if building or submitting the topology fails
         */
        public static void main(String[] args) throws Exception {
            StormTopology topology = buildTopology();
            // Configuration parameters (defaults only).
            Config config = new Config();

            // Mode 1: local mode.
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("MyStormWordCount", config, topology);

            // Mode 2: cluster mode. Use this call instead of the two lines above.
            //StormSubmitter.submitTopology(args[0], config, topology);
        }

        /**
         * Creates the spout -> split bolt -> count bolt pipeline.
         *
         * @return the assembled topology
         */
        private static StormTopology buildTopology() {
            TopologyBuilder topologyBuilder = new TopologyBuilder();

            // Data source of the topology.
            topologyBuilder.setSpout(SPOUT_ID, new WordCountSpout());

            // First bolt: word splitting, fed from the spout with shuffle grouping.
            topologyBuilder
                    .setBolt(SPLIT_BOLT_ID, new WordCountSplitBolt())
                    .shuffleGrouping(SPOUT_ID);

            // Second bolt: counting, fed from the split bolt with fields grouping on "word".
            topologyBuilder
                    .setBolt(COUNT_BOLT_ID, new WordCountTotalBolt())
                    .fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));

            return topologyBuilder.createTopology();
        }
    }
    

    3.执行Storm程序

    3.1本地模式执行

    将上面的主程序修改为以方式1(本地模式)运行,结果如下:

    image

    3.2集群模式执行

    将上面的主程序修改为以方式2(集群模式)运行,打包成StormWordCount.jar,上传到Storm集群运行,结果如下:

    [root@master input]# ls StormWordCount.jar
    StormWordCount.jar
    [root@master input]# storm jar StormWordCount.jar demo.wordcount.WordCountTopology MyStormWordCount
    ***log***
    641 [main] INFO o.a.s.StormSubmitter - Finished submitting topology: MyStormWordCount

    image image image

    相关文章

      网友评论

          本文标题:Storm从入门到精通8:Storm实现WordCount程序

          本文链接:https://www.haomeiwen.com/subject/dhjkdhtx.html