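JStorm is Alibaba's rewrite of Storm in Java and can be used for stream processing. Here we use JStorm to read data from Kafka and then count how often each word occurs.
1. Writing the word count
The JStorm website describes each JStorm component in reasonable detail. To integrate with Kafka, only the spout needs to be replaced.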
[Figure: processing flow]
When writing the code, we first add the JStorm jar and Storm's Kafka package (storm-kafka) as dependencies:
<dependency>
    <groupId>com.alibaba.jstorm</groupId>
    <artifactId>jstorm-core</artifactId>
    <version>${jstorm.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-nop</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-jdk14</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>jcl-over-slf4j</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka</artifactId>
    <version>0.9.6</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.9.0.1</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
        </exclusion>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>
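Before looking at the individual components, the overall flow (source, split, count) can be sketched in plain Java without any JStorm or Kafka classes. This is only an illustration of why swapping the source is enough: the class `PipelineSketch` and its `run` method are hypothetical names, and an `Iterator<String>` stands in for the spout.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, framework-free model of the word count flow.
final class PipelineSketch {
    // The source plays the role of the spout; any Iterator<String> can be plugged in.
    static Map<String, Integer> run(Iterator<String> source) {
        Map<String, Integer> counts = new TreeMap<String, Integer>();
        while (source.hasNext()) {
            // "Split" step: break the sentence on whitespace.
            for (String word : source.next().trim().split("\\s+")) {
                if (word.isEmpty()) {
                    continue; // a blank sentence yields one empty token
                }
                // "Count" step: increment the counter for this word.
                Integer previous = counts.get(word);
                counts.put(word, previous == null ? 1 : previous + 1);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> sentences = Arrays.asList("hello jstorm", "hello kafka");
        System.out.println(run(sentences.iterator())); // {hello=2, jstorm=1, kafka=1}
    }
}
```

In the real topology the iterator is replaced by the Kafka spout, and the two steps run as separate bolts.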
1.1 Spout
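The Kafka spout needs the ZooKeeper address used by Kafka, the broker path in ZooKeeper, and the topic name. Put these into a SpoutConfig and then create the KafkaSpout.
private static KafkaSpout kafkaSpout() {
    // ZooKeeper connect string used by Kafka ("host:port", comma-separated for several servers)
    String brokerZkStr = "106.12.196.74:2181";
    // ZooKeeper path under which Kafka registers its brokers
    String brokerZkPath = "/brokers";
    String topic = "jianshu-topic-new";
    // ZooKeeper root path under which the spout stores its consumed offsets
    String zkRoot = "";
    // The id can be any name
    String id = "jstormspout";
    ZkHosts zk = new ZkHosts(brokerZkStr, brokerZkPath);
    SpoutConfig spoutConf = new SpoutConfig(zk, topic, zkRoot, id);
    // Collect the host names (without ports) of the ZooKeeper servers
    List<String> zkServices = new ArrayList<String>();
    for (String str : zk.brokerZkStr.split(",")) {
        zkServices.add(str.split(":")[0]);
    }
    // ZooKeeper servers and port where the spout stores its offsets
    spoutConf.zkServers = zkServices;
    spoutConf.zkPort = 2181;
    // Ignore any offset stored in ZooKeeper when the topology starts
    spoutConf.forceFromStart = true;
    spoutConf.socketTimeoutMs = 60 * 1000;
    // Deserialize each Kafka message as a single string field
    spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
    return new KafkaSpout(spoutConf);
}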
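The spout configuration splits a connect string of the form "host:port,host:port" into host names and a port. A minimal standalone sketch of that parsing is shown here; `ZkConnectString` is a hypothetical helper, not part of storm-kafka, and it assumes all ZooKeeper servers listen on the same port.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of storm-kafka: splits a ZooKeeper connect
// string such as "host1:2181,host2:2181" into the host names and the port.
final class ZkConnectString {
    final List<String> hosts = new ArrayList<String>();
    final int port;

    ZkConnectString(String connect, int defaultPort) {
        int parsedPort = defaultPort;
        for (String entry : connect.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate stray commas
            }
            String[] hostAndPort = trimmed.split(":");
            hosts.add(hostAndPort[0]);
            if (hostAndPort.length > 1) {
                // Assumption: every server uses the same port, so the last one seen wins.
                parsedPort = Integer.parseInt(hostAndPort[1]);
            }
        }
        this.port = parsedPort;
    }

    public static void main(String[] args) {
        ZkConnectString zk = new ZkConnectString("106.12.196.74:2181", 2181);
        System.out.println(zk.hosts + " port " + zk.port);
    }
}
```

Taking the port from the connect string avoids having to keep a separate hard-coded port in sync with the address.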
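1.2 Split bolt
The splitting logic is simple: split the sentence on whitespace, then emit each word separately.
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class SplitBolt extends BaseRichBolt {
    OutputCollector outputCollector;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.outputCollector = outputCollector;
    }

    @Override
    public void execute(Tuple tuple) {
        // The Kafka spout emits the message as the first (and only) field
        String sentence = tuple.getString(0);
        for (String word : sentence.split("\\s+")) {
            if (word.isEmpty()) {
                continue; // leading whitespace produces an empty first token
            }
            // Anchor the new tuple to the input so a failure can be replayed
            outputCollector.emit(tuple, new Values(word));
        }
        // BaseRichBolt does not ack automatically
        outputCollector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word"));
    }
}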
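The splitting step can be tried on its own, outside a topology. The sketch below uses a hypothetical class `SentenceSplitter` to show one pitfall of `String.split`: a sentence that starts with whitespace produces an empty first token unless it is trimmed or filtered.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical standalone version of the splitting logic.
final class SentenceSplitter {
    static List<String> split(String sentence) {
        List<String> words = new ArrayList<String>();
        if (sentence == null) {
            return words;
        }
        // trim() removes leading and trailing whitespace before splitting
        for (String token : sentence.trim().split("\\s+")) {
            if (!token.isEmpty()) { // an empty sentence still yields one empty token
                words.add(token);
            }
        }
        return words;
    }

    public static void main(String[] args) {
        // Without trimming, the leading space yields an empty first element.
        System.out.println(" hello jstorm".split("\\s+").length); // 3
        System.out.println(split("  hello   jstorm ")); // [hello, jstorm]
    }
}
```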
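1.3 Count bolt
A map keeps track of how many times each word has occurred.
import java.util.HashMap;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class TotalCount extends BaseRichBolt {
    OutputCollector outputCollector;
    // Word counts held by this task; execute() is called by a single thread per task
    Map<String, Integer> counts;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.outputCollector = outputCollector;
        this.counts = new HashMap<String, Integer>();
    }

    @Override
    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("word");
        Integer previous = counts.get(word);
        int count = (previous == null) ? 1 : previous + 1;
        counts.put(word, count);
        System.out.println(String.format("[%s] is %d", word, count));
        // Emit the running total so the declared output fields are actually used
        outputCollector.emit(tuple, new Values(word, count));
        // BaseRichBolt does not ack automatically
        outputCollector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word", "count"));
    }
}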
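The counting logic itself does not depend on JStorm and can be checked in isolation. The following is a minimal sketch with a hypothetical class `WordCounter`; it keeps the counts in memory only, so, like the bolt, it loses them when the process restarts.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical standalone version of the per-task word counter.
final class WordCounter {
    private final Map<String, Integer> counts = new HashMap<String, Integer>();

    // Increments the counter for the word and returns the new total.
    int add(String word) {
        Integer previous = counts.get(word);
        int updated = (previous == null) ? 1 : previous + 1;
        counts.put(word, updated);
        return updated;
    }

    // Returns the current total, or 0 for a word that has not been seen.
    int get(String word) {
        Integer current = counts.get(word);
        return current == null ? 0 : current;
    }

    public static void main(String[] args) {
        WordCounter counter = new WordCounter();
        for (String word : new String[] {"hello", "jstorm", "hello"}) {
            System.out.println(String.format("[%s] is %d", word, counter.add(word)));
        }
    }
}
```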
1.4 Building the topology
TopologyBuilder builder = new TopologyBuilder();
// Spout; spoutParal and boltParal are parallelism values assumed to be defined elsewhere
SpoutDeclarer spout = builder.setSpout(KafkaTopology.SEQUENCE_SPOUT_NAME,
        kafkaSpout(), spoutParal);
// Split bolt, using shuffleGrouping
builder.setBolt(SPLIT_BOLT_NAME, new SplitBolt()).shuffleGrouping(SEQUENCE_SPOUT_NAME);
// Count bolt, using fieldsGrouping: tuples with the same word are sent to the same bolt task
BoltDeclarer totalBolt = builder.setBolt(KafkaTopology.TOTAL_BOLT_NAME, new TotalCount(),
        boltParal).fieldsGrouping(KafkaTopology.SPLIT_BOLT_NAME, new Fields("word"));
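Fields grouping matters for the count bolt because each task keeps its own map: if the same word could land on different tasks, its total would be spread across several maps. The sketch below is a simplified model, not JStorm's actual routing code. It assumes the target task is chosen as the hash of the word modulo the number of tasks, and the class `FieldsGroupingSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of fields grouping. Assumption: the target task is
// hash(word) mod numTasks; the framework's real routing may differ in detail.
final class FieldsGroupingSketch {
    static int taskFor(String word, int numTasks) {
        // floorMod keeps the result non-negative even for negative hash codes
        return Math.floorMod(word.hashCode(), numTasks);
    }

    // Routes every word to one task and lets that task keep its own counts.
    static List<Map<String, Integer>> route(String[] words, int numTasks) {
        List<Map<String, Integer>> tasks = new ArrayList<Map<String, Integer>>();
        for (int i = 0; i < numTasks; i++) {
            tasks.add(new HashMap<String, Integer>());
        }
        for (String word : words) {
            Map<String, Integer> counts = tasks.get(taskFor(word, numTasks));
            Integer previous = counts.get(word);
            counts.put(word, previous == null ? 1 : previous + 1);
        }
        return tasks;
    }

    public static void main(String[] args) {
        String[] words = {"hello", "jstorm", "hello", "kafka", "hello"};
        List<Map<String, Integer>> tasks = route(words, 3);
        for (int i = 0; i < tasks.size(); i++) {
            System.out.println("task " + i + " -> " + tasks.get(i));
        }
    }
}
```

Because the task is a pure function of the word, all three occurrences of "hello" end up in the same map. With shuffle grouping the split bolt has no such requirement, since it keeps no state between tuples.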