一.CoordinatedBolt中的消息类型
- REGULAR: 正常的数据消息。
- ID: 从协调Spout节点收到的事务提交消息:
- COORD: 其他的CoordinatedBolt收到的协调消息
CoordinatedBolt会根据输入消息的流号来对消息的类型进行判断。 Topology构建器会将 实现了ICommitter的 Bolt中的_idStreamSpec设为协调 Spout节点的事务提交流。其他情况下,_idStreamSpec的值为空。如果消息来自协调消息流,则消息的类型为COORD,其他默认情况下为REGULAR类型。
消息类型,以及判定的代码如下
static enum TupleType {
REGULAR,
ID,
COORD
}
private TupleType getTupleType(Tuple tuple) {
if(_idStreamSpec!=null
&& tuple.getSourceGlobalStreamid().equals(_idStreamSpec._id)) {
return TupleType.ID;
} else if(!_sourceArgs.isEmpty()
&& tuple.getSourceStreamId().equals(Constants.COORDINATED_STREAM_ID)) {
return TupleType.COORD;
} else {
return TupleType.REGULAR;
}
}
二.CoordinatedBolt的成员变量的定义
///用来表示哪些节点将向该Bolt发送协调消息。
private Map<String, SourceArgs> _sourceArgs;
//目前用来表示该节点是否是事务提交节点。
private IdStreamSpec _idStreamSpec;
//内含实际的Bolt逻辑。
private IRichBolt _delegate;
//表示Bolt的上游节点的个数
private Integer _numSourceReports;
//表示将向哪些Task发送数据。
private List<Integer> _countOutTasks = new ArrayList<Integer>();
//可以统计消息发送和接收数目的输出收集器
private OutputCollector _collector;
//用来保存在此节点中正在被处理的事务尝试, 键为事务尝试消息, 值为该事务的处理情况
private TimeCacheMap<Object, TrackingInfo> _tracked;
三.分析源码
接下来分析一下主要的实现方法。 首先, 我们看看execute方法的实现
public void execute(Tuple tuple) {
Object id = tuple.getValue(0);
TrackingInfo track;
//计算获得输入消息的消息类型
TupleType type = getTupleType(tuple);
synchronized(_tracked) {
//得到消息的事务尝试消息, 然后将其作为键, 若该事务还没有被跟踪,则开始对其进行跟踪。
track = _tracked.get(id);
if(track==null) {
track = new TrackingInfo();
//如果_idStreamSpec为空, 则直接将receivedld设置为true
//即在非事务提交节点中会这样设置
if(_idStreamSpec==null) track.receivedId = true;
_tracked.put(id, track);
}
}
if(type==TupleType.ID) {
//表示收到了从协调Spout发送过来的事务提交消息
synchronized(_tracked) {
track.receivedId = true;
}
checkFinishId(tuple, type);
} else if(type==TupleType.COORD) {
//表示收到了从其他节点发送过来的协调信息, 此时Storm将更新跟踪信息中的reportCount以及expectedTupleCount。
int count = (Integer) tuple.getValue(1);
synchronized(_tracked) {
track.reportCount++;
track.expectedTupleCount+=count;
}
checkFinishId(tuple, type);
} else {
//处理普通的数据消息
synchronized(_tracked) {
_delegate.execute(tuple);
}
}
}
在处理控制消息时, 会调用checkFinishld方法来检测该节点是否完成了对事务的处理, 进而决定是否可以调用finishBatch方法以及是否可以向其下游节点发送协调消息等。checkFinishld方法的分析如下, 该方法是CoordinatedBolt的核心。
private boolean checkFinishId(Tuple tup, TupleType type) {
Object id = tup.getValue(0);
boolean failed = false;
synchronized(_tracked) {
TrackingInfo track = _tracked.get(id);
try {
if(track!=null) {
boolean delayed = false;
if(_idStreamSpec==null && type == TupleType.COORD || _idStreamSpec!=null && type==TupleType.ID) {
track.ackTuples.add(tup);
delayed = true;
}
if(track.failed) {
failed = true;
for(Tuple t: track.ackTuples) {
_collector.fail(t);
}
_tracked.remove(id);
//判断事务是否已经处理结束并进行相应的处理。
} else if(track.receivedId
&& (_sourceArgs.isEmpty() ||
track.reportCount==_numSourceReports &&
track.expectedTupleCount == track.receivedTuples)){
if(_delegate instanceof FinishedCallback) {
((FinishedCallback)_delegate).finishedId(id);
}
if(!(_sourceArgs.isEmpty() || type!=TupleType.REGULAR)) {
throw new IllegalStateException("Coordination condition met on a non-coordinating tuple. Should be impossible");
}
Iterator<Integer> outTasks = _countOutTasks.iterator();
while(outTasks.hasNext()) {
int task = outTasks.next();
int numTuples = get(track.taskEmittedTuples, task, 0);
_collector.emitDirect(task, Constants.COORDINATED_STREAM_ID, tup, new Values(id, numTuples));
}
for(Tuple t: track.ackTuples) {
_collector.ack(t);
}
track.finished = true;
_tracked.remove(id);
}
if(!delayed && type!=TupleType.REGULAR) {
if(track.failed) {
_collector.fail(tup);
} else {
_collector.ack(tup);
}
}
} else {
if(type!=TupleType.REGULAR) _collector.fail(tup);
}
} catch(FailedException e) {
LOG.error("Failed to finish batch", e);
for(Tuple t: track.ackTuples) {
_collector.fail(t);
}
_tracked.remove(id);
failed = true;
}
}
return failed;
}
首先是如果收到了控制消息, 则将其放入变量ackTuples中, 并在事务处理结束,后统一进行Ack操作, 或者在事务处理失败后统一进行Fail操作。
当认为事务已经失败时,将对收到的控制消息进行Fail操作, 并将该事务尝试从跟踪列表中去掉。
判断是否已经完成了对事务的处理,判断的条件为:
1.track.receivedId:若为事务提交Bolt节点, 并且收到了从协调Spout发送来的事务提交消息则为true。 若为非事务提交Bolt节点, 该条件默认为true。
2._sourceArgs.isEmpty()表示该节点没有协调消息输人,或者reportCount==_numSourceReports且expectedTupleCount == receivedTuples,这表示该节点的消息源均向该节点发送了协调消息,并且收全一个事务的所有消息。
上述条件表示该节点已经收全一个事务的所有消息, 可以对事务进行后处理了。 这是一个非常关键的时间点。
注意:系统中的协调Spout节点并不适合用CoordinatedBolt进行封装, 所以事务Topology中的消息发送节点为最开始的CoordinatedBolt节点, 再没有其他节点会向其发送协调消息了。 但是消息发送节点只从协调Spout接收消息, 一条消息即表示一个事务,因此其处理较为简单, 当其收到一条消息后即认为已经收到了属于该事务的全部消息,并且满足向下游节点发送协调消息的条件。
后续如果用户的Bolt实现了FinishedCallback接口, 此时将调用其finishld方法, 即用户的finishBatch方法。
此时, 保证了Storm已经接收到了属于该事务的所有消息, 于是可以调用finishBatch来对事务进行后处理。 用户实现IBatchBolt时, 需要实现finishBatch方法。 CoordinatedBolt保证一个Bolt节点收全了一个事务的所有消息后,finishBatch方法才会被调用。 实际上,CoordinatedBolt可以用在非事务的批处理环境下。
_collector.emitDirect用于向下游的节点发送协调消息, 这样下游节点便可以用来判断事务是否可以结束。 此时, 整个Topology就通过协调消息串联了起来。注意这里采用tup作为锚点,用于跟踪保证事务被处理完毕。
之后就一些ack操作等,checkFinishId方法核心的功能其实已经完毕了。
网友评论