前言
上一篇文章我们通过 Storm 的本地模式对其编程模型进行了讲述....
本篇文章我们来讲一讲 Storm 的集群:
- Storm 的特点
- Storm 的架构和组件
- 如何安装 Storm 集群
- 如何提交作业到 Storm 集群
Storm 的特点
-
分布式:这个没什么好说的,对于大数据来说,单台机器肯定是无法满足需求的
-
高容错,高可靠:其容错下了很大的功夫,这个我们以后再细说。
-
高性能:性能一直为大家所关注的一个点,storm其性能主要体现在以下几点:
- 流式处理而非微批处理,实时性更好(也是可以做微批的,不过一般不会这么做)
- 纯内存,不走磁盘
- Topology 的设计也是其性能的一个保障
-
高扩展:水平扩展能力好,可以动态调整资源,而不需重启任务
-
自带 UI,可维护性好
-
DRPC可以作为实时响应服务
-
使用广泛,社区活跃
Storm 架构
Storm 架构图.png从上图我们大概可以看到 Storm 的基本架构包括:(以下文本来源于此处)
-
Nimbus:
- 主节点,本身无状态
- 接收客户端任务Topology的提交,并负责在集群中分发代码,即Jar包
- 分配工作给从节点supervisor,注意不是直接分配,而是将任务发布到zookeeper上,由supervisor到zookeeper上领取任务(在Zookeeper相应的znode节点上写入任务分配信息,由supervisor查看这些znode上的任务分配信息,获取分配到的任务)
- 监控: 监听集群状态(从Zookeeper集群中相应znode上读取supervisor,worker进程的状态信息数据)
- 容错:当supervisor节点挂掉,由Nimbus将该节点上正在运行着的任务重新分配给其他supervisor执行
-
Supervisor
- 从节点
- 从Zookeeper上获取Nimbus分配的任务,负责启动和停止本机上worker进程来执行任务,worker的容错由Supervisor进程负责
- 需要定时将自己的运行状态信息(心跳信息)汇报到zookeeper上,由Nimbus监控(在Zookeeper相应的znode节点上写入心跳信息)
-
Worker
- 真正的执行任务的进程,负责启动executor线程来执行任务
- worker进程并不是常驻进程,不能通过手动启动
- 负责与其他worker之间进行数据传输
需要将自己的运行状态汇报到zookeeper上,由Nimbus监控
-
Executor
- 真正执行任务的线程(负责执行客户提交到Storm集群上任务中Task(spout/bolt)),由worker启动和停止
-
Zookeeper
- 存储任务调度信息、各节点状态信息、心跳
使Storm集群各节点保持无状态,这样具有高可靠性 -
这里也是找到一张Storm在zookeeper上的目录树图,可以看一下
zookeeper上的目录树图.png
- 存储任务调度信息、各节点状态信息、心跳
- UI (启动后进程名为 core)
- storm web监控页面
- 在storm.yaml配置文件中通过配置ui.port参数指定web ui访问端口
- UI需要与Nimbus运行在同一台服务器上(这一点没确认过,一般是这样配置的)
从架构图我们可以知道,Storm 的 主节点 Nimbus 和 从节点Supervisor 不会直接进行通讯,而是依赖于 zookeeper 这个组件,这也是Storm的一个设计巧妙之处,很好的避免了 主从之间依赖而产生的系列问题
安装 Storm 集群
安装什么的比较简单,这里就不再造轮子了,
可以看下这篇
这里有张部署完之后,各个节点本地的目录树图
提交作业到 Storm 集群
- 这里我们将上一篇讲到的代码,稍微改一改就可以了,当传入的参数大于1,我们将第一个参数作为 该作业的 名称 提交到集群去运行,否则就是本地测试模式
public static void main(String[] args) {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("textSpout", new MySpout(), 3);
builder.setBolt("MyBolt", new MyBolt(), 3).shuffleGrouping("textSpout");
Config config = new Config();
StormTopology topology = builder.createTopology();
if (args.length > 0) {
try {
StormSubmitter.submitTopology(args[0], config, topology);
} catch (AlreadyAliveException e) {
e.printStackTrace();
} catch (InvalidTopologyException e) {
e.printStackTrace();
} catch (AuthorizationException e) {
e.printStackTrace();
}
} else {
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("test", config, topology);
}
}
-
打成 jar 包
这个就不要我说了吧... -
命令行提交
到 Storm 集群的任意一台机器,使用 Storm 指令进行提交:
storm jar xxx.jar com.package,name topologyName
如果一切没有问题,就可以正常执行了
网友评论