美文网首页
Storm架构与编程模型分析

Storm架构与编程模型分析

作者: 小猪Harry | 来源:发表于2018-10-08 00:18 被阅读0次

    这里使用[wordcount]程序来进行分析,其中主类如下

    public class WordCountTopologyDriver {
    
        public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
    
            //1、创建topologyBuilder,设置spout和bolt
            TopologyBuilder topologyBuilder = new TopologyBuilder();
    
            //设置spout   传参:id,使用的Spout类,并发度
            topologyBuilder.setSpout("myspout",new MySpout(),2);
    
            //设置Bolt    传参:id,使用的Bolt类,并发度
            //设置分组策略    随机分 参数为spout的id
            //mybolt1与myspout跟进id进行连接,怎么连接?取决于分组策略,shuffleGrouping会对myspout进行分组
            //五个task(也就是五个executor或者说五个线程)
            topologyBuilder.setBolt("mybolt1",new SplitBolt(),2).shuffleGrouping("myspout");
            //设置分组策略    按字段分 参数为上一阶段的bolt的id
            //注:如果字段与mybolt里面声明的不一致会出现backtype.storm.generated.InvalidTopologyException: null
            topologyBuilder.setBolt("mybolt2",new CountBolt(),4).fieldsGrouping("mybolt1",new Fields("word"));
    
            //2、创建Config,指定分配的worker的数量
            Config config = new Config();
            config.setNumWorkers(2);
    
            //提交任务,可以使用storm集群来提交也可以使用本地模式来提交(便于调试)
    //        StormSubmitter.submitTopology("wordcountsubmit",config,topologyBuilder.createTopology());
            //使用本地模式提交
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("wordcountsubmit",config,topologyBuilder.createTopology());
        }
    }
    
    

    设置了myspout的并发度为2,mybolt1的并发度为4,mybolt2的并发度为2,worker的数量为2。
    分析图

    image.png

    流程分析:
    这里有4台物理机,storm1-4分别对应nimbus和3个supervisor。
    1、当客户端提交任务时,nimbus接收到任务,开始做任务分配,它会找到空闲的supervisor比如找的是storm2和storm3两台机器,并进行任务分配;
    2、根据前面的介绍,这里一个需要8个task,而有2个worker,那么每个worker就分配了4个task。worker1和worker2各分配一个mySpout和splitBolt,从自己跑的wordcount程序能看到多个task在worker上分配一般是轮询的,所以可以认为worker1上分配了myBolt2的0和2task,worker2上分配了myBolt2的1和3task(或者反过来)。
    3、任务分配好了选择了对应的端口(worker),找出了Supervisor。那么将通过zookeeper将任务分发下去,让2个Supervisor开始跑程序。
    4、两个mySpout获取数据源,将数据随机发送到自己worker上的bolt1和另外一台的worker上的bolt(因为设置了随机发射数据shuffleGrouping)。
    5、两台机器上的bolt1接着会向bolt2发送数据,发送规则为为指定字段(单词)hash取模比bolt2的数量(这样保证相同的单词肯定会发送到某个bolt2中,而不会很多bolt2中有相同的单词,如果是这样某个单词的统计就不完整了最后还需要统一去统计,所以必须用fieldsGrouping策略发射数据)。

    相关文章

      网友评论

          本文标题:Storm架构与编程模型分析

          本文链接:https://www.haomeiwen.com/subject/mrtdaftx.html