一、Storm简介
1.1 Storm是什么
Apache Storm(http://storm.apache.org)是由Twitter 开源的分布式实时计算系统,Storm 可以非常容易并且可靠的处理无线的数据流,对比Hadoop的批处理,Storm是一个实时的、分布式的、具备高容错的计算系统。
1.2 Storm官网
http://storm.apache.org/
1.3 Storm原理
分布式的实时计算系统,能够可信任的处理大量的流式数据,就好比Hadoop对于批量数据进行的处理一样;通常来说,Hadoop能够进行大批量数据的离线处理,但是在实时计算上的表现实在是不尽如人意;而Storm就可以担当这部分的作用。
1.4 Storm核心概念
Storm计算结构中的几个核心概念为 topology,stream,spout,bolt,下面我们将依次介绍。
(1)Topology
Topology是 storm 中最核心的概念,其是运行在 storm 集群上的一个实时计算应用,相当于 hadoop 中的一个 job,区别于 job 的时,job 会有明确的开始和结束,而 topology 由于实时的流式计算的特殊性,从启动的那一刻起会永远的运行下去,直到手动停止。
Topology由 stream,spouts,bolts 组成,可以描述为一个有向无环图,如下:图一 topology 示例
(2)Stream
Stream是 storm 中对数据流的抽象,是由无限制的 tuple 组成的序列。Tuple 可以理解为包含一个或多个键值对的 hash。Tuples 在 stream 中流经 bolts,被逐步处理,最终得到预设的结果。
Stream可比作一条源源不绝的河流,tuple 就是组成这条河流的无数水滴。每一个 stream 在 storm 中都有一个唯一标示的 id。
(3)Spout
从图一可以看出,spout是一个 topology 的数据源,负责连接数据源,并将数据转化为 tuple emit 到 topology中,经由 bolts 处理。
Spout提供了一对核心方法<ack, fail>来保障 storm 在数据没有被正确处理的情况下,不会被丢弃,仍能被重新处理,当然这是可选的,我们也可以不关心 tuple 是否被正确的处理,只负责向topology 中 emit 数据(在某些场景下可能不需要)。具体实现原理在后文会详细介绍。
Storm + Kakfa是很常见的组合,storm提供了storm-kafka扩展,封装了多个可用的 kafka spouts 供直接使用,相关文档可以参考这里。
(4)Bolt
Bolt是 topology 中的数据处理单元,每个 bolt 都会对 stream 中的 tuple 进行数据处理。复杂的数据处理逻辑一般拆分成多个简单的处理逻辑交由每个 Bolt 负责。
Bolt可以执行丰富的数据处理逻辑,如过滤,聚合,链接,数据库操作等等。
Bolt可以接受任意个数据流中的 tuples,并在对数据进行处理后选择性的输出到多个流中。也就是说,bolt 可以订阅任意数量的spouts 或其他 bolts emit 的数据流,这样最终形成了复杂的数据流处理网络,如图一。
理解了storm的核心概念后,下文将介绍storm的并发机制
1.5 Storm的基本架构
(1)Nimbus:如上图,就好比Hadoop中的JobTracker,是集群中的主节点,负责分发用户代码,把需要处理的任务指派给具体的Supervisor,再由其上的Worker进行实际的处理。
(2)Supervisor:集群中的从节点,负责管理机器上运行的Worker进程,这里,需要注意,worker是一个进程,其内部还可以启动多个线程来进行任务的处理;通常,我们再指定的时候,会在此处通过指定端口号,来指定机器上到底启动多少个worker。
(3)Zookeeper:基本只要牵涉到集群,都需要用到zookeeper,这也符合其作为动物园管理员的职责,通过zookeeper,nimbus会感知到Supervisor的下线和上线,会合理分配资源,完成Topology的处理
(4)Topology:这就好比我们平时提交的一个Application,只是换了一个名称而已。
1.7 Storm与Spark的区别
其实,这里更应该说是Spark-Streaming与storm的区别,因为spark目前也在朝着打造一个生态圈的目标而努力,拥有spark-sql,能够实现类似Hive的数据仓库管理;而Saprk-Streaming,则是用来进行实时处理,类似于Storm的功能;二者实现的功能相似,但实际上还是有些区别的。
(1)实时性来说,Storm的实时性更强,基本上就是来一条数据,就处理一条数据;在编写Spark代码的时候,会发现,其本身就是收集一段时间的数据来进行统一处理,虽然可以尽可能缩小这个时间,但如果数据瞬间涌入过多的话,其性能相比于Storm还是有些不足的。
(2)健壮性来说,Storm的实现中使用了zookeeper来实现,而且还有Ack机制,对于数据是否处理成功能够感知到而Spark则是采取了业界常用的WAL,即预写日志和CheckPoint机制,相比之下,健壮性要差一些
(3)并行度的适时调整:对于一个公司来说,业务肯定会存在高峰期和低谷期,所以storm能够动态调整实时计算程序的并行度,能够最大限度利用集群资源,这点也很棒;而Spark是实现不了的。
(4)但是,Spark最好的一点在于,其吞吐量比较大,而且Spark-Streaming位于Spark生态圈中,如果想要加入许多的附加功能,可以用Spark自己的组件就能够实现无缝对接,这一点是Storm无法相比的,因为Storm就是专门用于做实时处理的,其他功能的实现,肯定性能要差一些。
二、Storm核心
2.1 Storm的并发
上文提到storm是 scalable 的,是因为 storm 能将计算切分成多个独立的 tasks 在集群上并发执行,从而支持其在多台设备水平扩容。那 storm 的并发是如何实现的呢?回答这个问题之前先来看一下 topology 是如何运行在 storm 集群中的:
上图中包含三个核心概念:
(1)worker: 一个 worker 对应一个进程,是一个 topology 的子集,在 storm 集群中的一个node上可根据配置启动N个 worker。
(2)Executor:一个 executor 是运行在一个 worker 进程上的线程,executor 可以执行同一个 spout 或 bolt 的一个或多个 task ,默认的一个 executor 会分配一个 task。
(3)Task:task负责真正的数据处理逻辑,一个 task 实质上是一个spout 或者 bolt 的实例。
所以,一个物理设备上可以运行多个worker,一个 worker 内部又可以启动多个 executor ,每个 executor 可以执行一个或多个task。
Strom的并发度是用来描述所谓的 "parallelism hint",它是指一个 component(spout or bolt)的初始启动时的 executor 数量。通过下图我们来看一个 topology 的并发示例:
上图的topology有一个 spout 和两个 bolt 组成。其中 blue spout 包含两个 executor,每个 executor 各执行一个 blue spout 的 task;green bolt 包含了两个 executor,每个 executor 各执行两个task;yellow bolt 包含6个 executor,每个 executor 各执行一个task。
整个topology启动了两个 worker,共包含 12 个task,每个worker 包含5个 executor,也就是5个 Thread。所以其 parallelism hint 是10。
从上例可以看出,增加分配给topology的 worker 数和 executor
数是直接增加其计算能的简单办法。Storm提供了相关的 API 或通过配置文件来修改一个 topology 的 woker 数,同样的
storm提供了相关 API 控制 executor 的数量和每个 executor执行的 task 数量用以控制并发。
2.2 Stream grouping数据分组
除了spout和 bolt外,定义一个 topology 还有一个重要的组成,那就是 stream grouping,它规定了 topology 中的每一个 bolt 实例(也即是task)要接收什么样的 stream 作为输入。
具体来说,stream group定义了一个 stream 中的 tuple 最终被emit 到哪个 bolt task 上被处理,是一个数据分组机制。storm 提供了八种内置的 stream grouping 类型(storm 1.o.x版本的内置类型,):
(1)Shuffle grouping : 随机分组,随机的分发 tuple 到每个 bolt 的各个 task,每个 task 接收的 tuples 数量相同。
(2)Fields grouping : 按字段分组,会根据 tuple 的 某一个字段(可以理解为 tuple 这个 hash 的 key)分组,同一个字段的 tuple 永远被分配给同一个 task 处理。
(3)Partial Key grouping : 类似2,但实现了 stream 下游的两个
(4)bolts 间的负载均衡,在 tuple 的字段分布不均匀时提供了更好的资源利用效果。
(5)All grouping : 全复制分组,所有的 tuple 复制后,都会分发给所有的 bolt 的 task 进行处理。
(6)Global grouping : 全局分组,所有的 tuples 都 emit 到唯一的一个 task 上,如果为一个 bolt 设置了多个 task,会选择 task id 最小的 task 来接收数据,此时设置的并发是没有意义的。
(7)None grouping : 不分组,功能上同1,是预留接口。
(8)Direct grouping : 指定分组,数据源会调用 emitDerect 方法来判断一个 tuple 将发送到哪个 cosumer task 来接收这个 tuple。这种分组只能在?被声明为指向性的数据流上使用。
(9)Local or shuffle grouping : 本地随机分组,和1类似,但是在随机分组的过程中会,如果在同一个 woker 内包含 consumer task,则在 woker 内部的 consumer tasks 中进行随机分组,否则同1。
(10)另外,可以通过扩展CustomStreamGrouping实现自定义的分组方式。
2.3 Storm可靠性分类
在这之前,我们需要介绍一个概念"fully processed"。一条message 自从它由 spout emit 到 topology,被这个 tuple 途径的整个?DAG 中的所有 bolt 都处理过,storm 认为这个 message 是被 "fully processed"。Storm 的消息保障处理机制是针对 "fully processed" 而言的。
在系统级,storm提供了 "best effort","at least once","exactly once" 三种类型。其中 "best effort" 是不保证每条消息都被处理,"at least once" 是保障消息最少能被处理一次,可能会被多次处理,"exactly once" 是保证消息被处理且只被处理一次。
"best effort"这种类型没什么可说的,就是每条消息 storm 都会按程序逻辑走下去,但是不会关注其是否成功。"at least once",是storm-core 提供的可靠性级别,即保证每条 message 至少会被处理一次,可能会出现多次处理的情况,下文将详细介绍其实现原理。
至于"exactly once"其实是由 storm 的高级抽象 Trident 实现的,我们会在后文对其介绍。
2.4 Storm实现可靠性的API
现在,我们介绍一下storm保证可靠性的实现接口。在 storm 中要保障消息被处理你需要做以下两件事才能保证 spout 发出 tuple 被处理:
(1)无论在什么节点,每当你新创建一个 tuple 是都要告知 storm
(2)无论在什么节点,每当你处理完成一个 tuple 都需要告知 storm
对于spout,storm的提供了非常简单的API保证可靠性:
(1)nextTuple:这个接口负责emit tuple,为了保证可靠性需要为每个 tuple 生成一个唯一 ID,在通过 collector emit tuple 时,是需要带上这个 ID。同时会将这个 tuple 和 ID 保存在一个 hash 中,以等待 tuple 被完全处理后相应的操作.
(2)ack:这个接口负责处理成功的应答,一般当收到成功处理这个tuple 的消息后,删除 hash 中这个 tuple 的记录。
(3)fail: 这个接口复杂处理失败的应答,当某个 tuple 处理失败而超时后会调用这个接口,一般选择重新 emit 这条消息。
2.5 Storm高效实现可靠性的原理
在storm中有这样一个special "acker" tasks,它负责跟踪所有由spout 发出的 tuple?产生的 DAG。当一个 tuple 成功的在 DAG
中完成整个生命周期,这个task会通知 emit 这个 tuple 的 spout task 这个 tuple 被处理了。所以如果期望消息至少被处理一次,最少要启动一个 acker task,当然你可以启动任意个。
Storm会通过 "mod hashing" 的方法将一个 tuple 分配到合适的acker 去跟踪,因为每一个 tuple 都对应一个64位的唯一ID,并且在锚定 tuple 时这个ID也会随之传给新生成的 tuple,所以 DAG 中的每个节点根据这个 ID 可以判断应答消息发送给哪个 acker。同样 acker 也能从在应答消息中确认哪个 tuple 的状态被更新了,当一个 tuple 的整个 DAG 完成,acker 会发送确认消息给源 spout。
Acker不会明确的追踪整个 DAG,否则当 DAG 越发复杂时其负担越重。Acker 的追踪算法非常之简洁高效,并且只对于每个追踪的tuple 只会占用大约20B的固定空间。
Storm会在系统中维护一个表,这个表的 key 是 acker 追踪的每个 tuple 的 ID,value 的初始值也是这个 ID。当 DAG 中的下游节点处理了这个 tuple 后,acker 接到确认信息后会做一个 XOR 运算,用 XOR 的运算结果来更新这个 ID 在表中对应的 val。
在这里需要说明一下在DAG中每个新生成 tuple 都会有一个64位的随机值ID(注意:不是其锚定的tuple传来的spout emit的那个tuple 的ID。也就是说每个新生成的 tuple 会有一个唯一 ID,新生成的 tuple 锚定某一个 tuple 后也会知晓 spout tuple 的那个 ID),在每个计算节点,storm 会将这个计算节点生成的所有 tuple 的 ID 与所有输入 tuple 的 ID 以及这个 DAG 所追踪的 tuple 在系统表中对应的 value 做 XOR 操作,得到一个结果,并用这个结果更新系统表中对应的 value。
2.6 Storm在各种失败场景下的保障方法
情景1:DAG 中某个节点挂掉没有正常发送 fail msg。这时其对应的根节点的 tuple 最后会因超时而被 spout 重发。
情景2:跟踪 tuple 的 acker task 挂了。此时,这个acker跟踪的所有task都会因为超时而重发(因为 acker 不会更新其在系统中对应的value)。
情景3:spout 挂了。因为spout的输入往往来自队列,当 spout 挂掉后,这个 spout 没有对队列中的消息做确认回应,所以队列不会认为这个 spout 提走的数据被正常消费了,而作"出队"处理(其实是将执行中并没有确认的数据重新归队)。
网友评论