为了在原有的同步系统中新增异步消息,但是我们的目的是:第一:对于原有系统服务做到最少的代码侵入性,第二:本地DB操作+消息,要做到一致性,也就是需要在更新数据库中发布消息。以下是解决方案:
一. 使用数据库表作为消息队列(应用事务性发件箱)
- 使用关系数据库的服务将消息/事件作为本地事务的一部分插入到发件箱表
OUTBOX
; - 使用单独的
Message Relay
进程将插入数据库的事件
发布到消息中间件
应用事务性发件箱 - 然后使用一种轮询机制,轮询查询outBox消息表,将查询到的消息发送到真正的
消息队列
中。最后删除outBox对应的消息; - 轮询数据库的方式,是不建议使用的,原因:第一:长时间轮式导致数据库性能下降,第二:如果是Nosql数据库,消息是存在一个实体的属性中的,我们需要通过查询实体的方式获取消息属性,所以建议使用事务日志模式;
使用事务日志拖尾模式发布消息事件
-
每次程序提交数据库事务更新都会对应着数据库事务日志,通过日志我们可以把与
image.png消息相关
的记录发送给消息代理
-
该方案又一些实际的应用案例和实现可共参考:
(1)Debezium:(http://debezium.io) : 监控多数据库的日志更改,并向Kafka发送数据库更改,(缺点:没有用于消息发送和接收的API),要求binlog格式=row
,没有消息发送和接收的接口
(2)LinkedIn Databus:(https://github.com/linkedin/databus)一个开源项目,用于读取oracle/mysql事务日志并将更改发布为事件;(LinkedIn在2013年将databus开源)
(3)DynamoDB streams:(http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Stremas.html), 包括过去24小时内的DynamoDb表更改的序列,且这个序列是按时间排序的,应用程序可以从流读取这些更改,例如:将他们作为事件发布;(DynamoDB是是/一款Nosql产品)
(4)Eventuate Tram :(https://github.com/eventuate-tram/eventuate-tame-core): 一个开源事务消息库,它使用Mysql binlog协议,Postgres WAL或轮询来读取对OUTBOX
表所做的更改并将它们发布到Kafka; (有事务性消息和重复消息检测机制),国内资料和教程较少;(https://eventuate.io/docs/manual/eventuate-tram/latest/getting-started-eventuate-tram.html)(5)阿里巴巴的开源的canal:通过读取mysql binlog 的方式实现(https://github.com/alibaba/canal)
(6)maxwell :(https://blog.csdn.net/growing1224/article/details/103904896)
) maxwell可以对mysql数据库的二进制日志bin-log进行提取,并生成 JSON 格式的消息,作为生产者将它发送到kafka,要求binlog格式=row
(7) 美团开源的puma:(没有查到相关资料)
(8) MySQL数据实时增量同步到Kafka - Flume
网友评论