背景
前面一章节介绍了如何使用rocketmq来归集数据信息,下面将介绍如何使用storm从rocketmq中获取日志信息,并将消息转换为流信息进行统计分析。
storm简介
Storm集群和Hadoop集群表面上看很类似。但是Hadoop上运行的是MapReduce jobs,而在Storm上运行的是拓扑(topology),这两者之间是非常不一样的。一个关键的区别是: 一个MapReduce job最终会结束, 而一个topology永远会运行(除非你手动kill掉)。
在Storm的集群里面有两种节点: 控制节点(master node)和工作节点(worker node)。控制节点上面运行一个叫Nimbus后台程序,它的作用类似Hadoop里面的JobTracker。Nimbus负责在集群里面分发代码,分配计算任务给机器,并且监控状态。
每一个工作节点上面运行一个叫做Supervisor的节点。Supervisor会监听分配给它那台机器的工作,根据需要启动/关闭工作进程worker。每一个工作进程执行一个topology的一个子集;一个运行的topology由运行在很多机器上的很多工作进程worker组成。
storm-run-map.png
Nimbus和Supervisor之间的所有协调工作都是通过Zookeeper集群完成。另外,Nimbus进程和Supervisor进程都是快速失败(fail-fast)和无状态的。所有的状态要么在zookeeper里面, 要么在本地磁盘上。这也就意味着你可以用kill -9来杀死Nimbus和Supervisor进程, 然后再重启它们,就好像什么都没有发生过。这个设计使得Storm异常的稳定。
Topology
计算任务Topology是由不同的Spouts和Bolts,通过数据流(Stream)连接起来的图。下面是一个Topology的结构示意图:
storm-base.png
其中包含有:
Spout
Storm中的消息源,用于为Topology生产消息(数据),一般是从外部数据源(如Message Queue、RDBMS、NoSQL、Realtime Log)不间断地读取数据并发送给Topology消息(tuple元组)。Spout可以是可靠的,也可以是不可靠的。如果这个tuple没有被Storm完全处理,可靠的消息源可以重新发射一个tuple,但是不可靠的消息源一旦发出一个tuple就不能重发了。(可靠性会在下面介绍)
Spout类里面最重要的方法是nextTuple。要么发射一个新的tuple到topology里面或者简单的返回(如果已经没有新的tuple)。要注意的是nextTuple方法不能阻塞,因为storm在同一个线程上面调用所有消息源spout的方法。
另外两个比较重要的spout方法是ack和fail。storm在检测到一个tuple被整个topology成功处理的时候调用ack,否则调用fail。storm只对可靠的spout调用ack和fail。
Bolt
Storm中的消息处理者,用于为Topology进行消息的处理,Bolt可以执行过滤, 聚合, 查询数据库等操作,而且可以一级一级的进行处理。
storm安装及开发
将下载的包解压即可。
storm开发
开发环境搭建。
创建一个常规的java工程或maven工程就可以,并引入storm包的依赖。
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>${storm.version}</version>
<scope>provided</scope>
</dependency>
spout开发
由于我们的日志归集组件使用的RocketMq,所以我们需要使用RocketMq开发spout类,在storm运行环境中运行RocketMq的消费端,获取指定主题的消息队列数据。由于storm发行发布的稳定版默认没有集成RocketMq组件,所以需要将RocketMq集成到storm中,使用RocketMq定义的spout来获取rocketmq的中消息信息。
由于storm最新开发的主干上已经集成了RocketMq,但中因为与1.2.2版本的核心接口不一致。所以需要将工程源码下载下来进行修改一下。
修改步骤为:
1.导入\external\storm-rocketmq工程
2.修改工程的pom.xml文件,将storm-core的依赖版本修改为1.2.2
3.修复编译异常,主是是将接口中Map<String,Object>修改为Map。
4.为了能在Spout中输出的Tuple中获取RocketMq的tags和keys属性信息,将org.apache.storm.rocketmq.RocketMqUtils.java进行了重构。
/**
* Generate Storm tuple values by Message and Scheme.
* @param msg RocketMQ Message
* @param scheme Scheme for deserializing
* @return tuple values
*/
public static List<Object> generateTuples(Message msg, Scheme scheme) {
List<Object> tup;
String rawKey = msg.getKeys();
ByteBuffer body = ByteBuffer.wrap(msg.getBody());
if (rawKey != null && scheme instanceof KeyValueScheme) {
ByteBuffer key = ByteBuffer.wrap(rawKey.getBytes(StandardCharsets.UTF_8));
tup = ((KeyValueScheme)scheme).deserializeKeyAndValue(key, body);
//ADD BEGIN
} if (scheme instanceof MessageScheme) {
tup = ((MessageScheme)scheme).deserializeValue(msg);
//ADD END
} else {
tup = scheme.deserialize(body);
}
return tup;
}
并添加一个接口MessageScheme:
package org.apache.storm.rocketmq.spout.scheme;
import java.util.List;
import org.apache.rocketmq.common.message.Message;
import org.apache.storm.spout.Scheme;
public interface MessageScheme extends Scheme {
List<Object> deserializeValue(Message msg);
}
和一个实现类 DefaultMessageScheme:
package org.apache.storm.rocketmq.spout.scheme;
import java.nio.ByteBuffer;
import java.util.List;
import org.apache.rocketmq.common.message.Message;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
public class DefaultMessageScheme extends StringScheme implements MessageScheme {
public static final String FIELD_TAGS = "Tags";
public static final String FIELD_TEYS = "Keys";
public static final String FIELD_BODY = "Body";
public static final Fields DEFAULT_FIELDS = new Fields("Tags", "Keys", "Body");
@Override
public List<Object> deserializeValue(Message msg) {
ByteBuffer body = ByteBuffer.wrap(msg.getBody());
String bodyStr = deserializeString(body);
return new Values(msg.getTags(), msg.getKeys(), bodyStr);
}
@Override
public Fields getOutputFields() {
return DEFAULT_FIELDS;
}
}
最终将工程编译的jar包放在storm的lib目录下,并且在工程中引入该组件包。
<dependency>
<!-- 由storm最新主干的\external\storm-rocketmq工程改造而来 -->
<groupId>com.going.saas</groupId>
<artifactId>going-storm-rocketmq</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.alibaba.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>3.6.2.Final</version>
</dependency>
到这一步就可以直接使用RocketMqSpout了,首先创建一个Properties类,设置创建RocketMq消息端所需要的参数。
Properties properties = new Properties();
properties.setProperty(SpoutConfig.NAME_SERVER_ADDR, "10.209.8.126:9876");
properties.setProperty(SpoutConfig.CONSUMER_GROUP, "RESOURCE_INFOS_CONSUMER_GRP");
properties.setProperty(SpoutConfig.CONSUMER_TOPIC, "RESOURCES_INFO_TOPIC");
properties.setProperty(SpoutConfig.SCHEME,SpoutConfig.MESSAGE_SCHEME);
再根据参数创建RocketMqSpout对象即可。
new RocketMqSpout(properties)
Bolt开发
该例子中开发一个Bolt,处理从Spout中获取转为Tuple对象的消息信息。将信息存在Redis中。
import org.apache.storm.redis.bolt.AbstractRedisBolt;
import org.apache.storm.redis.common.config.JedisPoolConfig;
import org.apache.storm.rocketmq.spout.scheme.DefaultMessageScheme;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;
import redis.clients.jedis.JedisCommands;
/**
* 获取终端发来的资源利用信息, 将新获取的资源信息替换旧的资源信息。
*
* @author Administrator
*
*/
public class ResourcesInfoSplitBolt extends AbstractRedisBolt {
private static final String REDIS_KEY_PREFIX = "RESOURCE_INFO_";
public ResourcesInfoSplitBolt(JedisPoolConfig config) {
super(config);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
@Override
protected void process(Tuple tuple) {
JedisCommands jedisCommands = null;
try {
String body = tuple.getStringByField(DefaultMessageScheme.FIELD_BODY);
String tags = tuple.getStringByField(DefaultMessageScheme.FIELD_TAGS).trim();
jedisCommands = getInstance();
jedisCommands.lpush(REDIS_KEY_PREFIX+tags, body);
} finally {
if (jedisCommands != null) {
returnInstance(jedisCommands);
}
this.collector.ack(tuple);
}
}
Topology开发
Spout和Bolt开发完成以后,就需要开发Topology将它们串连起来,具体示例代码如下:
import java.util.Properties;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.redis.common.config.JedisPoolConfig;
import org.apache.storm.rocketmq.SpoutConfig;
import org.apache.storm.rocketmq.spout.RocketMqSpout;
import org.apache.storm.topology.TopologyBuilder;
import com.going.storm.bolt.ResourcesInfoSplitBolt;
public class ResourceInfosTopology {
public static void main(String[] args) throws InterruptedException {
Properties properties = new Properties();
properties.setProperty(SpoutConfig.NAME_SERVER_ADDR, "10.209.8.126:9876");
properties.setProperty(SpoutConfig.CONSUMER_GROUP, "RESOURCE_INFOS_CONSUMER_GRP");
properties.setProperty(SpoutConfig.CONSUMER_TOPIC, "RESOURCES_INFO_TOPIC");
properties.setProperty(SpoutConfig.SCHEME,SpoutConfig.MESSAGE_SCHEME);
// RocketMqSpout spout = new RocketMqSpout(properties);
properties.setProperty("REDIS_HOST", "10.209.8.126");
properties.setProperty("REDIS_PORT", "6379");
// 定义拓扑
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("resouece-info-reader", new RocketMqSpout(properties));
JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()//
.setHost(properties.getProperty("REDIS_HOST"))//
.setPort(Integer.valueOf(properties.getProperty("REDIS_PORT")))//
.build();
builder.setBolt("word-normalizer", new ResourcesInfoSplitBolt(poolConfig)).shuffleGrouping("resouece-info-reader");
// builder.setBolt("word-counter", new WordCounter(),2).fieldsGrouping("word-normalizer", new Fields("word"));
// 配置
Config conf = new Config();
// conf.put("wordsFile", args[0]);
conf.setDebug(false);
// 运行拓扑
conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("Getting-Started-Resource-Infos-Topologie", conf, builder.createTopology());
Thread.sleep(1000000000);
cluster.shutdown();
}
}
storm部署运行
工程开发完成以后就需要将工程进行部署运行。我这里以docker为例对开发的功能进行部署。
这里基于storm官方的docker镜像进行部署。
我们基于storm的1.2.2版本的镜像(与本地开发的storm版本保持 一致就好),将工程依赖的第三方包加入到镜像中storm组件的extlib目录下。
FROM storm:1.2.2
ADD extlib/dubbo-2.5.8.jar extlib/dubbo-2.5.8.jar
ADD extlib/fastjson-1.2.29.jar extlib/fastjson-1.2.29.jar
ADD extlib/going-storm-rocketmq-0.0.1-SNAPSHOT.jar extlib/going-storm-rocketmq-0.0.1-SNAPSHOT.jar
ADD extlib/guava-13.0.1.jar extlib/guava-13.0.1.jar
ADD extlib/jackson-annotations-2.9.0.jar extlib/jackson-annotations-2.9.0.jar
ADD extlib/jackson-core-2.9.4.jar extlib/jackson-core-2.9.4.jar
ADD extlib/jackson-databind-2.9.4.jar extlib/jackson-databind-2.9.4.jar
ADD extlib/javassist-3.20.0-GA.jar extlib/javassist-3.20.0-GA.jar
ADD extlib/jedis-2.9.0.jar extlib/jedis-2.9.0.jar
ADD extlib/netty-3.2.5.Final.jar extlib/netty-3.2.5.Final.jar
ADD extlib/netty-all-4.0.42.Final.jar extlib/netty-all-4.0.42.Final.jar
ADD extlib/rocketmq-client-3.6.2.Final.jar extlib/rocketmq-client-3.6.2.Final.jar
ADD extlib/rocketmq-client-4.2.0.jar extlib/rocketmq-client-4.2.0.jar
ADD extlib/rocketmq-common-4.2.0.jar extlib/rocketmq-common-4.2.0.jar
ADD extlib/rocketmq-remoting-4.2.0.jar extlib/rocketmq-remoting-4.2.0.jar
ADD extlib/storm-redis-1.2.2.jar extlib/storm-redis-1.2.2.jar
然后通过生成好的镜像ab/storm生成实例
docker run -d --name storm-resources -it -v ${LOCAL_PATH}/going-storm-0.0.1-SNAPSHOT.jar:/topology.jar ab/storm storm jar /topology.jar com.going.storm.topology.ResourceInfosTopology
到这一步日志统计功能即开发完成了,对应的分析功能也可以通过Bolt来完成。
网友评论