美文网首页大数据
Storm介绍之安装部署及API

Storm介绍之安装部署及API

作者: Bloo_m | 来源:发表于2016-11-15 15:49 被阅读0次

安装:
1.下载并解压缩Zookeeper

           官网地址:http://hadoop.apache.org/zookeeper/releases.html
           解压缩步骤此处省略....

2.修改zookeeper的配置文件

          把zookeeper对应的conf目录下的zoo-sample.cfg重命名为zoo.cfg,配置工作目录和端口号
         # The number of milliseconds of each 
              ticktickTime=2000
         # The number of ticks that the initial
         # synchronization phase can take
            initLimit=10
         # The number of ticks that can pass between# sending a request and getting an acknowledgement
          syncLimit=5
         # the directory where the snapshot is stored.
         # do not use /tmp for storage, /tmp here is just
         # example sakes.
            dataDir=/usr/local/dev/zookeeper-3.4.9/data
         # the port at which the clients will connect
            clientPort=2181
            server.id_num1=hostname1:2888:3888
            server.id_num2=hostname2:2888:3888
     注意:集群模式下分别在datadir目录下创建文件myid,其中的内容为id_num,例如:
            echo id_num1 > myid

3.下载并压缩storm

          官网地址:http://storm.apache.org/downloads.html
          解压缩步骤此处省略.....

4.修改storm配置文件

          修改conf/storm.yaml,conf/storm.yaml中的配置选项将覆盖conf/defaults.yaml
        1):storm.zookeeper.servers:storm集群中使用的zookeeper集群的地址
            storm.zookeeper.servers:
                -"host_ip"    (此处填写zookeeper集群的主机名或Ip,多个用逗号分隔)
            nimbus.host:"host_ip"  (此处填下nimbus进程的主机,即主节点ip)
            storm.local.dir:"/dest/to/path"  (此处填写storm存储目录)
            supervisor.slots.ports:
                -  "host_ip:port"    (此处填写从节点的端口号,可随意,当采用单机模式的时候,需要写不同的端口号,具体的个数根据从节点的个数来定)

5.启动zookeeper

注意:集群中的每台机器都要启动,且启动命令一致
  进入zookeeper的安装目录
  #bin/zkServer.sh start

6.启动storm nimbus

进入storm安装目录
 #bin/storm nimbus

7.启动storm supervisor

  注意:如果是单机模式,即启动一次即可,如果是集群模式,需要每台都要启动,命令一致    
    进入storm安装目录
     #bin/storm supervisor

8.启动storm ui

进入storm安装目录
#bin/storm ui

然后访问localhost:8080(或者主节点主机名:8080)就会看到storm的基本信息,到此,storm的安装部署已经成功

接下来进入storm的API,首先先要了解storm中的顶层接口IComponent
storm中Spout和Bolt都是其Component(部件的意思),所以storm定义了一个名叫IComponent的接口,全家普如下:

Paste_Image.png

注意:

绿色部分是我们常用的类,红色部分是与事务有关的

BaseComponent是Storm提供的"偷懒"的类,它及其子类,或多或少实现了接口的部分方法,这样我们在使用的时候,不用自己每次都写所有的方法,值得一提的是:像BaseXXX的类,它所实现的方法,都是空的,直接返回null,如果继承这样的类,需要自己重写方法。下面介绍Spout和Bolt组件相关的Api

Spout
首先看一下总体图:

Paste_Image.png

从图中很明显的看出Spout最顶层抽象的是ISpout接口,简单介绍一样接口中的方法:

Paste_Image.png

open():初始化动作,可以在该Spout初始化的时候做一些动作,传递上下文等

close():该Spout关闭之前执行,但不能得到保证一定可以执行.Spout是作为Task运行在Worker中的,在Cluster模式下,supervisor会直接kill -9 worker的进程,这样它就无法执行了.而在本地模式下,如果是发送停止命令,是可以保证close方法的执行的.

activate()和deactivate():一个Spout可以被暂时激活和关闭,这两个方法可以在对应的时刻调用执行

nextTuple():用来发射数据,Spout中最核心的部分,一些具体的需求可以在该方法中实现

ack():一个Tuple会有唯一一个id,当该Tuple被成功处理,会执行该方法

fail():与ack()方法同理,当Tuple处理失败会调用该方法

总结:

通常情况下(shell和事务除外),实现一个Spout,可以直接实现IRichSpout,如果不想写多余的代码,可以继承BaseRichSpout

Bolt
同样,首先看下总体图:

Paste_Image.png

可以看出为什么IBasicBolt没有继承IBolt?
我们先看下IBolt的方法:

Paste_Image.png

我们需要知道的是IBolt继承了java.io.Serializable,我们在nimbus上提交了Topology后,创建出来的Bolt会序列化发送到具体执行的Worker上,Worker在执行该Bolt时,会首先调用prepare方法传入当前执行的上下文

execute(Tuple):接收一个Tuple进行处理,并用prepare方法传入的OutputCollector的ack方法(表示成功)或fail方法(表示失败)来反馈结果

cleanup():同Ispout的close方法,不能保证其一定被执行

好了,现在可以回答为什么IBasicBolt没有继承IBolt这个问题了,Storm提供了IBasicBolt接口,其目的就是实现该接口的Bolt不用在代码中反馈结果了,storm内部会自动反馈结果

总结:

通常情况下实现一个Bolt,可以实现IRichBolt接口或继承BaseRichBolt,如果不想自己处理反馈结果,可以实现IBasicBolt接口或继承BaseBasicBolt,它实际上是自己做掉了prepare方法和collector.emit.ack(inputTuple)方法.

OK,介绍完了简单的方法,下面写一个简单的Demo,加深一下对Spout和Bolt的理解

简单需求:对名称加后缀并转换成大写

RandomWordSpout.java

import java.util.Map;
import java.util.Random;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

public class RandomWordSpout extends BaseRichSpout{

          private SpoutOutputCollector  collector;
          
          //模拟一些数据
          String[] str = {"hello","word","you","how","are"};
          

        //初始化方法,在spout组件实例化时调用一次
          @Override
          public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
                  this.collector = collector;
          }

          @Override
          public void nextTuple{
                 //随机挑选出一个名称
                  Random random = new Random();
                  int index = random.nextInt(str.length);
                
                  //获取名称
                  String name = str[index];

                  //将名称进行封装成tuple,发送消息给下一个组件
                  collector.emit(new Vaules(name));
            }

        //声明本spout组件发送出去的tuple中的数据的字段名
           @Override
          public void declareOutputFields(OutputFieldsDeclarer declarer) {
                  declarer.declare(new Fields("orignname"));
              }
}

UpperBolt.java

      import backtype.storm.topology.BasicOutputCollector;
      import backtype.storm.topology.OutputFieldsDeclarer;
      import backtype.storm.topology.base.BaseBasicBolt;
      import backtype.storm.tuple.Fields;
      import backtype.storm.tuple.Tuple;
      import backtype.storm.tuple.Values;
    
      public class UpperBolt extends BaseBasicBolt{

              //业务处理逻辑
               @Override
              public void execute(Tuple tuple, BasicOutputCollector collector) {
                      //先获取到上一个组件传递过来的数据,数据在tuple里面
                      String godName = tuple.getString(0);
    
                       //将名称转换成大写
                      String godName_upper = godName.toUpperCase();
    
                      //将转换完成的商品名发送出去
                      collector.emit(new Values(godName_upper));
            }



             //声明该bolt组件要发出去的tuple的字段
            @Override
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                    declarer.declare(new Fields("uppername"));
              }
    }

SuffixBolt.java

    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.BasicOutputCollector;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseBasicBolt;
    import backtype.storm.tuple.Tuple;

    public class SuffixBolt extends BaseBasicBolt{
          FileWriter fileWriter = null;
        
          //在bolt组件运行过程中只会被调用一次
            @Override
          public void prepare(Map stormConf, TopologyContext context) {  
                  try {
                      fileWriter = new   FileWriter("/home/hadoop/stormoutput/"+UUID.randomUUID());
                      } catch (IOException e) {
                              throw new RuntimeException(e);
                      }
}

        //该bolt组件的核心处理逻辑
        //每收到一个tuple消息,就会被调用一次
            @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            //先拿到上一个组件发送过来的名称
              String upper_name = tuple.getString(0);
              String suffix_name = upper_name + "_itisok";

          //为上一个组件发送过来的商品名称添加后缀
              try {
                  fileWriter.write(suffix_name);
                  fileWriter.write("\n");
                  fileWriter.flush();
              } catch (IOException e) {
                  throw new RuntimeException(e);
              }
        }
}

TopoMain.java

    import backtype.storm.Config;
    import backtype.storm.StormSubmitter;
    import backtype.storm.generated.AlreadyAliveException;
    import backtype.storm.generated.InvalidTopologyException;
    import backtype.storm.generated.StormTopology;
    import backtype.storm.topology.TopologyBuilder;

  /**
   * 组织各个处理组件形成一个完整的处理流程,就是所谓的topology(类似于mapreduce程序中的job)
 * 并且将该topology提交给storm集群去运行,topology提交到集群后就将永无休止地运行,除非人为或者异常退出
   */
    public class TopoMain {
        public static void main(String[] args) throws Exception {   
              TopologyBuilder builder = new TopologyBuilder();

            //将我们的spout组件设置到topology中去 
            //parallelism_hint :4  表示用4个excutor来执行这个组件
            //setNumTasks(8) 设置的是该组件执行时的并发task数量,也就意味着1个excutor会运行2个task

              builder.setSpout("randomspout", new RandomWordSpout(), 4).setNumTasks(8);
    
          //将大写转换bolt组件设置到topology,并且指定它接收randomspout组件的消息
          //.shuffleGrouping("randomspout")包含两层含义:
          //1、upperbolt组件接收的tuple消息一定来自于randomspout组件
          //2、randomspout组件和upperbolt组件的大量并发task实例之间收发消息时采用的分组策略是随机分组shuffleGrouping

              builder.setBolt("upperbolt", new UpperBolt(), 4).shuffleGrouping("randomspout");
    
          //将添加后缀的bolt组件设置到topology,并且指定它接收upperbolt组件的消息

              builder.setBolt("suffixbolt", new SuffixBolt(), 4).shuffleGrouping("upperbolt");
    
          //用builder来创建一个topology
               StormTopology demotop = builder.createTopology();

          //配置一些topology在集群中运行时的参数
               Config conf = new Config();
         //这里设置的是整个demotop所占用的槽位数,也就是worker的数量
               conf.setNumWorkers(4);
              conf.setDebug(true);
              conf.setNumAckers(0);
    
        //将这个topology提交给storm集群运行
             StormSubmitter.submitTopology("demotopo", conf, demotop);  
}

}

最后将工程打包,在集群上运行
#storm jar jar_name.jar class_name args0 ....

相关文章

网友评论

    本文标题:Storm介绍之安装部署及API

    本文链接:https://www.haomeiwen.com/subject/odckpttx.html