美文网首页
jstorm api

jstorm api

作者: lmem | 来源:发表于2017-04-14 12:15 被阅读81次
Map conf = new HashMp();
//topology所有自定义的配置均放入这个Map

TopologyBuilder builder = new TopologyBuilder();
//创建topology的生成器

int spoutParal = get("spout.parallel", 1);
//获取spout的并发设置

SpoutDeclarer spout = builder.setSpout(SequenceTopologyDef.SEQUENCE_SPOUT_NAME,
                new SequenceSpout(), spoutParal);
//创建Spout, 其中new SequenceSpout() 为真正spout对象,SequenceTopologyDef.SEQUENCE_SPOUT_NAME 为spout的名字,注意名字中不要含有空格

int boltParal = get("bolt.parallel", 1);
//获取bolt的并发设置

BoltDeclarer totalBolt = builder.setBolt(SequenceTopologyDef.TOTAL_BOLT_NAME, new TotalCount(),
                boltParal).shuffleGrouping(SequenceTopologyDef.SEQUENCE_SPOUT_NAME);
//创建bolt, SequenceTopologyDef.TOTAL_BOLT_NAME 为bolt名字,TotalCount 为bolt对象,boltParal为bolt并发数,
//shuffleGrouping(SequenceTopologyDef.SEQUENCE_SPOUT_NAME), 
//表示接收SequenceTopologyDef.SEQUENCE_SPOUT_NAME的数据,并且以shuffle方式,
//即每个spout随机轮询发送tuple到下一级bolt中

int ackerParal = get("acker.parallel", 1);
Config.setNumAckers(conf, ackerParal);
//设置表示acker的并发数

int workerNum = get("worker.num", 10);
conf.put(Config.TOPOLOGY_WORKERS, workerNum);
//表示整个topology将使用几个worker

conf.put(Config.STORM_CLUSTER_MODE, "distributed");
//设置topolog模式为分布式,这样topology就可以放到JStorm集群上运行

StormSubmitter.submitTopology(streamName, conf,
                builder.createTopology());
//提交topology

IRichSpout
IRichSpout 为最简单的Spout接口

IRichSpout{

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    }

    @Override
    public void close() {
    }

    @Override
    public void activate() {
    }

    @Override
    public void deactivate() {
    }

    @Override
    public void nextTuple() {
    }

    @Override
    public void ack(Object msgId) {
    }

    @Override
    public void fail(Object msgId) {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

spout对象必须是继承Serializable, 因此要求spout内所有数据结构必须是可序列化的
spout可以有构造函数,但构造函数只执行一次,是在提交任务时,创建spout对象,因此在task分配到具体worker之前的初始化工作可以在此处完成,一旦完成,初始化的内容将携带到每一个task内(因为提交任务时将spout序列化到文件中去,在worker起来时再将spout从文件中反序列化出来)。
open是当task起来后执行的初始化动作
close是当task被shutdown后执行的动作
activate是当task被激活时,触发的动作
deactivate 是task被deactive时,触发的动作
nextTuple 是spout实现核心, nextuple完成自己的逻辑,即每一次取消息后,用collector 将消息emit出去。
ack, 当spout收到一条ack消息时,触发的动作,详情可以参考 ack机制
fail, 当spout收到一条fail消息时,触发的动作,详情可以参考 ack机制
declareOutputFields, 定义spout发送数据,每个字段的含义
getComponentConfiguration 获取本spout的component 配置

Bolt

IRichBolt {

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    }

    @Override
    public void execute(Tuple input) {
    }

    @Override
    public void cleanup() {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

}

其中注意:
bolt对象必须是继承Serializable, 因此要求spout内所有数据结构必须是可序列化的
bolt可以有构造函数,但构造函数只执行一次,是在提交任务时,创建bolt对象,因此在task分配到具体worker之前的初始化工作可以在此处完成,一旦完成,初始化的内容将携带到每一个task内(因为提交任务时将bolt序列化到文件中去,在worker起来时再将bolt从文件中反序列化出来)。
prepare 是当task起来后执行的初始化动作
cleanup 是当task被shutdown后执行的动作
execute 是bolt实现核心, 完成自己的逻辑,即接受每一次取消息后,处理完,有可能用collector 将产生的新消息emit出去。 ** 在executor中,当程序处理一条消息时,需要执行collector.ack, 详情可以参考 ack机制 ** 在executor中,当程序无法处理一条消息时或出错时,需要执行collector.fail ,详情可以参考 ack机制
declareOutputFields, 定义bolt发送数据,每个字段的含义
getComponentConfiguration 获取本bolt的component 配置

相关文章

  • jstorm api

    IRichSpoutIRichSpout 为最简单的Spout接口 spout对象必须是继承Serializabl...

  • JStorm

    JStorm JStorm官方网站 JStorm Chinese Documentation [github]

  • jstorm常用命令汇总

    1、运行jstorm项目如下我的jstorm项目打包为: jstorm-test-1.0-SNAPSHOT.jar...

  • JStorm和Storm比较

    1、What——JStorm是什么? 概述: JStorm 是一个分布式实时计算引擎,类似Hadoop MapRe...

  • JStorm学习笔记 - 基本概念

    JStorm 是一个分布式实时计算引擎。JStorm 是一个类似Hadoop MapReduce的系统, 用户按照...

  • Jstorm集群的搭建

    1、为什么选择jstorm而不是storm? 阿里Jstorm和storm的对比 简单说下安装步骤: 三台机器:分...

  • 分布式流式计算-jstorm部署

    jstorm是阿里巴巴使用java语言重写的storm,可以用来做流式计算,我们使用jstorm从kafka中读取...

  • JStorm:单词计数-开发示例

    JStorm:1、概念与编程模型JStorm:2、任务调度 转载自个人博客示例功能说明:统计单词出现的次数,spo...

  • JStorm源码分析-2.组装Topology

    我们在使用jstorm的时候,主要的工作就是将我们的数据处理逻辑构造为Topology,再提交给jstorm集群运...

  • JStorm实时计算框架学习

    这是一个JStorm使用教程,不包含环境搭建教程,直接在公司现有集群上跑任务,关于JStorm集群环境搭建,后续研...

网友评论

      本文标题:jstorm api

      本文链接:https://www.haomeiwen.com/subject/qnewattx.html