git地址:https://gitee.com/wangxinqiao/droolDemo.git
单词统计是storm经典案例
设计一个topology,来实现对文档里面的单词出现的频率进行统计。
整个topology分为三个部分:
- RandomSentenceSpout(对应下文的 MySpout):数据源。原始设计是从已知的英文句子中随机选取一条发送出去;本示例为了简化,每次都发送同一条固定的句子。
- SplitSentenceBolt(对应下文的 MySplitBolt):负责将单行文本记录(句子)按空格切分成单词。
- WordCountBolt(对应下文的 MyCountBolt):负责对每个单词出现的次数进行累加。
pom.xml
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>0.9.5</version>
<!--本地调试时需要把下面的 scope 注释掉;部署到服务器上运行时需要取消注释(放开)-->
<!--<scope>provided</scope>-->
</dependency>
</dependencies>
MySpout
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import java.util.Map;
public class MySpout extends BaseRichSpout {
SpoutOutputCollector collector;
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
}
public void nextTuple() {
collector.emit(new Values("i am lilei love hanmeimei"));
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("love"));
}
}
MySplitBolt
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.Map;
public class MySplitBolt extends BaseRichBolt {
OutputCollector collector;
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
public void execute(Tuple input) {
String line = input.getString(0);
String[] arrWords = line.split(" ");
for (String word:arrWords){
collector.emit(new Values(word,1));
}
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word","num"));
}
}
MyCountBolt
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import java.util.HashMap;
import java.util.Map;
public class MyCountBolt extends BaseRichBolt {
OutputCollector collector;
Map<String, Integer> map = new HashMap<String, Integer>();
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
public void execute(Tuple input) {
String word = input.getString(0);
Integer num = input.getInteger(1);
System.out.println(Thread.currentThread().getId() + " word:"+word);
if (map.containsKey(word)){
Integer count = map.get(word);
map.put(word,count + num);
}else {
map.put(word,num);
}
System.out.println("count:"+map);
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
}
WordCountTopologMain
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
/**
 * Entry point that wires the word-count topology and runs it on an in-process
 * {@link LocalCluster} for local debugging.
 *
 * <p>Wiring: {@code mySpout} (parallelism hint 2) feeds {@code mybolt1} (hint 2) via
 * shuffle grouping, which feeds {@code mybolt2} (hint 4) via fields grouping on
 * {@code "word"}.
 */
public class WordCountTopologMain {
    /**
     * Builds the topology and submits it under the name {@code "mywordcount"}.
     *
     * @param args command-line arguments (unused)
     * @throws AlreadyAliveException    declared for topology submission
     * @throws InvalidTopologyException declared for topology submission
     */
    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
        Config conf = new Config();
        conf.setNumWorkers(2);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("mySpout", new MySpout(), 2);
        builder.setBolt("mybolt1", new MySplitBolt(), 2)
                .shuffleGrouping("mySpout");
        // The split bolt produces the words: i, am, lilei, love, hanmeimei.
        // Grouping on the "word" field is used here; the alternative of
        // shuffleGrouping("mybolt1") would spread tuples without regard to the word.
        builder.setBolt("mybolt2", new MyCountBolt(), 4)
                .fieldsGrouping("mybolt1", new Fields("word"));

        // Local debugging only. To run on a real cluster, submit the same topology
        // through StormSubmitter.submitTopology("mywordcount", ...) instead.
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("mywordcount", conf, builder.createTopology());
    }
}
执行main结果
count:{love=268027}
95 word:love
count:{love=268028}
95 word:love
count:{love=268029}
count:{am=265528}
97 word:am
网友评论