Storm框架介绍
流式处理框架
storm是个实时的,分布以及具备高容错的计算系统
- storm进程常驻内存
- storm数据不经过磁盘,在内存中处理
Twitter开源的分布式实时大数据处理框架,最早开源于github
架构
- Nimbus主节点
- Supervisor从节点
- Worker 进程
编程模型
- DAG(Topology)
- Spout
- Bolt
高可靠性
- 异常处理
- 消息可靠性保障机制(ACK)
流式处理
- 流式处理(异步与同步)
客户端提交数据进行结算,并不会等待数据计算结果 - 逐条处理
例:ETL(数据清洗)extracted transform load - 统计分析
例:计算PV,UV,访问热点以及某些数据的聚合,加和,平均等 - 客户端提交数据之后,计算完成结果存储到Redis,hbase,MySQL或者其他MQ中
客户端并不关心最终结果是多少
实时请求
- 实时请求应答服务(同步)
客户端提交数据请求之后,立刻取得计算结果并返回客户端 - Drpc
- 实时请求处理
例:图片特征提取
storm:进程,线程常驻内存运行,数据不进入磁盘,数据通过网络传递
storm:纯流式处理
- 专门为流式处理设计
- 数据传输模式更为简单,很多地方也更为高效
- 并不是不能做批处理,他也可以用来做微批处理,来提高吞吐
Spark Streaming:微批处理
- 将RDD做的很小来用小的批处理来接近流式处理
- 基于内存和DAG可以把处理任务做的很快

Topology – DAG有向无环图的实现
- 对于Storm实时计算逻辑的封装
- 即,由一系列通过数据流相互关联的Spout、Bolt所组成的拓扑结构
- 生命周期:此拓扑只要启动就会一直在集群中运行,直到手动将其kill,否则不会终止
(区别于MapReduce当中的Job,MR当中的Job在计算执行完成就会终止)
Tuple – 元组
Stream中最小数据组成单元
Stream – 数据流
- 从Spout中源源不断传递数据给Bolt、以及上一个Bolt传递数据给下一个Bolt,所形成的这些数据通道即叫做Stream
- Stream声明时需给其指定一个Id(默认为Default)
实际开发场景中,多使用单一数据流,此时不需要单独指定StreamId
Spout – 数据源
- 拓扑中数据流的来源。一般会从指定外部的数据源读取元组(Tuple)发送到拓扑(Topology)中
- 一个Spout可以发送多个数据流(Stream)
可先通过OutputFieldsDeclarer中的declare方法声明定义的不同数据流,发送数据时通过-
SpoutOutputCollector中的emit方法指定数据流Id(streamId)参数将数据发送出去 - Spout中最核心的方法是nextTuple,该方法会被Storm线程不断调用、主动从数据源拉取数据,再通过emit方法将数据生成元组(Tuple)发送给之后的Bolt计算
Bolt – 数据流处理组件
- 拓扑中数据处理均有Bolt完成。对于简单的任务或者数据流转换,单个Bolt可以简单实现;更加复杂场景往往需要多个Bolt分多个步骤完成
- 一个Bolt可以发送多个数据流(Stream)
可先通过OutputFieldsDeclarer中的declare方法声明定义的不同数据流,发送数据时通过SpoutOutputCollector中的emit方法指定数据流Id(streamId)参数将数据发送出去 - Bolt中最核心的方法是execute方法,该方法负责接收到一个元组(Tuple)数据、真正实现核心的业务逻辑
Stream Grouping – 数据流分组(即数据分发策略)
-
Shuffle Grouping
随机分组,随机派发stream里面的tuple,保证每个bolt task接收到的tuple数目大致相同。
轮询,平均分配 -
Fields Grouping
按字段分组,比如,按"user-id"这个字段来分组,那么具有同样"user-id"的 tuple 会被分到相同的Bolt里的一个task, 而不同的"user-id"则可能会被分配到不同的task。 -
All Grouping
广播发送,对于每一个tuple,所有的bolts都会收到 -
Global Grouping
全局分组,把tuple分配给task id最低的task 。 -
None Grouping
不分组,这个分组的意思是说stream不关心到底怎样分组。目前这种分组和Shuffle grouping是一样的效果。 有一点不同的是storm会把使用none grouping的这个bolt放到这个bolt的订阅者同一个线程里面去执行(未来Storm如果可能的话会这样设计)。 -
Direct Grouping
指向型分组, 这是一种比较特别的分组方法,用这种分组意味着消息(tuple)的发送者指定由消息接收者的哪个task处理这个消息。只有被声明为 Direct Stream 的消息流可以声明这种分组方法。而且这种消息tuple必须使用 emitDirect 方法来发射。消息处理者可以通过 TopologyContext 来获取处理它的消息的task的id (OutputCollector.emit方法也会返回task的id) -
Local or shuffle grouping
本地或随机分组。如果目标bolt有一个或者多个task与源bolt的task在同一个工作进程中,tuple将会被随机发送给这些同进程中的tasks。否则,和普通的Shuffle Grouping行为一致
customGrouping
自定义,相当于mapreduce那里自己去实现一个partition一样。
接下来代码讲grouping
为了方便就在代码里注释写上

track.log
public class MySpout implements IRichSpout {
private static final long serialVersionUID = 1L;
FileInputStream fis;
InputStreamReader isr;
BufferedReader br;//输入流,转换流,缓冲流
SpoutOutputCollector collector = null;
String str = null;
@Override
public void nextTuple() {
try {
while ((str = this.br.readLine()) != null) {
// 过滤动作
collector.emit(new Values(str, str.split("\t")[1]));//相当于发送了两个,一个是当前行,一个是当前行的第二个元素,即id
}
} catch (Exception e) {
}
}
@Override
public void close() {
try {
br.close();
isr.close();
fis.close();
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
try {
this.collector = collector;
this.fis = new FileInputStream("track.log");//文件放在项目下所以没盘符
this.isr = new InputStreamReader(fis, "UTF-8");
this.br = new BufferedReader(isr);
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("log", "session_id"));
}//声明几个发送结果,要匹配
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
@Override
public void ack(Object msgId) {
System.out.println("spout ack:" + msgId.toString());
}
@Override
public void activate() {
}
@Override
public void deactivate() {
}
@Override
public void fail(Object msgId) {
System.out.println("spout fail:" + msgId.toString());
}
}
mybolt接收上游数据
public class MyBolt implements IRichBolt {
private static final long serialVersionUID = 1L;
OutputCollector collector = null;
int num = 0;
String valueString = null;
@Override
public void cleanup() {
}
@Override
public void execute(Tuple input) {
try {
valueString = input.getStringByField("log");//获取值
if (valueString != null) {
num++;
System.err.println(input.getSourceStreamId() + " " + Thread.currentThread().getName() + "--id="
+ Thread.currentThread().getId() + " lines :" + num + " session_id:"
+ valueString.split("\t")[1]);//打印流的id,当前线程名称id
}
collector.ack(input);
// Thread.sleep(2000);
} catch (Exception e) {
collector.fail(input);
e.printStackTrace();
}
}
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields(""));
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
public class Main {
/**
* @param args
*/
public static void main(String[] args) {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new MySpout(), 1);
// shuffleGrouping其实就是随机往下游去发,不自觉的做到了负载均衡
// builder.setBolt("bolt", new MyBolt(), 2).shuffleGrouping("spout");
// fieldsGrouping其实就是MapReduce里面理解的Shuffle,根据fields求hash来取模
// builder.setBolt("bolt", new MyBolt(), 2).fieldsGrouping("spout", new Fields("session_id"));
// 只往一个里面发,往taskId小的那个里面去发送
// builder.setBolt("bolt", new MyBolt(), 2).globalGrouping("spout");
// 等于shuffleGrouping
// builder.setBolt("bolt", new MyBolt(), 2).noneGrouping("spout");
// 广播
builder.setBolt("bolt", new MyBolt(), 2).allGrouping("spout");
// Map conf = new HashMap();
// conf.put(Config.TOPOLOGY_WORKERS, 4);
Config conf = new Config();
conf.setDebug(false);
conf.setMessageTimeoutSecs(30);
if (args.length > 0) {
try {
StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
} catch (AlreadyAliveException e) {
e.printStackTrace();
} catch (InvalidTopologyException e) {
e.printStackTrace();
}
} else {
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("mytopology", conf, builder.createTopology());//不传参数就以本地方式执行
}
}
}

storm架构

storm任务提交流程

Storm 本地目录树

Storm Zookeeper目录树

网友评论