消息的可靠处理机制
Storm内部通过一种巧妙的异或算法判读每个tuple是否被正确完整的处理。
Spout的一个Task创建一个Tuple时,即在Spout的nextTuple()方法中实现从特定数据源读取数据的处理逻辑中,会与Acker进行通信,向Acker发送消息,Acker保存该Tuple对应信息:{:spout-task task-id :val ack-val)}。
Bolt在emit一个新的子Tuple时,会保存子Tuple与父Tuple的关系。
在Bolt中进行ack时,会计算出父Tuple与由该父Tuple新生成的所有子Tuple的一个异或值,将该值发送给Acker(计算异或值:tuple-id ^ (child-tuple-id1 ^ child-tuple-id2 … ^ child-tuple-idN))。可见,这里Bolt并没有把所有生成的子Tuple发送给Acker,这要比发送一个异或值大得多了,只发送一个异或值大大降低了Bolt与Acker之间网络通信的开销。
Acker收到Bolt发送的异或值,与当前保存的task-id对应的初始ack-val做异或,tuple-id与ack-val相同,异或结果为0,但是子Tuple的child-tuple-id等并不互相相同,只有等所有的子Tuple的child-tuple-id都执行ack回来,最后ack-val就为0,表示整个Tuple树处理成功。无论成功与失败,最后都要从Acker维护的队列中移除。
最后,Acker会向产生该原始父Tuple的Spout对应的Task发送通知,成功或者失败,回调Spout的ack或fail方法。如果我们在实现Spout时,重写了ack和fail方法,处理回调就会执行这里的逻辑。
当然这种异或算法存在1/2^64概率的误差,可以忽略不计。
在开发中,对于那些不允许丢失的消息我们在发送消息时要对tuple指定messageID并进行锚定,告诉tuple tree这里增加了一个新的节点,保证消息的可靠性。
collector.emit(tuple,messageId)//可靠消息
collector.emit(tuple)//不可靠的消息
collector.emit(tuple, new Values(word));//锚定发送,可靠的消息
collector.emit(new Values(word)));//非锚定发送,不可靠的消息
注意:继承BaseBasicBolt实现的API本是就是可靠性的,不需要自己进行锚定发送和调用ack以及fail方法
网友评论