在jstorm的架构中,Nimbus的作用是用来操作Topology、上传/下载文件、获取集群状态和拓扑状态等。主要作用是接收client的请求,并在计算之后,修改zk上的数据。
1.启动
使用下面的命令就可以启动nimbus服务器端了
jstorm nimbus
jstorm是一个python脚本,会组装java命令后启动NimbusServer
java -server
-Djstorm.home=/Users/shishengjie/software/jstorm-0.9.1
-Dstorm.options=
-Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib
-Xms1g -Xmx1g -XX:+UseConcMarkSweepGC -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=70
-Dlogfile.name=nimbus.log
-Dlog4j.configuration=File:/Users/shishengjie/software/jstorm-0.9.1/conf/jstorm.log4j.properties
-cp /Users/shishengjie/software/jstorm-0.9.1/jstorm-client-extension-0.9.1.jar:/Users/shishengjie/software/jstorm-0.9.1/jstorm-client-0.9.1.jar:/Users/shishengjie/software/jstorm-0.9.1/jstorm-server-0.9.1.jar:/Users/shishengjie/software/jstorm-0.9.1/lib/kryo-2.17.jar:/
com.alibaba.jstorm.daemon.nimbus.NimbusServer
在使用idea调试的时候,我们需要把相关的参数设置进去。
2. NimbusServer
2.1 启动过程
NimbusServer的启动过程主要是在launchServer
方法中,
- 读取配置,首先从
defaults.yaml
中读取默认配置;然后从storm.conf.file
指定的文件(如果没有,从classpath查找storm.yaml);最后从命令行读取storm.options
中的配置,并依次覆盖。 - 检查conf配置是否为集群模式,
storm.cluster.mode
默认是distributed - 初始化关闭hook,会在jvm关闭时执行cleanup方法
- 创建NimbusData,主要是初始化两个用于上传和下载的map,这个map带有过期清除功能,过期后可以回调指定方法。
- 使用Curator客户端创建
leaderSelector
,用于nimbus客户端选择leader,选举成为leader后会调用listener中的方法。 - 如果未获得leader角色,会初始化follower的线程,该线程的主要工作是清理已经分配完成的本地文件,从master下载文件。
2.2 leader
成为leader后,nimbus服务器需要很多初始化操作:
- 获取hostname和port(配置:nimbus.thrift.port默认为7627),将master的host和port写入到zk下面的/jstorm/master节点的内容里;
- cleanupCorruptTopologies清除中断的拓扑,在zk中有记录但是本地dir上面没有的拓扑被认为是被中断的。在zk中找 /jstorm/topology/下面的节点,本地路径
/local-storm-dir/nimbus/stormdist
下面是拓扑列表,将zk中的列表移除本地有的,剩下都是本地没有的,清除的操作就是从zk中移除节点。 - initGroup初始化group: 如果配置了分组模式(nimbus.groupfile.path中指定了文件路径,如conf下的group.ini)那么会解析这个文件,并将group信息保存到NimbusData中。
- initTopologyAssign初始化TopologyAssign,这是一个单例,TopologyAssign线程,用于为topology的task分配资源
- initTopologyStatus,初始化状态,更新 /jstorm/topology/的子节点的StormBase的状态为StatusType.startup
- initMonitor初始化监视器,nimbus.monitor.freq.secs 10s运行一次,根据心跳时间查找dead的task,修改zk中topology的状态为monitor
- initCleaner初始化清理器,每隔一段时间(默认600秒),清理/LOCAL-DIR/nimbus/inbox中上传的jar包
- 创建ServiceHandler,其实现了thrift的nimbus服务,处理client的请求
- initThrift初始化thrift服务器,用来处理client端的请求
3. 提供的服务
3.1 getClusterInfo
这个服务用于从zk中获取集群当前状态:
- 从zk获取
/jstorm/topology/
下获取子节点和里面的内容StormBase,遍历topology,获取每个拓扑的Assignment,获取group信息,封装到TopologySummary - 获取Supervisor的状态,从
/jstorm/supervisors
下的所有子节点,内容是SupervisorInfo,将supervisors和assignments封装为SupervisorSummary - 封装为ClusterSummary返回
3.2 上传文件
beginFileUpload
开始上传文件主要确定了文件的上传后的路径和文件名:{localdir}/nimbus/inbox/stormjar-{uuid}.jar
,然后创建操作文件的Channel,加入到Uploader中。
uploadhunk
传入文件的全路径后和写入的byte数组,从Uploader获取Channel,将数据写入文件
finishFileUpload
结束上传,从Uploader获取Channel,关闭。
3.3 下载文件
beginFileDownload
创建BufferFileInputStream,然后生成一个uuid作为id返回给client端。将id和BufferFileInputStream实例保存到Downloaders中。
downloadChunk
client端传入了id,从Downloaders中获取BufferFileInputStream,然后读取一个chunk放入byte数组并返回。
3.4 killTopologyWithOpts
首先,确定topology是否处于激活状态,再修改/jstorm/topology/{topologyId}的内容是StormBase的状态为StatusType.kill
3.5 activate与deactivate
修改/jstorm/topology/{topologyId}的内容是StormBase的状态为StatusType.activate和StatusType.inactivate
3.6 rebalance
首先,确定topology是否处于激活状态,再修改/jstorm/topology/{topologyId}的内容是StormBase的状态为StatusType.rebalance
可以看到,nimbus的主要作用是获取配置,修改zk的配置,具体的实现还需要Supervisor进行相应的操作。下一节,我们分析Supervisor的启动和工作方式。
网友评论