美文网首页大数据
Storm介绍之安装部署及API

Storm介绍之安装部署及API

作者: Bloo_m | 来源:发表于2016-11-15 15:49 被阅读0次

    安装:
    1.下载并解压缩Zookeeper

               官网地址:http://hadoop.apache.org/zookeeper/releases.html
               解压缩步骤此处省略....
    

    2.修改zookeeper的配置文件

              把zookeeper对应的conf目录下的zoo-sample.cfg重命名为zoo.cfg,配置工作目录和端口号
             # The number of milliseconds of each 
                  ticktickTime=2000
             # The number of ticks that the initial
             # synchronization phase can take
                initLimit=10
             # The number of ticks that can pass between# sending a request and getting an acknowledgement
              syncLimit=5
             # the directory where the snapshot is stored.
             # do not use /tmp for storage, /tmp here is just
             # example sakes.
                dataDir=/usr/local/dev/zookeeper-3.4.9/data
             # the port at which the clients will connect
                clientPort=2181
                server.id_num1=hostname1:2888:3888
                server.id_num2=hostname2:2888:3888
         注意:集群模式下分别在datadir目录下创建文件myid,其中的内容为id_num,例如:
                echo id_num1 > myid
    

    3.下载并压缩storm

              官网地址:http://storm.apache.org/downloads.html
              解压缩步骤此处省略.....
    

    4.修改storm配置文件

              修改conf/storm.yaml,conf/storm.yaml中的配置选项将覆盖conf/defaults.yaml
            1):storm.zookeeper.servers:storm集群中使用的zookeeper集群的地址
                storm.zookeeper.servers:
                    -"host_ip"    (此处填写zookeeper集群的主机名或Ip,多个用逗号分隔)
                nimbus.host:"host_ip"  (此处填下nimbus进程的主机,即主节点ip)
                storm.local.dir:"/dest/to/path"  (此处填写storm存储目录)
                supervisor.slots.ports:
                    -  "host_ip:port"    (此处填写从节点的端口号,可随意,当采用单机模式的时候,需要写不同的端口号,具体的个数根据从节点的个数来定)
    

    5.启动zookeeper

    注意:集群中的每台机器都要启动,且启动命令一致
      进入zookeeper的安装目录
      #bin/zkServer.sh start
    

    6.启动storm nimbus

    进入storm安装目录
     #bin/storm nimbus
    

    7.启动storm supervisor

      注意:如果是单机模式,即启动一次即可,如果是集群模式,需要每台都要启动,命令一致    
        进入storm安装目录
         #bin/storm supervisor
    

    8.启动storm ui

    进入storm安装目录
    #bin/storm ui
    

    然后访问localhost:8080(或者主节点主机名:8080)就会看到storm的基本信息,到此,storm的安装部署已经成功

    接下来进入storm的API,首先先要了解storm中的顶层接口IComponent
    storm中Spout和Bolt都是其Component(部件的意思),所以storm定义了一个名叫IComponent的接口,全家普如下:

    Paste_Image.png

    注意:

    绿色部分是我们常用的类,红色部分是与事务有关的
    

    BaseComponent是Storm提供的"偷懒"的类,它及其子类,或多或少实现了接口的部分方法,这样我们在使用的时候,不用自己每次都写所有的方法,值得一提的是:像BaseXXX的类,它所实现的方法,都是空的,直接返回null,如果继承这样的类,需要自己重写方法。下面介绍Spout和Bolt组件相关的Api

    Spout
    首先看一下总体图:

    Paste_Image.png

    从图中很明显的看出Spout最顶层抽象的是ISpout接口,简单介绍一样接口中的方法:

    Paste_Image.png

    open():初始化动作,可以在该Spout初始化的时候做一些动作,传递上下文等

    close():该Spout关闭之前执行,但不能得到保证一定可以执行.Spout是作为Task运行在Worker中的,在Cluster模式下,supervisor会直接kill -9 worker的进程,这样它就无法执行了.而在本地模式下,如果是发送停止命令,是可以保证close方法的执行的.

    activate()和deactivate():一个Spout可以被暂时激活和关闭,这两个方法可以在对应的时刻调用执行

    nextTuple():用来发射数据,Spout中最核心的部分,一些具体的需求可以在该方法中实现

    ack():一个Tuple会有唯一一个id,当该Tuple被成功处理,会执行该方法

    fail():与ack()方法同理,当Tuple处理失败会调用该方法

    总结:

    通常情况下(shell和事务除外),实现一个Spout,可以直接实现IRichSpout,如果不想写多余的代码,可以继承BaseRichSpout
    

    Bolt
    同样,首先看下总体图:

    Paste_Image.png

    可以看出为什么IBasicBolt没有继承IBolt?
    我们先看下IBolt的方法:

    Paste_Image.png

    我们需要知道的是IBolt继承了java.io.Serializable,我们在nimbus上提交了Topology后,创建出来的Bolt会序列化发送到具体执行的Worker上,Worker在执行该Bolt时,会首先调用prepare方法传入当前执行的上下文

    execute(Tuple):接收一个Tuple进行处理,并用prepare方法传入的OutputCollector的ack方法(表示成功)或fail方法(表示失败)来反馈结果

    cleanup():同Ispout的close方法,不能保证其一定被执行

    好了,现在可以回答为什么IBasicBolt没有继承IBolt这个问题了,Storm提供了IBasicBolt接口,其目的就是实现该接口的Bolt不用在代码中反馈结果了,storm内部会自动反馈结果

    总结:

    通常情况下实现一个Bolt,可以实现IRichBolt接口或继承BaseRichBolt,如果不想自己处理反馈结果,可以实现IBasicBolt接口或继承BaseBasicBolt,它实际上是自己做掉了prepare方法和collector.emit.ack(inputTuple)方法.
    

    OK,介绍完了简单的方法,下面写一个简单的Demo,加深一下对Spout和Bolt的理解

    简单需求:对名称加后缀并转换成大写

    RandomWordSpout.java

    import java.util.Map;
    import java.util.Random;
    
    import backtype.storm.spout.SpoutOutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichSpout;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Values;
    import backtype.storm.utils.Utils;
    
    public class RandomWordSpout extends BaseRichSpout{
    
              private SpoutOutputCollector  collector;
              
              //模拟一些数据
              String[] str = {"hello","word","you","how","are"};
              
    
            //初始化方法,在spout组件实例化时调用一次
              @Override
              public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
                      this.collector = collector;
              }
    
              @Override
              public void nextTuple{
                     //随机挑选出一个名称
                      Random random = new Random();
                      int index = random.nextInt(str.length);
                    
                      //获取名称
                      String name = str[index];
    
                      //将名称进行封装成tuple,发送消息给下一个组件
                      collector.emit(new Vaules(name));
                }
    
            //声明本spout组件发送出去的tuple中的数据的字段名
               @Override
              public void declareOutputFields(OutputFieldsDeclarer declarer) {
                      declarer.declare(new Fields("orignname"));
                  }
    }
    

    UpperBolt.java

          import backtype.storm.topology.BasicOutputCollector;
          import backtype.storm.topology.OutputFieldsDeclarer;
          import backtype.storm.topology.base.BaseBasicBolt;
          import backtype.storm.tuple.Fields;
          import backtype.storm.tuple.Tuple;
          import backtype.storm.tuple.Values;
        
          public class UpperBolt extends BaseBasicBolt{
    
                  //业务处理逻辑
                   @Override
                  public void execute(Tuple tuple, BasicOutputCollector collector) {
                          //先获取到上一个组件传递过来的数据,数据在tuple里面
                          String godName = tuple.getString(0);
        
                           //将名称转换成大写
                          String godName_upper = godName.toUpperCase();
        
                          //将转换完成的商品名发送出去
                          collector.emit(new Values(godName_upper));
                }
    
    
    
                 //声明该bolt组件要发出去的tuple的字段
                @Override
                public void declareOutputFields(OutputFieldsDeclarer declarer) {
                        declarer.declare(new Fields("uppername"));
                  }
        }
    

    SuffixBolt.java

        import backtype.storm.task.TopologyContext;
        import backtype.storm.topology.BasicOutputCollector;
        import backtype.storm.topology.OutputFieldsDeclarer;
        import backtype.storm.topology.base.BaseBasicBolt;
        import backtype.storm.tuple.Tuple;
    
        public class SuffixBolt extends BaseBasicBolt{
              FileWriter fileWriter = null;
            
              //在bolt组件运行过程中只会被调用一次
                @Override
              public void prepare(Map stormConf, TopologyContext context) {  
                      try {
                          fileWriter = new   FileWriter("/home/hadoop/stormoutput/"+UUID.randomUUID());
                          } catch (IOException e) {
                                  throw new RuntimeException(e);
                          }
    }
    
            //该bolt组件的核心处理逻辑
            //每收到一个tuple消息,就会被调用一次
                @Override
            public void execute(Tuple tuple, BasicOutputCollector collector) {
                //先拿到上一个组件发送过来的名称
                  String upper_name = tuple.getString(0);
                  String suffix_name = upper_name + "_itisok";
    
              //为上一个组件发送过来的商品名称添加后缀
                  try {
                      fileWriter.write(suffix_name);
                      fileWriter.write("\n");
                      fileWriter.flush();
                  } catch (IOException e) {
                      throw new RuntimeException(e);
                  }
            }
    }
    

    TopoMain.java

        import backtype.storm.Config;
        import backtype.storm.StormSubmitter;
        import backtype.storm.generated.AlreadyAliveException;
        import backtype.storm.generated.InvalidTopologyException;
        import backtype.storm.generated.StormTopology;
        import backtype.storm.topology.TopologyBuilder;
    
      /**
       * 组织各个处理组件形成一个完整的处理流程,就是所谓的topology(类似于mapreduce程序中的job)
     * 并且将该topology提交给storm集群去运行,topology提交到集群后就将永无休止地运行,除非人为或者异常退出
       */
        public class TopoMain {
            public static void main(String[] args) throws Exception {   
                  TopologyBuilder builder = new TopologyBuilder();
    
                //将我们的spout组件设置到topology中去 
                //parallelism_hint :4  表示用4个excutor来执行这个组件
                //setNumTasks(8) 设置的是该组件执行时的并发task数量,也就意味着1个excutor会运行2个task
    
                  builder.setSpout("randomspout", new RandomWordSpout(), 4).setNumTasks(8);
        
              //将大写转换bolt组件设置到topology,并且指定它接收randomspout组件的消息
              //.shuffleGrouping("randomspout")包含两层含义:
              //1、upperbolt组件接收的tuple消息一定来自于randomspout组件
              //2、randomspout组件和upperbolt组件的大量并发task实例之间收发消息时采用的分组策略是随机分组shuffleGrouping
    
                  builder.setBolt("upperbolt", new UpperBolt(), 4).shuffleGrouping("randomspout");
        
              //将添加后缀的bolt组件设置到topology,并且指定它接收upperbolt组件的消息
    
                  builder.setBolt("suffixbolt", new SuffixBolt(), 4).shuffleGrouping("upperbolt");
        
              //用builder来创建一个topology
                   StormTopology demotop = builder.createTopology();
    
              //配置一些topology在集群中运行时的参数
                   Config conf = new Config();
             //这里设置的是整个demotop所占用的槽位数,也就是worker的数量
                   conf.setNumWorkers(4);
                  conf.setDebug(true);
                  conf.setNumAckers(0);
        
            //将这个topology提交给storm集群运行
                 StormSubmitter.submitTopology("demotopo", conf, demotop);  
    }
    

    }

    最后将工程打包,在集群上运行
    #storm jar jar_name.jar class_name args0 ....

    相关文章

      网友评论

        本文标题:Storm介绍之安装部署及API

        本文链接:https://www.haomeiwen.com/subject/odckpttx.html