目前我采用的是RoketMq , 因为是分布式的。
功能强大,很多功能都有。
自带后台管理。
Java开发。
还支持事务消息(用于分布式事务的可靠消息最终一致性方案)。
和Kafuka的对比,不是Java开发。不支持事务。
重复消费怎么办?
保证消费的幂等性, 主要是根据业来处理。
往redis里面写一条这个消息的记录,每次消费之前,判断一下,是否之前已经消费过了。 或者根据业务实际查库。
消息丢了怎么办,怎么不丢消息?
生产者丢消息:
发送消息后,会得到一个发送消息的结果, 如果整个发送消息没有报异常,则说明发送成功。
可以设置发送失败重试的参数:
// 同步发送消息重试次数,默认为 2
mqProducer.setRetryTimesWhenSendFailed(3);
// 异步发送消息重试次数,默认为 2
mqProducer.setRetryTimesWhenSendAsyncFailed(3);
MQ丢消息:
调整broker接受消息,写入内存后,一定要写入硬盘才返回写入成功,即配置消息为同步操作:
## 默认为 ASYNC_MASTER
brokerRole=SYNC_MASTER
部署主从集群,默认是异步的主从复制,调整参数为同步复制,即从broker写入成功后,才返回写入成功:
## master 节点配置
flushDiskType = SYNC_FLUSH
brokerRole=SYNC_MASTER
## slave 节点配置
brokerRole=slave
flushDiskType = SYNC_FLUSH
消费者丢消息:
处理完,发送一条确认消息给MQ。 是这个思想。
消费者消费完成以后, 需要返回一个消息给broker 。 就是监听消费者的方法里面的return。
必须是执行成功消费的逻辑后才返回 成功的消息: ConsumeConcurrentlyStatus.CONSUME_SUCCESS。
否则,则返回 稍后再次消费的信息。 ConsumeConcurrentlyStatus.RECONSUME_LATER。
如果由于网络原因,没有返回消息给broker的话, 那么这个消息超时后,将会被重新消费。
怎么保证信息的顺序性?
将需要保持顺序的消息,发送到1个队列里面去。 这样消费者就可以按照顺序消费。
根据业务可以使用Java的内存队列。 甚至可以用Redis的队列。
消息积压怎么处理?
紧急扩容。 增加消费者, 进行队列积压监控。
如果之前的消费者出BUG ,或者实在处理太慢; 可以另外写一些消费者,再大量部署。 原先的消费者不只是入库,而且是将消息消费完成后,将结果发送给新的消费者集群, 让新的消费者集群去大量快速消费。
核心思想是,快速将MQ里面积压的消息,尽快处理掉。
网友评论