美文网首页
JStorm源码分析-3.提交Topology

JStorm源码分析-3.提交Topology

作者: 史圣杰 | 来源:发表于2019-03-14 21:07 被阅读0次

    构建Topology并在本地测试后,我们就可以将工程打包为jar包,并通过jstorm的jar命令提交到集群。这个过程使用了thrift的远程调用,相关技术可以参照http://matt33.com/2016/04/07/thrift-learn/#Thrift%E4%B9%8BHello-World

    1.提交命令

    将工程打为jar包之后,可以通过下面的命令提交到jstorm集群中。在提交之前,我们需要先启动zookeeper,nimbus和supervisor。

    jstorm jar target/sequence-split-merge-1.0.5-jar-with-dependencies.jar 
    com.alipay.dw.jstorm.example.sequence.SequenceTopology sequence_test
    

    2.程序分析

    2.1 jstorm脚本

    jstorm命令其实是一个python脚本,位于jstorm-server项目bin/jstorm.py中。这个脚本的主要工作是提供了一些子命令,最终会调用指定的jar包内的main方法。
    在main方法中,会根据传入的参数查找对应的方法。例如,jstrom jar命令就是jar对应的jar方法。在这个脚本中包含了很多子命令,可以看到有如下多个子命令,用来控制jstorm集群的行为。

    COMMANDS = {"jar": jar, "kill": kill, "nimbus": nimbus, "zktool": zktool,
                "drpc": drpc, "supervisor": supervisor, "localconfvalue": print_localconfvalue,
                "remoteconfvalue": print_remoteconfvalue, "classpath": print_classpath,
                "activate": activate, "deactivate": deactivate, "rebalance": rebalance, "help": print_usage}
    

    jar方法的主要逻辑是拼装java命令并调用

    def jar(jarfile, klass, *args):
        childopts = "-Dstorm.jar=" + jarfile + (" -Dstorm.root.logger=INFO,stdout -Dlog4j.configuration=File:%s/conf/aloha_log4j.properties"  %JSTORM_DIR)
        exec_storm_class(
            klass,
            jvmtype="-client -Xms256m -Xmx256m",
            extrajars=[jarfile, CONF_DIR, JSTORM_DIR + "/bin", LOG4J_CONF],
            args=args,
            childopts=childopts)
    

    例如,本文jstorm jar的命令最终会执行如下java命令,我们需要到我们编写main方法中的StormSubmitter查看具体逻辑。

    java -client -Xms256m -Xmx256m 
    -Djstorm.home=/Users/shishengjie/software/jstorm-0.9.1 
    -Dstorm.options= 
    -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib 
    -Dstorm.jar=target/sequence-split-merge-1.0.5-jar-with-dependencies.jar 
    -Dstorm.root.logger=INFO,stdout 
    -Dlog4j.configuration=File:/Users/shishengjie/software/jstorm-0.9.1/conf/aloha_log4j.properties 
    -cp /.../....jar:target/sequence-split-merge-1.0.5-jar-with-dependencies.jar:/Users/shishengjie/.jstorm:/Users/shishengjie/software/jstorm-0.9.1/bin:/Users/shishengjie/software/jstorm-0.9.1/conf/jstorm.log4j.properties 
    com.alipay.dw.jstorm.example.sequence.SequenceTopology 
    "sequence_test"
    

    2.2 StormSubmitter

    StormSubmitter类定义在jstorm-client工程中,submitTopology方法用来向运行中的jstorm集群提交Topology。

    public static void submitTopology(String name, Map stormConf,
                StormTopology topology, SubmitOptions opts)
                throws AlreadyAliveException, InvalidTopologyException,
                TopologyAssignException {}
    

    提交时我们可以看到,需要指定拓扑名称和配置,StormTopology与SubmitOptions类的实例。前面几个参数我们都知道是如何设置的,SubmitOptions是thrift定义的一个对象,用来表示拓扑是否激活。

    struct SubmitOptions {
      1: required TopologyInitialStatus initial_status;
    }
    enum TopologyInitialStatus {
        ACTIVE = 1,
        INACTIVE = 2
    }
    
    // java中使用
    SubmitOptions submitOptions = new SubmitOptions(TopologyInitialStatus.ACTIVE);
    

    提交拓扑的主要逻辑:

    1. 参数的处理,开发者设置的stormConf并不完整,因此需要进行处理。首先会校验stormConf格式是否正确,stormConf需要是一个Json序列化格式,然后从命令行读取storm.options属性的内容,覆盖stormConf;接下来分别读取defaults.yaml和从storm.conf.file指定的storm配置文件中读取配置(如果没有指定,则从classpath中查找storm.yaml文件,jstorm-server中有一个名为storm.yaml的文件,里面是默认配置)。读取完文件的配置后,再次读取命令行指定的配置,保存为conf,并将stormConf的内容覆盖conf,这样就完成了补充配置的工作,stormConf是用户指定配置的,conf是补充后的完整配置。最后,设置用户分组参数,将stormConf序列化为json字符串。
      2.使用NimbusClient的静态方法从conf中获取NimbusClient实例
      3.调用NimbusClient的getClusterInfo方法向服务器获取集群信息,校验指定的拓扑名称是否已经存在。
      4.使用NimbusClient将storm.jar指定的jar包上传到服务器,storm.jar是在jstorm.py中设置的。
      5.使用NimbusClient的submitTopology方法提交拓扑。

    至此,jstorm完成了jar包和拓扑的提交。我们可以看到全部是通过NimbusClient来完成的。有client端就有server端,这就涉及到了jsorm的架构。storm命令的python脚本都是作为client端发送请求的,而使用jstorm nimbus启动的进程则是服务端,用来处理client请求。

    2.3 Nimbus

    jstorm的nimbus命令会启动一个服务器进程,其提供了很多thrift服务,用来管理集群中拓扑,任务,worker等状态。Nimbus也定义在strom.thrift中,声明了其提供的很多服务。NimbusClient是jstorm封装的客户端,用来向服务器发出请求。
    下面是使用thrift定义的nimbus提供的服务。

    service Nimbus {
    // 拓扑相关
      void submitTopology(1: string name, 2: string uploadedJarLocation, 3: string jsonConf, 4: StormTopology topology) throws (1: AlreadyAliveException e, 2: InvalidTopologyException ite, 3: TopologyAssignException tae);
      void submitTopologyWithOpts(1: string name, 2: string uploadedJarLocation, 3: string jsonConf, 4: StormTopology topology, 5: SubmitOptions options) throws (1: AlreadyAliveException e, 2: InvalidTopologyException ite, 3:TopologyAssignException tae);
      void killTopology(1: string name) throws (1: NotAliveException e);
      void killTopologyWithOpts(1: string name, 2: KillOptions options) throws (1: NotAliveException e);
      void activate(1: string name) throws (1: NotAliveException e);
      void deactivate(1: string name) throws (1: NotAliveException e);
      void rebalance(1: string name, 2: RebalanceOptions options) throws (1: NotAliveException e, 2: InvalidTopologyException ite);
    
      // 上传文件
      string beginFileUpload();
      void uploadChunk(1: string location, 2: binary chunk);
      void finishFileUpload(1: string location);
      
    // 下载文件
      string beginFileDownload(1: string file);
      //can stop downloading chunks when receive 0-length byte array back
      binary downloadChunk(1: string id);
    
      // 获取集群状态和拓扑状态
      string getNimbusConf();
      ClusterSummary getClusterInfo();
      TopologyInfo getTopologyInfo(1: string id) throws (1: NotAliveException e);
      SupervisorWorkers getSupervisorWorkers(1: string host) throws (1: NotAliveException e);
      string getTopologyConf(1: string id) throws (1: NotAliveException e);
      StormTopology getTopology(1: string id) throws (1: NotAliveException e);
      StormTopology getUserTopology(1: string id) throws (1: NotAliveException e);
    }
    

    NimbusClient的获取

    NimbusClient继承了ThriftClient类,创建时传入了conf作为配置。主要的初始化方法在ThriftClient的构造方法中。

    1. 获取master地址:初始化时,首先会根据conf获取zookeeper的配置,然后创建Curator客户端,由于nimbus在启动后创建/jstorm/master节点并将master的host和ip写入节点,nimbus的client启动时会对该节点进行检查,之后会监听这个节点,发送变化时会flushClient。
    2. 创建客户端:主要的逻辑在flushClient中,会从节点中读取host和port,然后使用thrift的接口创建连接。
    3. 创建完成后,NimbusClient通过flush方法获取了与服务端通信的Nimbus.Client

    2.4 NimbusServer

    由于客户端所做的事情只是调用,具体的逻辑是在服务端实现的,因此有必要对服务端进行分析。我们在使用jstorm nimbus命令时,最终会调用com.alibaba.jstorm.daemon.nimbus.NimbusServer,里面有main方法,可以启动一个服务器进程。

    public static void main(String[] args) throws Exception {
        // read configuration files
        @SuppressWarnings("rawtypes")
        Map config = Utils.readStormConfig();
        NimbusServer instance = new NimbusServer();
        INimbus iNimbus = new DefaultInimbus();
        instance.launchServer(config, iNimbus);
    }
    

    服务器的相关设计我们在下一篇文档中介绍,现在主要是对服务器处理提交拓扑任务的处理进行分析。按照thrift的定义,我们在initThrift中找到了处理远程调用的类是ServiceHandler
    ServiceHandler的submitTopologyWithOpts方法中,提交topology。

    ServiceHandler的主要逻辑包括:生成标准化的Topology实例并注册到zk上;将Topology需要的jar包、Topology实例和配置通过复制和序列化保存到本地路径上;创建Task并注册到zk上,task节点的内容为TaskInfo即任务对应的组件名称;包装一个TopologyEvent对象提交给TopologyAssign线程处理,由其完成任务的资源分配。

    • checkTopologyActive 检查topologyName是否存在,NimbusData中包含一个用于处理zookeeper的类StormZkClusterState,首先从/jstorm/topology里面获取激活状态的拓扑列表,遍历这个列表,如果节点名称包含topologyName,则获取节点内部数据,将其反序列化为StormBase类,如果name一致,说明已经存在。
    • NimbusData中保存了提交的总数量,对其加1;之后,构造topologyId,为topologyname-次序-时间戳。将NimbusData中jstorm的conf与提交的stormConf合并,由于配置被修改了,对提交的StormTopology对象重新构造为一个标准化后的StormTopology。
    • setupStormCode对Nimbus所在服务器创建路径${storm.local.dir}/nimbus/stormdist/{topologyId},将上传的jar包复制过来,文件名为stormjar.jar;将标准化后的StormTopology序列化到{storm.local.dir}/nimbus/{topologyId}/stormcode.ser文件中,将标准化后的StormTopology序列化到{storm.local.dir}/nimbus/{topologyId}/stormconf.ser文件中。
    • setupZkTaskInfo针对bolt和spout组件生成TaskInfo,首先在zk上创建/jstorm/taskbeats/{topoologyId},读取stormconf.ser文件,反序列化为stormConf;读取stormcode.ser文件,反序列化为StormTopology实例;然后为topology添加Acker,Acker实现了IBolt,用来跟踪消息处理情况并通知spout;再为topology中的所有组件添加一个名为__system的stream。最后遍历topology中的组件,根据设置的并发度,保存到taskToComponetId中,例如,spout并发度为2,那么会保存1->spout 2->spout。有了这个TreeMap,遍历并创建TaskInfo,在zk上创建/jstorm/tasks/{topoologyId}/{taskId},将TaskInfo序列化保存到节点里面。例如,spout会在创建两个节点,节点内容为TaskInfo。
    • 为topology分配任务,TopologyAssign是一个线程,会不断地从queue中获取TopologyAssignEvent对象并进行任务分配,所以在提交提交任务的时候,只需要创建TopologyAssignEvent对象并push到queue中。而实际的任务分配则是在TopologyAssign中完成。在NimbusServer启动时的initTopologyAssign方法中,会启动TopologyAssign线程。

    2.4 TopologyAssign

    TopologyAssign线程的主要工作就是处理Topology的任务分配,主要逻辑在doTopologyAssignment方法中。流程是先根据TopologyAssignEvent生成Assignment,然后将Assignment备份到zk中。在分析之前,需要先对下面几个对象有所了解

    TopologyAssignContext

    在处理分配拓扑之前,我们已有的资源是Topology实例和配置信息。TopologyAssignContext的目的是将分配拓扑所需的数据都维护起来。TopologyAssignContext内部保存了拓扑分配的相关上下文:

    • assignType - 分配的类型,包括ASSIGN_TYPE_NEW-新建,ASSIGN_TYPE_REBALANCE-重新平衡,ASSIGN_TYPE_MONITOR-监听。
    • rawTopology - 未被修改前的StormTopology实例;
    • stormConf - 拓扑的配置;
    • oldAssignment - 拓扑已经存在的Assignment实例;
    • SupervisorInfo - 当前supervisor运行信息;
    • taskToComponent - task与组件名称的映射;
    • allTaskIds - 所有的task;
    • deadTaskIds - 已经不运行的task;
    • unstoppedTaskIds - 虽然活着但所在supervisor停止运行的task
    • cluster - storm集群中的SupervisorInfo

    DefaultTopologyAssignContext

    DefaultTopologyAssignContext继承了TopologyAssignContext类,

    • sysTopology - Storm会对rawTopology添加ack等机制,生成新的Topology实例
    • sidToHostname - supervisorid -> hostname的映射
    • hostToSid - hostname -> supervisorid的映射
    • oldWorkerTasks - zk中记录了Topology旧的Assignment,内部包含task的列表
    • componentTasks - 组件名称 -> taskid的映射
    • unstoppedAssignments - 已经停止的supervisor中的task
    • totalWorkerNum - worker总数,默认大小为min({topologt.workers},所有组件的并发数之和)
    • unstoppedWorkerNum - 已经停止的supervisor中的task数量
    • DISK_WEIGHT - 磁盘的权重
    • CPU_WEIGHT - CPU的权重
    • MEM_WEIGHT - 内存的权重
    • PORT_WEIGHT - 端口的权重
    • TASK_ON_DIFFERENT_NODE_WEIGHT - task运行在不同节点上的权重
    • USE_OLD_ASSIGN_RATIO_WEIGHT - 使用旧的分配率的权重
    • USER_DEFINE_ASSIGN_RATIO_WEIGHT - 使用用户定义的分配率的权重
    • DEFAULT_WEIGHT - 默认的权重

    DefaultTopologyScheduler
    DefaultTopologyScheduler实现了IToplogyScheduler接口,IToplogyScheduler只有一个方法

    Map<Integer, ResourceAssignment> assignTasks(TopologyAssignContext contex) 
                throws FailedAssignTopologyException;
    

    也就是说DefaultTopologyScheduler的主要功能就是根据TopologyAssignContext为task分配资源,如指定所在的supervisor,并在Supervisor的资源池中为task分配所需的资源。assignTasks方法是处理拓扑的核心功能。

    ResourceAssignment
    每个task会被分配给一个ResourceAssignment实例,ResourceAssignment包含了task所在supervisorId和磁盘,cpu,内存这些资源所在的slot,以及端口port。

    • supervisorId - supervisor的id
    • hostname - 所在的机器
    • diskSlot - 磁盘的slot
    • cpuSlotNum - cpu的slot数量
    • memSlotNum - 内存的slot数量
    • port - 端口

    Assignment

    每个Topology都需要被nimbus分配给supervisor执行,分配的元数据就保存在Assignment对象中。通过zk的节点/jstorm/assignments/{topologyid}传递给Supervisor,Assignment内部包括:

    • nodeHost - 保存{supervisorid: hostname} -- 分配的supervisor
    • taskStartTimeSecs - 每个task的启动时间,{taskid, taskStartSeconds
    • masterCodeDir - topology的代码路径
    • taskToResource - 每个task对应的ResourceAssignment

    处理流程

    mkAssignment的prepareTopologyAssign

    prepareTopologyAssign的主要作用是收集jstorm集群当前的上下文数据,包括:拓扑名称、标准化后的Topology、Topology配置、Supervisor列表、task列表、已经存在的Assignment。

    • 从本地路径读取topology文件,然后反序列化为StormTopology,保存到rawTopology中;再组装topology的stormConf;
    • 然后,从zk上获取所有Supervisor,如果没有Supervisor存在,会抛出异常。
    • 从zk上获取task列表,保存到TaskToComponent和AllTaskIds中;有了拓扑的task列表之后,从zk中获取拓扑的Assignment信息。如果已经存在了,则会根据task的心跳和Supervisor的运行状态,区分出还在运行中的和已经停止运行的task,分别保存到aliveTasks和unstoppedTasks中。具体逻辑是:从/jstorm/taskbeats/{topoologyId}节点下查找拓扑中包含的task记录到allTaskIds中;通过保持心跳的task记录,可以将aliveTasks从allTaskIds过滤出来;由于Supervisor可能已经停止运行,所以还需要找出来其包含的task,具体是通过zk中旧的Assignment中维护了task和其对应的Supervisor(getTaskToResource的ResourceAssignment中),如果Supervisor已经不在运行了,就将这些task保存到unstoppedTasks中。有了这些信息,deadTasks就是allTaskId减去aliveTasks的集合。
    • 如果zk上没有拓扑对应的旧Assignment信息,就不需要找出deadTasks和unstoppedTasks,这两个集合都为空了。
    • 为Supervisor收集未使用的slot,zk上记录了所有的Supervisor和Assignment,而Assignment中记录了使用的资源,这样就可以分配已使用的资源,剩余的就是未使用的资源。首先遍历zk上注册的所有的Assignment,由于Assignment中包含了task与ResourceAssignment的映射,而ResourceAssignment中包含了task所在的Supervisor的id,因此可以获取到每个task的SupervisorInfo,SupervisorInfo中维护了Supervisor的资源池(cpu,内存,磁盘和网络),在池中分配task所需的资源。
    • 设置拓扑已有的Assignment,保存到OldAssignment中,如果拓扑的Assignment不存在,会从zk中获取AssignmentBak。在这个过程中,也会设置AssignType的类型。

    mkAssignment的IToplogyScheduler

    收集完TopologyContext之后,就可以使用调度器来为任务分配资源了。IToplogyScheduler是默认的用来调度拓扑的类,在init方法中被创建,使用的是DefaultTopologyScheduler类。这个类的作用是为task分配指定的资源。

    分配过程为:

    1. 检查分配类型是否为ASSIGN_TYPE_NEW、ASSIGN_TYPE_REBALANCE、ASSIGN_TYPE_MONITOR三者之一;
    2. 在TopologyAssignContext的基础上创建DefaultTopologyAssignContext,内部包含了分配task资源需要的weight配置
    3. 如果是REBALANCE类型的,需要先释放掉已有的资源。context中保存了所有的SupervisorInfo,zk中记录了原有的Assignment,内部包含了task使用资源的情况,这样就可以对SupervisorInfo内的资源池进行释放。
    4. 统计需要分配的task,保存在needAssignTasks中。ASSIGN_TYPE_NEW类型的时候所有都需要分配;ASSIGN_TYPE_REBALANCE 需要分配(所有task 减去 已经停止的supervisor的task);ASSIGN_TYPE_MONITOR需要分配所有dead的task。
    5. 由于当前zk中可能有Assignment,其对应的task需要保持不变,所以,将这些task查找出来保存在keepAssigns中,然后获取需要保持的任务对应的<supervisorid和port> - > task,即每个WorkerSlot上对应的task。
    6. 计算 需要分配的worker数量 = 所有的worker总数量 - 已停止的supervisor的worker数量 - 需要保持的worker数量
    7. registerPreAssignHandler方法计算需要分配的任务needAssignTasks中每个组件分配类型ComponentAssignType对应的task。由于task中记录了组件名称,每个组件的配置可能会不同。ComponentAssignType共有三种类型的分配方式:USER_DEFINE, USE_OLD, NORMAL。每种方式都可能会对应着我们需要分配的若干task。
    8. 有了ComponentAssignType之后,就可以为task分配资源了。使用USER_DEFINE时,使用UserDefinePreAssign需要从配置文件中读取需要分配的cpu,内存和磁盘的solt。使用NORMAL时,对应的NormalPreAssign类。先查找是否有用户自定义的分配方式,如果没有,从配置中获取task占用的slot数量。cpu.slots.per.task:每个task会使用的cup slot数量,memory.slots.per.task每个task会使用的内存slot数量,task.alloc.disk.slot每个task会使用的磁盘slot数量。有了slot数量,还需要注意task.on.differ.node是否要求task位于不同的node上。满足slot的Supervisor很多,这时候就会使用总权重或level来为task选择最佳的Supervisor,并在Supervisor的资源池中分配相应的资源,并封装为ResourceAssignment对象返回。
    9. 分配完成后,执行PostAssignTaskPort的postAssign方法,为task分配worker需要的port。

    完成上述两个步骤之后,已经为Topology分配了所需的资源,包括Supervisor节点,worker,内存,磁盘,cpu的数量。

    mkAssignment的后续操作

    1. updateGroupResource 如果使用了Group模式,就需要设置分组数据。
    2. 计算supervisor与host的映射:zk中记录了所有的supervisor,过滤出task需要的supervisorid与host的映射关系即可。
    3. 计算task的starttime:ASSIGN_TYPE_NEW类型为当前时间
    4. 按照Assignment的构造方法,封装为Assignment对象
    5. 将Assignment的数据写入zk的/jstorm/assignments/{topologyId}节点里面
    6. 更新task的心跳起始时间

    setTopologyStatus

    激活Topology:/jstorm/topology/{topologyId}读取并反序列化为StormBase对象的实例。如果不存在,就创建这个节点,并创建StormBase实例保存起来。如果StormBase存在,就更新状态。

    backupAssignment

    主要的操作是将Assignment保存在zk上面。由于上面的过程中已经生产了Assignment对象,遍历task并构造出组件与task的映射关系后,封装为AssignmentBak,保存到/jstorm/assignments_bak/{topologyName}节点的内容中。

    相关文章

      网友评论

          本文标题:JStorm源码分析-3.提交Topology

          本文链接:https://www.haomeiwen.com/subject/wxdopqtx.html