美文网首页
Storm基础开发案例-数字累加

Storm基础开发案例-数字累加

作者: 学术界末流打工人 | 来源:发表于2020-02-03 18:54 被阅读0次

需求

使用Storm实现累加求和的操作

开发过程

1. Spout 定义

首先 Spout需要继承BaseRichSpout

public class LocalSumStormTopology {
  public static class DataSourceSpout extends BaseRichSpout{
  }
}

接下来需要实现 BasheRichSpout的三个方法

  • open
  • nextTuple
  • declareOutputFields

open方法实现

// 因为数据需要发出去所以两个地方需要用,在这里初始化一个
private SpoutOutputCollector collector;

/**
  * 初始化只会被调用一次
  * @param map 配置参数
  * @param topologyContext 上下文
  * @param spoutOutputCollector 数据发射器
*/
public void open(Map<String, Object> map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
  this.collector = collector;
}

nextTuple方法实现

int number = 0;
/**
  * 产生数据,在生产上肯定从消息队列中获取数据
  *
  * 这个方法是个死循环,一直不停的执行
  */
public void nextTuple() {
  this.collector.emit(new Values(number++));

  System.out.println("Spout:"+number);

  // 防止数据产生太快
  Utils.sleep(1000);
}

declareOutputFields方法实现

/**  
  * 声明输出字段
  * @param outputFieldsDeclarer
  */
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
  // num字段是对应nextTuple中的 Values值得数量
  outputFieldsDeclarer.declare(new Fields("num"));
}

2. Bolt 定义

需要继承BaseRichBolt类

public static class SumBolt extends BaseRichBolt{
}

接下来需要实现 BaseRichBolt的三个方法

  • prepare
  • execute
  • declareOutputFields

prepare 方法实现

 /**
  * 初始化方法,会被执行一次
  * @param map
  * @param topologyContext
  * @param outputCollector
*/
public void prepare(Map<String, Object> map, TopologyContext topologyContext, OutputCollector outputCollector) {
  // 本场景不需要任何实际内容
}

execute 方法实现

int sum = 0;
/**
  * 其实也是一个死循环,职责:获取Spout发送过来的数据
  * @param tuple
*/
public void execute(Tuple tuple) {
  // 获取发送过来的字段
  //Bolt中获取值可以根据index获取,也可以根据上一个环节中
  //定义的field的名称获取(建议使用该方式)
  Integer val = tuple.getIntegerByField("num");
  sum +=val;

  System.out.println("Bolt: sum = ["+sum+"]");
}

declareOutputFields 方法实现

/**
  * 目前需求不需要往下继续发,所以暂且为空
  * @param outputFieldsDeclarer
*/
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
}

测试

/**
  * 测试
*/
public static void main(String[] args) {
  // TopologyBuilder根据Spout和Bolt来构建出Topology
  // Storm中任何一个作业都是通过Topology的方式提交的
  // Topology中需要指定Spout和Bolt的执行顺序
  TopologyBuilder builder = new TopologyBuilder();
  
  //关联 Spout  Bolt
  builder.setSpout("DataSourceSpout",new DataSourceSpout());
  // shuffleGrouping方法需要穿componentId,指的是去哪里拿数据,就指定哪个Id,这里是去DataSourceSpout拿
  builder.setBolt("SumBolt",new SumBolt()).shuffleGrouping("DataSourceSpout");


  // 创建一个本地Storm集群:本地模式运行,不需要搭建Storm集群
  LocalCluster cluster = new LocalCluster();

  cluster.submitTopology("LocalSumStormTopology",new Config(), builder.createTopology());
}

整体代码

package com.imooc.bigdata;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.util.Map;

/**
 * 1. 使用Storm实现积累求和的操作
 */
public class LocalSumStormTopology {

    /**
    1.Spout 需要继承BaseRichSpout
    数据源需要产生数据并发射
     */

    public static class DataSourceSpout extends BaseRichSpout{

        // 因为数据需要发出去所以两个地方需要用,在这里初始化一个
        private SpoutOutputCollector collector;
        /**
         * 1.1
         * 初始化只会被调用一次
         * @param map 配置参数
         * @param topologyContext 上下文
         * @param spoutOutputCollector 数据发射器
         */
        public void open(Map<String, Object> map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
            this.collector = collector;
        }

        int number = 0;
        /**
         * 1.2
         * 产生数据,在生产上肯定从消息队列中获取数据
         *
         * 这个方法是个死循环,一直不停的执行
         */
        public void nextTuple() {
            this.collector.emit(new Values(number++));

            System.out.println("Spout:"+number);

            // 防止数据产生太快
            Utils.sleep(1000);
        }

        /**
         * 1.3
         * 声明输出字段
         * @param outputFieldsDeclarer
         */
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            // num字段是对应nextTuple中的 Values值得数量
            outputFieldsDeclarer.declare(new Fields("num"));
        }
    }

    /**
     * 2
     * 数据的累计求和Bolt:接收数据并处理
     */
    public static class SumBolt extends BaseRichBolt{

        /**
         * 初始化方法,会被执行一次
         * @param map
         * @param topologyContext
         * @param outputCollector
         */
        public void prepare(Map<String, Object> map, TopologyContext topologyContext, OutputCollector outputCollector) {
            // 本场景不需要任何实际内容
        }

        int sum = 0;
        /**
         * 其实也是一个死循环,职责:获取Spout发送过来的数据
         * @param tuple
         */
        public void execute(Tuple tuple) {
            // 获取发送过来的字段
            //Bolt中获取值可以根据index获取,也可以根据上一个环节中
            //定义的field的名称获取(建议使用该方式)
            Integer val = tuple.getIntegerByField("num");
            sum +=val;

            System.out.println("Bolt: sum = ["+sum+"]");
        }

        /**
         * 目前需求不需要往下继续发,所以暂且为空
         * @param outputFieldsDeclarer
         */
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        }
    }

    /**
     * 测试
     */
    public static void main(String[] args) {
        // TopologyBuilder根据Spout和Bolt来构建出Topology
        // Storm中任何一个作业都是通过Topology的方式提交的
        // Topology中需要指定Spout和Bolt的执行顺序
        TopologyBuilder builder = new TopologyBuilder();
        //关联 Spout  Bolt
        builder.setSpout("DataSourceSpout",new DataSourceSpout());
        // shuffleGrouping方法需要穿componentId,指的是去哪里拿数据,就指定哪个Id,这里是去DataSourceSpout拿
        builder.setBolt("SumBolt",new SumBolt()).shuffleGrouping("DataSourceSpout");


        // 创建一个本地Storm集群:本地模式运行,不需要搭建Storm集群
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("LocalSumStormTopology",new Config(), builder.createTopology());

    }
}


References

  1. 基于Storm构建实时热力分布项目实战

相关文章

网友评论

      本文标题:Storm基础开发案例-数字累加

      本文链接:https://www.haomeiwen.com/subject/rhkkxhtx.html