Storm版本:我们使用0.10.2的版本。Storm团队在2016年4月份发布了历史性的版本升级,终于到了1.0.0的版本,不过我们仍然使用0.10.2的版本演示,这个版本是目前生产环境中使用的较多的版本。
Storm架构:
下图是Storm的官方逻辑架构图:
从架构图中可以看出,Storm由2类节点组成:一个master节点和多个slave节点。
master节点:
在master节点上运行一个叫做“Nimbus”的守护进程。Nimbus 进程负责代码jar等资源的分发,分配任务给slave节点,以及故障监测。
slave节点:
每个slave节点上运行一个叫做”Supervisor”的守护进程。Supervisor进程监听分配给它的机器,并根据Nimbus 的委派在必要时启动和关闭工作进程。每个工作进程执行topology的一个子集。一个运行中的topology由很多运行在很多机器上的工作进程组成。
那么master节点和slave节点上运行的进程是如何协调工作了,答案就是:ZooKeeper。
从Storm的架构图中我们也可以看到:
Nimbus和Supervisors之间所有的协调工作是通过Zookeeper集群完成的。 Nimbus的守护进程和Supervisors守护进程是无法互相直接连接(没有RPC通信)和无状态的;所有的状态维持在Zookeeper中或保存在本地磁盘上。这意味着你可以 kill -9 Nimbus 或Supervisors 进程,所以他们不需要做备份。这种设计使得Storm集群具有令人难以置信的稳定性。
Storm相关概念:
1.Topologies:
一个完整的实时应用程序的逻辑是封装在Storm拓扑中。Storm拓扑类似于MapReduce作业。一个关键的区别是MapReduce作业最终会完成,而拓扑永远运行(或者直到你杀死它为止)。拓扑结构是与流分组连接的喷嘴和螺栓的图表
2.Streams:
流是Storm的核心抽象。
流是以分布式方式并行处理和创建的元组的无界序列。
流被定义为用于命名流的元组中的字段的模式。
默认情况下,元组可以包含整数,长整型,短片,字节,字符串,双精度,浮点数,布尔值和字节数组。
您还可以定义自己的序列化程序,以便定制类型可以在元组内部本地使用。
声明时,每个流都会被赋予一个id。由于单流Spouts和Bolts是如此常见,因此OutputFieldsDeclarer具有方便的方法来声明单个流而不指定id。在这种情况下,流的默认ID为“default”。
3.Spouts:
Spouts是拓扑中的流的源。通常Spouts将从外部源读取元组,并将其发布到拓扑中(例如Kafka)。Spouts可以是可靠或不可靠的。一个可靠的Spout能够重播一个元组,如果它无法被Storm处理,而一个不可靠的Spout一旦发出就忘记了元组。
Spouts可以发射多个流。为了这样做,使用声明多个流declareStream的方法OutputFieldsDeclarer并指定流以发射,使用SpoutOutputCollector的emit方法。
Spouts的主要方法是nextTuple,用于生成Tuple。
Spouts的其他主要方法是ack和fail。当Storm检测到从Spouts发出的Tuple已经通过拓扑成功完成或未能完成时,将调用这些命令。只有可靠的数据源才需要调用ack或者fail
4.Bolts
拓扑中的所有处理都是通过Bolts进行的。Bolts可以完成很多操作,例如:filtering, functions, aggregations, joins, talking to databases等。
Bolts可以进行简单的流转换。复杂的流转换通常需要多个步骤,因此需要多个Bolts。
Bolts可以发射多个流。为了生成多个流,使用声明多个流OutputFieldsDeclarer的declareStream的方法。
当声明一个Bolt的输入流时,您始终订阅另一个组件的特定流。如果要订阅另一个组件的所有流,则必须单独订阅每个流。
Bolts的主要方法是execute:处理新元组作为输入。Bolts使用OutputCollector对象发出新的元组。处理完成后,Bolts必须调用OutputCollector的ack方法,确认已经处理完成原始Tuple。
请注意,OutputCollector不是线程安全的。
5.Stream groupings
定义拓扑的其中一部分作用是为每个应该接收的Bolts指定输入。流分组定义了如何在Bolts的任务之间分配该流。
Storm有八个内置流分组,你可以通过实现自定义流分组CustomStreamGrouping接口来定制自己的流分组:
- Shuffle grouping:Tuple被随机分布在Bolts的任务中,使得每个Bolt被保证获得相等数量的元组。
- Fields grouping:流由分组中指定的字段分区。例如,如果流被“user-id”字段分组,则具有相同“user-id”的元组将始终进入相同的任务,但是具有不同“user-id”的元组可能会转到不同的任务。
- Partial Key grouping:流由分组中指定的字段进行分区,如“字段”分组,但是在两个下游Bolts之间进行负载平衡,当传入数据歪斜时,可以更好地利用资源。
- All grouping:流在所有的Bolts任务中复制。
- Global grouping:整个流转到Bolts任务中的单个任务。具体来说,它到达最低ID的任务。
- None grouping:此分组指定您不关心流如何分组。目前,无任何分组相当于混洗分组。
- Direct grouping:这是一种特殊的分组。以这种方式分组的流意味着Tuple的生成者决定消费者的哪个任务将接收该Tuple。直接分组只能在已声明为直接流的流上声明。
- Local or shuffle grouping:如果目标Bolts在同一个工作进程中具有一个或多个任务,则元组将被混洗分组,仅适用于进程内任务。否则,这样做就像一个正常的混洗分组。
6.Reliability
Storm保证每个Spout元组将被拓扑完全处理。它通过跟踪由每个出口Tuple触发的元组树,并确定该元组的树何时已经成功完成。每个拓扑都有一个与之相关联的“消息超时”。如果Storm无法检测到在该超时内已经完成了一个Spout元组,那么它将在元组失败后稍后重放。
为了利用Storm的可靠性功能,元组树在被创建和完成处理时都要要通知Storm。
7.Tasks
每个Spout或Bolt在集群中执行尽可能多的任务。每个任务对应一个执行线程,是Spouts或Bolts的组件实例。
8.Workers
Topology运行在一个或多个worker进程上,每个worker都是一个JVM虚拟机,运行着Topology所有task的一个子集。比如,Topology的并发度是300,设置50个worker,那么每个worker就会分配6个task。Storm会尽量平衡所有worker的task数量。
Storm安装:
- 下载地址:http://storm.apache.org/downloads.html 选择对应的版本下载,我下载是0.10.2版本。
- 安装
拷贝下载包到安装目录下,我的安装目录是:/Users/wesley/apps
tar zxvf apache-storm-0.10.2.tar.gz
进入conf目录:
cd conf
修改storm.yaml
vim storm.yaml
修改为如下的配置:
storm-2.pngstorm.yaml的配置要注意有个坑:每个配置名称和值中间的冒号:前后都要有空格,不然会报:could not found expected ':' 的错误
后台进程方式启动Nimbus进程:
nohup bin/storm nimbus >> nimbus.log &
后台进程方式启动Supervisor进程:
nohup bin/storm supervisor >> supervisor.log &
后台进程方式启动Supervisor进程:
nohup bin/storm ui >> ui.log &
运行jps命令查看相关进程是否启动:
jps -l
storm-3.png
进入Storm UI页面查看是否安装正常:http://localhost:9090/index.html
storm-4.png我们将在后面的文章演示如何使用Storm编程。
网友评论