美文网首页大数据大数据大数据-简书
流式计算storm核心组件、特性、案例

流式计算storm核心组件、特性、案例

作者: 程序员1 | 来源:发表于2018-09-11 13:08 被阅读9次

    storm简介

    Storm是一个免费并开源的分布式实时计算系统。利用Storm可以很容易做到可靠地处理无限的数据流,像Hadoop批量处理大数据一样,Storm可以实时处理数据。Storm简单,可以使用任何编程语言。

    storm核心组件

    1.Nimbus

    相当于storm的master,负责资源分配和任务调度,一个普通的storm集群只有一个nimbus(京东是对nimbus做了集群,加入了选举等概念,防止nimbus突然挂掉)

    2.Supervisor

    相当于storm的slave,负责接收Nimbus分配的任务,管理和启动所有的Worker

    3.Worker

    一个Worker就是一个jvm进程,对应一个Topology程序,可以有多个Executor

    4.Executor

    一个Executor就是一个线程,默认对应一个task,也可以设置成对应多个task

    5.Task

    一个Task是一个实例(spot/bolt),有多少个task就会new多少个bolt,task是storm中进行计算的最小的运行单位

    6.Topology

    拓扑结构,一个计算任务场景对应一个拓扑结构,拓扑结构中对声明spout和bolt直接的关系

    7.Spout

    是拓扑结构中的数据来源,可以向多个bolt发送数据,Spout 既可以定义为可靠的数据源,也可以定义为不可靠的数据源

    8.Bolt

    真正的数据处理部分,一个bolt可以发给多个bolt,多个bolt也可以发给一个bolt

    9.Component

    Spout和Bolt都是Component,Storm定义了一个名叫IComponent的总接口

    全家谱如下:绿色部分是我们最常用、比较简单的部分。红色部分是与事务相关的

    spout和bolt的关系:

    整体的topology结构:

    storm使用zookeeper来协调集群中的多个节点,但并不是用zookeeper来传递消息

    zookeeper可以看这个

    Nimbus和Supervisor都是无状态的,他们的心跳都由zookeeper协调

    storm优点

    1.使用简单,容易上手

    2.可扩展,可以调整正在运行的topologies的并行度

    3.容错,可靠,当工作节点宕了,storm会尝试重启另一个,而且Nimbus和Supervisors都是无状态的,死掉重启都不影响

    4.无数据丢失,Storm的抽象组件确保了数据至少处理一次,即使使用消息队列系统失败时,也能确保消息被处理

    5.支持多种编程语言,Storm用Thrift定义和提交topologies.由于Thrift能被任何一种编程语言使用,因此,topologies也能被任何一种编程语言定义和使用。

    6.容易部署和操作

    7.高性能,低延迟

    storm入门案例 ( 实时统计单次个数 )

    首先导入maven依赖

    1

    2 org.apache.storm

    3 storm-core

    4 1.0.4

    5

    1.先写一个Spout,确定数据源,实际应用中一般是接入kafka等消息,入门案例使用随机字符串代替

    /**

    * 向后端发射tuple数据流

    * @author soul

    *

    */

    public class SentenceSpout extends BaseRichSpout {

    //BaseRichSpout是ISpout接口和IComponent接口的简单实现,接口对用不到的方法提供了默认的实现

    private SpoutOutputCollector collector;

    private String[] sentences = {

    "my name is soul",

    "im a boy",

    "i have a dog",

    "my dog has fleas",

    "my girl friend is beautiful"

    };

    private int index=0;

    /**

    * open()方法中是ISpout接口中定义,在Spout组件初始化时被调用。

    * open()接受三个参数:一个包含Storm配置的Map,一个TopologyContext对象,提供了topology中组件的信息,SpoutOutputCollector对象提供发射tuple的方法。

    * 在这个例子中,我们不需要执行初始化,只是简单的存储在一个SpoutOutputCollector实例变量。

    */

    @Override

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {

    // TODO Auto-generated method stub

    this.collector = collector;

    }

    /**

    * nextTuple()方法是任何Spout实现的核心。

    * Storm调用这个方法,向输出的collector发出tuple。

    * 在这里,我们只是发出当前索引的句子,并增加该索引准备发射下一个句子。

    */

    @Override

    public void nextTuple() {

    //collector.emit(new Values("hello world this is a test"));

    // TODO Auto-generated method stub

    this.collector.emit(new Values(sentences[index]));

    index++;

    if (index>=sentences.length) {

    index=0;

    }

    Utils.sleep(1000);

    }

    /**

    * declareOutputFields是在IComponent接口中定义的,所有Storm的组件(spout和bolt)都必须实现这个接口

    * 用于告诉Storm流组件将会发出那些数据流,每个流的tuple将包含的字段

    */

    @Override

    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    // TODO Auto-generated method stub

    declarer.declare(new Fields("sentence"));//告诉组件发出数据流包含sentence字段

    }

    }

    2.写第一个bolt,将Spout传过来的Tuple拆成一个个的单次,循环发给下一个bolt

    /**

    * 订阅sentence spout发射的tuple流,实现分割单词

    * @author soul

    *

    */

    public class SplitSentenceBolt extends BaseRichBolt {

    //BaseRichBolt是IComponent和IBolt接口的实现

    //继承这个类,就不用去实现本例不关心的方法

    private OutputCollector collector;

    /**

    * prepare()方法类似于ISpout 的open()方法。

    * 这个方法在blot初始化时调用,可以用来准备bolt用到的资源,比如数据库连接。

    * 本例子和SentenceSpout类一样,SplitSentenceBolt类不需要太多额外的初始化,

    * 所以prepare()方法只保存OutputCollector对象的引用。

    */

    @Override

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {

    // TODO Auto-generated method stub

    this.collector=collector;

    }

    /**

    * SplitSentenceBolt核心功能是在类IBolt定义execute()方法,这个方法是IBolt接口中定义。

    * 每次Bolt从流接收一个订阅的tuple,都会调用这个方法。

    * 本例中,收到的元组中查找“sentence”的值,

    * 并将该值拆分成单个的词,然后按单词发出新的tuple。

    */

    @Override

    public void execute(Tuple input) {

    // TODO Auto-generated method stub

    String sentence = input.getStringByField("sentence");

    String[] words = sentence.split(" ");

    for (String word : words) {

    this.collector.emit(new Values(word));//向下一个bolt发射数据

    }

    }

    /**

    * plitSentenceBolt类定义一个元组流,每个包含一个字段(“word”)。

    */

    @Override

    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    // TODO Auto-generated method stub

    declarer.declare(new Fields("word"));

    }

    }

    3.再写一个bolt,一方面接收上个bolt传过来的单次,另一方面将相同单次出现的次数记录下来,并将现在的结果传给下个bolt

    /**

    * 订阅 split sentence bolt的输出流,实现单词计数,并发送当前计数给下一个bolt

    * @author soul

    *

    */

    public class WordCountBolt extends BaseRichBolt {

    private OutputCollector collector;

    //存储单词和对应的计数

    private HashMap counts = null;//注:不可序列化对象需在prepare中实例化

    /**

    * 大部分实例变量通常是在prepare()中进行实例化,这个设计模式是由topology的部署方式决定的

    * 因为在部署拓扑时,组件spout和bolt是在网络上发送的序列化的实例变量。

    * 如果spout或bolt有任何non-serializable实例变量在序列化之前被实例化(例如,在构造函数中创建)

    * 会抛出NotSerializableException并且拓扑将无法发布。

    * 本例中因为HashMap 是可序列化的,所以可以安全地在构造函数中实例化。

    * 但是,通常情况下最好是在构造函数中对基本数据类型和可序列化的对象进行复制和实例化

    * 而在prepare()方法中对不可序列化的对象进行实例化。

    */

    @Override

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {

    // TODO Auto-generated method stub

    this.collector = collector;

    this.counts = new HashMap();

    }

    /**

    * 在execute()方法中,我们查找的收到的单词的计数(如果不存在,初始化为0)

    * 然后增加计数并存储,发出一个新的词和当前计数组成的二元组。

    * 发射计数作为流允许拓扑的其他bolt订阅和执行额外的处理。

    */

    @Override

    public void execute(Tuple input) {

    // TODO Auto-generated method stub

    String word = input.getStringByField("word");

    Long count = this.counts.get(word);

    if (count == null) {

    count = 0L;//如果不存在,初始化为0

    }

    count++;//增加计数

    this.counts.put(word, count);//存储计数

    this.collector.emit(new Values(word,count));

    }

    /**

    *

    */

    @Override

    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    // TODO Auto-generated method stub

    //声明一个输出流,其中tuple包括了单词和对应的计数,向后发射

    //其他bolt可以订阅这个数据流进一步处理

    declarer.declare(new Fields("word","count"));

    }

    }

    4.再写一个bolt,接收上个bolt传过来的单次统计结果,在控制台打印.实际最后一步一般会将数据结果存在非关系型数据库中,比如存入HBase或者Redis中

    /**

    * 生成一份报告

    * @author soul

    *

    */

    public class ReportBolt extends BaseRichBolt {

    private HashMap counts = null;//保存单词和对应的计数

    @Override

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {

    // TODO Auto-generated method stub

    this.counts = new HashMap();

    }

    @Override

    public void execute(Tuple input) {

    // TODO Auto-generated method stub

    String word = input.getStringByField("word");

    Long count = input.getLongByField("count");

    this.counts.put(word, count);

    //实时输出

    System.out.println("结果:"+this.counts);

    }

    @Override

    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    // TODO Auto-generated method stub

    //这里是末端bolt,不需要发射数据流,这里无需定义

    }

    /**

    * cleanup是IBolt接口中定义

    * Storm在终止一个bolt之前会调用这个方法

    * 本例我们利用cleanup()方法在topology关闭时输出最终的计数结果

    * 通常情况下,cleanup()方法用来释放bolt占用的资源,如打开的文件句柄或数据库连接

    * 但是当Storm拓扑在一个集群上运行,IBolt.cleanup()方法不能保证执行(这里是开发模式,生产环境不要这样做)。

    */

    @Override

    public void cleanup(){

    System.out.println("---------- FINAL COUNTS -----------");

    ArrayList keys = new ArrayList();

    keys.addAll(this.counts.keySet());

    Collections.sort(keys);

    for(String key : keys){

    System.out.println(key + " : " + this.counts.get(key));

    }

    System.out.println("----------------------------");

    }

    }

    5.写拓扑结构,将前面四步的Spout和Bolt组成一个拓扑结构,直接运行主方法就能看到结果,这个是Storm的本地模式,将提交的方法稍作修改,就可以变成集群模式,实际都是集群模式,将这些代码打成jar包传到Nimbus上,运行在集群中

    /**

    * 实现单词计数topology

    *

    */

    public class App

    {

    private static final String SENTENCE_SPOUT_ID = "sentence-spout";

    private static final String SPLIT_BOLT_ID = "split-bolt";

    private static final String COUNT_BOLT_ID = "count-bolt";

    private static final String REPORT_BOLT_ID = "report-bolt";

    private static final String TOPOLOGY_NAME = "word-count-topology";

    public static void main( String[] args ) //throws Exception

    {

    //System.out.println( "Hello World!" );

    //实例化spout和bolt

    SentenceSpout spout = new SentenceSpout();

    SplitSentenceBolt splitBolt = new SplitSentenceBolt();

    WordCountBolt countBolt = new WordCountBolt();

    ReportBolt reportBolt = new ReportBolt();

    TopologyBuilder builder = new TopologyBuilder();//创建了一个TopologyBuilder实例

    //TopologyBuilder提供流式风格的API来定义topology组件之间的数据流

    //builder.setSpout(SENTENCE_SPOUT_ID, spout);//注册一个sentence spout

    //设置两个Executeor(线程),默认一个

    builder.setSpout(SENTENCE_SPOUT_ID, spout,2);

    // SentenceSpout --> SplitSentenceBolt

    //注册一个bolt并订阅sentence发射出的数据流,shuffleGrouping方法告诉Storm要将SentenceSpout发射的tuple随机均匀的分发给SplitSentenceBolt的实例

    //builder.setBolt(SPLIT_BOLT_ID, splitBolt).shuffleGrouping(SENTENCE_SPOUT_ID);

    //SplitSentenceBolt单词分割器设置4个Task,2个Executeor(线程)

    builder.setBolt(SPLIT_BOLT_ID, splitBolt,2).setNumTasks(4).shuffleGrouping(SENTENCE_SPOUT_ID);

    // SplitSentenceBolt --> WordCountBolt

    //fieldsGrouping将含有特定数据的tuple路由到特殊的bolt实例中

    //这里fieldsGrouping()方法保证所有“word”字段相同的tuuple会被路由到同一个WordCountBolt实例中

    //builder.setBolt(COUNT_BOLT_ID, countBolt).fieldsGrouping( SPLIT_BOLT_ID, new Fields("word"));

    //WordCountBolt单词计数器设置4个Executeor(线程)

    builder.setBolt(COUNT_BOLT_ID, countBolt,4).fieldsGrouping( SPLIT_BOLT_ID, new Fields("word"));

    // WordCountBolt --> ReportBolt

    //globalGrouping是把WordCountBolt发射的所有tuple路由到唯一的ReportBolt

    builder.setBolt(REPORT_BOLT_ID, reportBolt).globalGrouping(COUNT_BOLT_ID);

    Config config = new Config();//Config类是一个HashMap的子类,用来配置topology运行时的行为

    //设置worker数量

    //config.setNumWorkers(2);

    LocalCluster cluster = new LocalCluster();

    //本地提交

    cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());

    Utils.sleep(10000);

    cluster.killTopology(TOPOLOGY_NAME);

    cluster.shutdown();

    }

    }

    storm与其他流式计算框架的对比

    1.Spark Streaming

    在处理前按时间间隔预先将其切分为一段一段的批处理作业.

    Spark针对持续性数据流的抽象称为DStream(DiscretizedStream),

    一个DStream是一个微批处理(micro-batching)的RDD(弹性分布式数据集),

    而RDD则是一种分布式数据集,能够以两种方式并行运作,分别是任意函数和滑动窗口数据的转换。

    2.Flink

    针对流数据和批数据的分布式处理引擎

    原生的流处理系统,

    其所要处理的主要场景就是流数据,批数据只是流数据的一个极限特例而已

    Flink 会把所有任务当成流来处理

    3.Storm

    原生的流处理系统,可以做到毫秒级处理

    点击阅读评论转发加关注,后续还会有许多知识点,与大家分享,还可以私信我!

    没有你想不到的,只有你做不到的。

    相关文章

      网友评论

        本文标题:流式计算storm核心组件、特性、案例

        本文链接:https://www.haomeiwen.com/subject/mafugftx.html