开发好一个topology之后,需要提交到nimbus服务节点,并由nimbus进行分发处理。这一个过程有以下两种方式:
1、storm客户端
2、storm api
但本质上都会进入到storm api进行提交。
先说storm 客户端部分:
提交topology的storm客户端命令即:storm jar xxx.jar com.xx.yy.TheTopology topologyName,这个命令来自$storm_home/bin/下的提供,python实现的,如下所示:

storm.py jar定义:

在这儿会调用exec_storm_class并组装出java命令,并调用。jarfile即是jar在客户端本地文件。
这里相当于执行了jar 里面Topology的Main方法,而Main方法里面一般会调用storm的api,所以,客户端只是提供了一个提交途径而已,最终会回到api进行处理。storm里面还有一些别的功能函数如下
杀掉拓扑

激活拓扑

暂停拓扑

重新分配拓扑

等等。
下面进入api部分:
StormSubmitter里面有很多重载的提交topology的方法,但最终会调用submitToplogyAs方法。
执行过程如下(这里就直接图文描述了,不去整理时序图什么的):
1、storm配置校验,递归校验里面的对象是否能够进行json序列化


2、命令行及yaml配置组装及验证

a、readCommandLineOpts读取命令行配置
b、readStormConfig读取defaults.yaml及storm.yaml配置
c、validateConfs会做些验证,比如验证对小堆内存配置等
3、topology验证是否重复。通过NimbusClient获取Cluster Summary及Topology Summary,遍历验证是否存在相同topology名称

4、上传。由submitTopologyInDistributeMode方法执行

这里面会调用submitJarAs方法进行jar的上传,如下:

其中storm.jar即是之前获取的具体jar路径,看下里面做了什么:

a、通过Nimbus.Client获取Nimbus服务节点上的本地目录uploadLocation,也就是jar会上传到Nimbus服务节点的本地目录inbox/下面
b、通过输入流获取文件并分chunk上传,上传时值得注意的是ProgressListener,这里相当于提供了一种监听处理机制,可以在以后的类似场景种借鉴使用。具体提交是通过thrift rpc进行提交的


c、这个时候jar已经传到了Nimbus服务节点上,可以进行top分发了

同样会调用到sendBase,即通过thrift rpc进行通信

以上就是提交拓扑及分发的一个过程。
最后说一下,因为采用thrift rpc进行跨进程通信,所以会要求topology相关的组件及数据对象实现序列化。
网友评论