美文网首页
JStorm源码分析-4.Nimbus服务器

JStorm源码分析-4.Nimbus服务器

作者: 史圣杰 | 来源:发表于2019-03-15 11:31 被阅读0次

在jstorm的架构中,Nimbus的作用是用来操作Topology、上传/下载文件、获取集群状态和拓扑状态等。主要作用是接收client的请求,并在计算之后,修改zk上的数据。

1.启动

使用下面的命令就可以启动nimbus服务器端了

jstorm nimbus

jstorm是一个python脚本,会组装java命令后启动NimbusServer

java -server 
-Djstorm.home=/Users/shishengjie/software/jstorm-0.9.1 
-Dstorm.options= 
-Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib  
-Xms1g -Xmx1g -XX:+UseConcMarkSweepGC -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=70 
-Dlogfile.name=nimbus.log 
-Dlog4j.configuration=File:/Users/shishengjie/software/jstorm-0.9.1/conf/jstorm.log4j.properties  
-cp /Users/shishengjie/software/jstorm-0.9.1/jstorm-client-extension-0.9.1.jar:/Users/shishengjie/software/jstorm-0.9.1/jstorm-client-0.9.1.jar:/Users/shishengjie/software/jstorm-0.9.1/jstorm-server-0.9.1.jar:/Users/shishengjie/software/jstorm-0.9.1/lib/kryo-2.17.jar:/
com.alibaba.jstorm.daemon.nimbus.NimbusServer 

在使用idea调试的时候,我们需要把相关的参数设置进去。

2. NimbusServer

2.1 启动过程

NimbusServer的启动过程主要是在launchServer方法中,

  1. 读取配置,首先从defaults.yaml中读取默认配置;然后从storm.conf.file指定的文件(如果没有,从classpath查找storm.yaml);最后从命令行读取storm.options中的配置,并依次覆盖。
  2. 检查conf配置是否为集群模式,storm.cluster.mode默认是distributed
  3. 初始化关闭hook,会在jvm关闭时执行cleanup方法
  4. 创建NimbusData,主要是初始化两个用于上传和下载的map,这个map带有过期清除功能,过期后可以回调指定方法。
  5. 使用Curator客户端创建leaderSelector,用于nimbus客户端选择leader,选举成为leader后会调用listener中的方法。
  6. 如果未获得leader角色,会初始化follower的线程,该线程的主要工作是清理已经分配完成的本地文件,从master下载文件。

2.2 leader

成为leader后,nimbus服务器需要很多初始化操作:

  1. 获取hostname和port(配置:nimbus.thrift.port默认为7627),将master的host和port写入到zk下面的/jstorm/master节点的内容里;
  2. cleanupCorruptTopologies清除中断的拓扑,在zk中有记录但是本地dir上面没有的拓扑被认为是被中断的。在zk中找 /jstorm/topology/下面的节点,本地路径/local-storm-dir/nimbus/stormdist下面是拓扑列表,将zk中的列表移除本地有的,剩下都是本地没有的,清除的操作就是从zk中移除节点。
  3. initGroup初始化group: 如果配置了分组模式(nimbus.groupfile.path中指定了文件路径,如conf下的group.ini)那么会解析这个文件,并将group信息保存到NimbusData中。
  4. initTopologyAssign初始化TopologyAssign,这是一个单例,TopologyAssign线程,用于为topology的task分配资源
  5. initTopologyStatus,初始化状态,更新 /jstorm/topology/的子节点的StormBase的状态为StatusType.startup
  6. initMonitor初始化监视器,nimbus.monitor.freq.secs 10s运行一次,根据心跳时间查找dead的task,修改zk中topology的状态为monitor
  7. initCleaner初始化清理器,每隔一段时间(默认600秒),清理/LOCAL-DIR/nimbus/inbox中上传的jar包
  8. 创建ServiceHandler,其实现了thrift的nimbus服务,处理client的请求
  9. initThrift初始化thrift服务器,用来处理client端的请求

3. 提供的服务

3.1 getClusterInfo

这个服务用于从zk中获取集群当前状态:

  1. 从zk获取 /jstorm/topology/下获取子节点和里面的内容StormBase,遍历topology,获取每个拓扑的Assignment,获取group信息,封装到TopologySummary
  2. 获取Supervisor的状态,从/jstorm/supervisors下的所有子节点,内容是SupervisorInfo,将supervisors和assignments封装为SupervisorSummary
  3. 封装为ClusterSummary返回

3.2 上传文件

beginFileUpload
开始上传文件主要确定了文件的上传后的路径和文件名:{localdir}/nimbus/inbox/stormjar-{uuid}.jar,然后创建操作文件的Channel,加入到Uploader中。

uploadhunk
传入文件的全路径后和写入的byte数组,从Uploader获取Channel,将数据写入文件

finishFileUpload
结束上传,从Uploader获取Channel,关闭。

3.3 下载文件

beginFileDownload
创建BufferFileInputStream,然后生成一个uuid作为id返回给client端。将id和BufferFileInputStream实例保存到Downloaders中。
downloadChunk
client端传入了id,从Downloaders中获取BufferFileInputStream,然后读取一个chunk放入byte数组并返回。

3.4 killTopologyWithOpts

首先,确定topology是否处于激活状态,再修改/jstorm/topology/{topologyId}的内容是StormBase的状态为StatusType.kill

3.5 activate与deactivate

修改/jstorm/topology/{topologyId}的内容是StormBase的状态为StatusType.activate和StatusType.inactivate

3.6 rebalance

首先,确定topology是否处于激活状态,再修改/jstorm/topology/{topologyId}的内容是StormBase的状态为StatusType.rebalance

可以看到,nimbus的主要作用是获取配置,修改zk的配置,具体的实现还需要Supervisor进行相应的操作。下一节,我们分析Supervisor的启动和工作方式。

相关文章

网友评论

      本文标题:JStorm源码分析-4.Nimbus服务器

      本文链接:https://www.haomeiwen.com/subject/rkdhmqtx.html