参考文档:
- storm1.0.3官方文档:Guaranteeing Message Processing
- 美团技术博客:Storm 的可靠性保证测试
Storm offers several different levels of guaranteed message processing, including best effort, at least once, and exactly once through Trident.
一、消息完全处理
定义
消息完全处理描述的是一个从Spout
发出的tuple
怎样算是处理成功。
关键字:tuple tree
只要在spout下游的bolt有一个失败(需要bolt主动通知Acker
)就算失败。
如何得知一个从spout发出的tuple是否被完全处理
storm使用一个内置的Acker
结点来跟踪一个tuple是否被完全处理成功。
成功会回调spout
的ack
方法,失败会回调fail
方法
二、如何实现At Most Once
语义
Storm 的消息保证机制中,如果需要实现 At Most Once 语义,只需要满足下面任何一条即可:
- 关闭 ACK 机制,即 Acker 数目设置为 0
- Spout 不实现可靠性传输
- Spout 发送消息是使用不带 message ID 的 API
- 不实现 fail 函数
- Bolt 不把处理成功或失败的消息发送给 Acker
三、如何实现At Least Once
语义
如果需要实现 At Least Once 语义,则需要同时保证如下几条:
- 开启 ACK 机制,即 Acker 数目大于 0
- Spout 实现可靠性传输保证
- Spout 发送消息时附带 message 的 ID
- 如果收到 Acker 的处理失败反馈,需要进行消息重传,即实现 fail 函数
- Bolt 在处理成功或失败后需要调用相应的方法通知 Acker
如何实现可靠的 Spout
实现可靠的 Spout 需要在 nextTuple 函数中发送消息时,调用带 msgID 的 emit 方法,然后实现失败消息的重传(fail 函数),参考如下示例:
/**
* 想实现可靠的 Spout,需要实现如下两点
* 1. 在 nextTuple 函数中调用 emit 函数时需要带一个 msgId,用来表示当前的消息(如果消息发送失败会用 msgId 作为参数回调 fail 函数)
* 2. 自己实现 fail 函数,进行重发(注意,在 storm 中没有 msgId 和消息的对应关系,需要自己进行维护)
*/
public void nextTuple() {
//设置 msgId 和 Value 一样,方便 fail 之后重发
collector.emit(new Values(curNum + "", round + ""), curNum + ":" + round);
}
@Override
public void fail(Object msgId) {//消息发送失败时的回调函数
String tmp = (String)msgId; //上面我们设置了 msgId 和消息相同,这里通过 msgId 解析出具体的消息
String[] args = tmp.split(":");
//消息进行重发
collector.emit(new Values(args[0], args[1]), msgId);
}
如何实现可靠的 Bolt
Storm 提供两种不同类型的 Bolt,分别是 BaseRichBolt 和 BaseBasicBolt,都可以实现可靠性消息传递,不过 BaseRichBolt 需要自己做很多周边的事情(建立 anchor 树,以及手动 ACK/FAIL 通知 Acker),使用场景更广泛,而 BaseBasicBolt 则由 Storm 帮忙实现了很多周边的事情,实现起来方便简单,但是使用场景单一。如何用这两个 Bolt 实现(不)可靠的消息传递如下所示:
//BaseRichBolt 实现不可靠消息传递
public class SplitSentence extends BaseRichBolt {//不建立 anchor 树的例子
OutputCollector _collector;
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
_collector = collector;
}
public void execute(Tuple tuple) {
String sentence = tuple.getString(0);
for(String word: sentence.split(" ")) {
_collector.emit(new Values(word)); // 不建立 anchor 树
}
_collector.ack(tuple); //手动 ack,如果不建立 anchor 树,是否 ack 是没有区别的,这句可以进行注释
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
//BaseRichBolt 实现可靠的 Bolt
public class SplitSentence extends BaseRichBolt {//建立 anchor 树以及手动 ack 的例子
OutputCollector _collector;
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
_collector = collector;
}
public void execute(Tuple tuple) {
String sentence = tuple.getString(0);
for(String word: sentence.split(" ")) {
_collector.emit(tuple, new Values(word)); // 建立 anchor 树
}
_collector.ack(tuple); //手动 ack,如果想让 Spout 重发该 Tuple,则调用 _collector.fail(tuple);
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
下面的示例会可以建立 Multi-anchoring
List<Tuple> anchors = new ArrayList<Tuple>();
anchors.add(tuple1);
anchors.add(tuple2);
_collector.emit(anchors, new Values(1, 2, 3));
//BaseBasicBolt 是吸纳可靠的消息传递
public class SplitSentence extends BaseBasicBolt {//自动建立 anchor,自动 ack
public void execute(Tuple tuple, BasicOutputCollector collector) {
String sentence = tuple.getString(0);
for(String word: sentence.split(" ")) {
collector.emit(new Values(word));
}
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
四、如何实现Exactly Once
语义
todo: Trident
网友评论