storm

作者: 一拳超疼 | 来源:发表于2020-03-15 15:38 被阅读0次

Storm 是什么

Apache Storm is a free and open source distributed realtime computation system.

Apache Storm makes it easy to reliably process unbounded streams of data

Apache Storm is simple, can be used with any programming language

  1. 免费和开源的实时计算系统
  2. 易无边界实时处理数据
  3. 支持多种语言

Storm 能做什么

online machine learning, continuous computation, distributed RPC, ETL, and more

  1. Storm能实现高频数据和大规模数据的实时处理

Storm的发展历史

Storm诞生于BackType(被Twitter收购)公司。

Storm技术网站

storm.apache.org

github.com/apache/storm

en.wikipedia.org/wiki/Storm_(event_processor)

Storm vs Hadoop

  • 数据源/处理领域
    • storm处理实时数据,延迟性低
    • Hadoop处理离线数据,时间长
  • 处理过程
    • storm: "topologies"
    • Hadoop: "MapReduce jobs"
  • 进程是否结束
    • Storm运行不能停止,除非kill进程
    • Hadoop完成一个一个的job,会结束
  • 处理速度

Storm vs Spark Stream

  • Spark Stream
    • 不是真正的实时处理,其实是将输入的数据分
      为多个小批次进行批处理。
    • Spark提供一站式处理,离线,实时,机器学习
  • Storm
    • 进行的真正的实时处理。
    • 单纯为实时计算而生中

Storm 核心概念

  • Stream

    • 流,由数据组成的数据流
    • 消息流,抽象概念,无边界的tuple序列构成。
    • Streams 可以用分布式可靠的方式将现有的stream转换为新的steam。
    • 为stream提供转换的基本单元是Spouts,Bolts
  • Spout

    • 产生数据的源头,从其他数据源读取数据,负责输入新的数据到Topolgies
    • 消息流的源头,Topology的消息生产者
  • Bolt

    • 处理数据,可能会产生新的流
    • 消息处理单元,可以做数据的过滤、聚合、查询/写数据库的操作
  • Tuple

    • Storm的数据模型,是一个已命名的值序列,其中的字段可以是任何类型的对象。
    • Storm支持所有基本类型、字符串和字节数组作为Tuple的字段值,还包括是实现了序列化的对象类型。
    • 消息/数据 传递的基本单位
    • Topolgies中的每个节点都必须声明发出的Tuple的输出字段类型。
  • Topology

    • 拓扑,由spout和bolt组成,是提交给Strom的顶级抽象体。
    • Topolgies中的每个节点的执行都是并行的。且Topolgies的执行是永不停止的,除非被kill。
    • Storm会自动重新分配任何失败的任务,而且,Storm不会有任何数据的丢失,即使机器宕机,信息被丢弃。主要是其所有状态都保存在Zookeeper或本地磁盘上。
    • 拓扑结构的每个节点都包含计算逻辑,两个节点之间的连接指示数据应该怎样被传递。
    • storm jar all-my-code.jar org.apache.storm.MyTopology arg1 arg2 all-my-code.jar 是项目打的jar包,其中得有全部的依赖,
      org.apache.storm.MyTopology 是要提交的Topolgies。

Storm 集群的组成

  1. Storm 集群中有两种节点,分别为master node(主节点)
    worker node(工作节点)。
  2. master node会运行一个后台线程,叫做Nimbus,这和Hadoop的JobTracker很相似。Nimbus 负责在集群中分发代码、分配任务和监视故障。
  3. 每个worker node都会运行后台线程 Supervisor , 意味着该后台线程是一个管理者的角色,但是他管理的是Nimbus分配在它所在的机器上的worker processes
  4. worker processes 执行的是topology的一个子集,一个正在运行的Topology包含着多个分布在多个机器的worker processes
  5. NimbusSupervisor之间的所有协调都是交给一个 Zookeeper集群来做的。
  6. NimbusSupervisor后台线程都是实现了fail-faststateless。所有状态都保存在Zookeeper或本地磁盘上。这意味着你可以杀死NimbusSupervisor,他们会像什么都没发生一样重新开始。这种设计使得Storm cluster非常稳定。
    1. fail-fast是指在可能发生故障的系统中,先报道表明可能发生的错误,并且停止运行,而不会让系统冒着风险去运行。
    2. stateless是指无状态,不能保存数据,例如没有实例对象的对象,不能保存对象,是线程安全的。

Storm的Java API

一个简单的Topology实例

TopologyBuilder builder = new TopologyBuilder();        
builder.setSpout("words", new TestWordSpout(), 10);        
builder.setBolt("exclaim1", new ExclamationBolt(), 3)
        .shuffleGrouping("words");
builder.setBolt("exclaim2", new ExclamationBolt(), 2)
        .shuffleGrouping("exclaim1");

new TopologyBuilder() 得到build实例对象,通过该对象来定义新节点。
setSpout, setBolt 两个方法分别是声明Spout和Bolt, 第一个参数是自定义的id,相当于节点的名字,应该是唯一的。
第二个参数是实现了固定接口的具体实体。第三个参数是指定了节点执行任务的并行度,默认为单线程。
shuffleGrouping意味着,输入的Tuple随机的输入到任务中,类似混淆。

builder.setBolt("exclaim2", new ExclamationBolt(), 5)
            .shuffleGrouping("words")
            .shuffleGrouping("exclaim1");

一个Bolt节点也可以从多个节点中获取数据,可以链接声明。

IComponent接口

Spout和Bolt都要实现的接口。
declareOutputFields 方法声明输出Stream的规则。

ISpout

  • ISpoutSpout的核心接口,当strom发现每个在DAG(有向无环图,即Topolgies)的Tuple都被成功处理后,就会向Spout发送ack信息。
    如果有一个TupleTopolgies规定的时间内未被完全处理,Storm将会发送fail信息。
  • Spout发送一个tuple,它会给tuple标记一个message id,当storm发送ack或者fail信息时,spout会通过这个id来标识是哪个tuple。如果是没有标识的id的tuple,Storm将不会对它进行跟踪。
  • Storm执行ackfailnextTuple都在同一个线程中,这说明其保证了线程安全。因此必须保证nextTuple是非阻塞的,否则会阻塞ackfail

open(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector)

当集群内的worker初始化Spout时调用该方法,它为该Spout提供了执行需要的环境。

close

停止Spout时将会被调用,但是通常情况下都不会调用该方法,因为集群通常使用kill -9停止任务,本地模式下通常才调用该方法。

activate

Spout要进行工作时,即要执行nextTuple前,执行该方法将Spout激活。

deactivate

Spout在接下来时间里不会有工作执行的情况下,执行该方法休眠。

nextTuple

Storm请求Spout向输出收集器发送Tuple时调用该方法,该方法必须是非阻塞的,即便没有数据发出,这个方法也要返回。

ack(Object msgId)

Tuple发送成功,该方法接收Tuple的message id

fail(Object msgId)

ack,接收发送失败的Tuple的message id

IBolt

  • IBolt消费Tuple后生产新的Tuple发送出去。但是它并不会在接收到一个Tuple后立刻就处理它。
  • IBolt被客户端机器创建,然后通过Java序列化后放入Topology里,再被提交到Nimbus。随后Nimbus发起worker后,会将该IBolt反序列化(调用其中的prepare),然后它才会开始处理Tuple

prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector)

它为该Bolt提供了执行需要的环境。

execute(Tuple input)

对Tuple进行处理。Tuple中包含着它来自哪个组件,可以通过Tuple#getValue方法来得到其中的value。而且,如上所说,IBolt并不会立刻处理数据。

cleanup

和ISpout的close类似,不一定被执行,该方法的功能是在IBolt被停止时清理数据。

相关文章

网友评论

      本文标题:storm

      本文链接:https://www.haomeiwen.com/subject/sodjehtx.html