开发好一个topology之后,需要提交到nimbus服务节点,并由nimbus进行分发处理。这一个过程有以下两种方式:
1、storm客户端
2、storm api
但本质上都会进入到storm api进行提交。
先说storm 客户端部分:
提交topology的storm客户端命令即:storm jar xxx.jar com.xx.yy.TheTopology topologyName,这个命令来自$storm_home/bin/下的提供,python实现的,如下所示:
storm.py jar定义:
在这儿会调用exec_storm_class并组装出java命令,并调用。jarfile即是jar在客户端本地文件。
这里相当于执行了jar 里面Topology的Main方法,而Main方法里面一般会调用storm的api,所以,客户端只是提供了一个提交途径而已,最终会回到api进行处理。storm里面还有一些别的功能函数如下
杀掉拓扑
激活拓扑
暂停拓扑
重新分配拓扑
等等。
下面进入api部分:
StormSubmitter里面有很多重载的提交topology的方法,但最终会调用submitToplogyAs方法。
执行过程如下(这里就直接图文描述了,不去整理时序图什么的):
1、storm配置校验,递归校验里面的对象是否能够进行json序列化
2、命令行及yaml配置组装及验证
a、readCommandLineOpts读取命令行配置
b、readStormConfig读取defaults.yaml及storm.yaml配置
c、validateConfs会做些验证,比如验证对小堆内存配置等
3、topology验证是否重复。通过NimbusClient获取Cluster Summary及Topology Summary,遍历验证是否存在相同topology名称
4、上传。由submitTopologyInDistributeMode方法执行
这里面会调用submitJarAs方法进行jar的上传,如下:
其中storm.jar即是之前获取的具体jar路径,看下里面做了什么:
a、通过Nimbus.Client获取Nimbus服务节点上的本地目录uploadLocation,也就是jar会上传到Nimbus服务节点的本地目录inbox/下面
b、通过输入流获取文件并分chunk上传,上传时值得注意的是ProgressListener,这里相当于提供了一种监听处理机制,可以在以后的类似场景种借鉴使用。具体提交是通过thrift rpc进行提交的
c、这个时候jar已经传到了Nimbus服务节点上,可以进行top分发了
同样会调用到sendBase,即通过thrift rpc进行通信
以上就是提交拓扑及分发的一个过程。
最后说一下,因为采用thrift rpc进行跨进程通信,所以会要求topology相关的组件及数据对象实现序列化。
网友评论