消息队列在数据传输的过程中,为了保证消息传递的可靠性,一般会对消息采用ack确认机制,如果消息传递失败,消息队列会进行重试,此时便可能存在消息重复消费的问题。比如,用户到银行取钱后会收到扣款通知短信,如果用户收到多条扣款信息通知则会有困惑。
取款的基本流程消费失败有很多种可能性,除开业务本身存在的问题来讲,消费失败有可能是因为网络延迟,消息队列还没收到消费者返回的ack,也有可能是消费者处理消息的时间比较久,来不及返回消费结果给到消息队列,也可能是消费者挂掉了。
与消息队列相关的协议和规范有JMS、AMQP、MQTT以及OpenMessaging,而MQTT中规定了三种传递标准:
At most once:至多一次。消息传递时,最多会被送达一次,没有什么可靠性,允许消息丢失。
At least once:至少一次。消息传递时,至少会被送打一次,保证消息可靠性,但存在多次消费的可能。
Exactly once:恰好一次。消息传递时,只会被送达一次,不允许丢失也不允许重复。
依照以上标准,似乎我们只要保证消息队列符合Exactly once的标准,就可以在保证消息可靠性的前提下解决重复消费的问题。但按我的理解,阴阳调和才是平衡的,采用的无非就是时间和空间的切换,采用Exactly once标准固然符合要求,但也势必会带来一定的性能损耗,就跟分布式锁类似,而对于At least once,我们则可在业务层面保证数据不会重复消费。
用幂等性解决消息重复
所谓幂等性,就是数据无论操作多少次,所产生的影响跟执行一次是一样的,比如对于读操作来说,无论读取多少次数据,都跟读取一次的数据是一样的,所以读操作是一个幂等性操作,而添加操作,添加多次会有多条记录,因而写操作则是非幂等性操作。因而对于以上场景,只要保证消息消费的幂等性,就能解决重复消费的问题。
常见的几种设计幂等的方法:
1. 利用数据库唯一约束实现幂等
可以通过给消息的某一些属性设置唯一约束,比如增加唯一uuid,添加的时候查询是否存对应的uuid,存在不操作,不存在则添加,那样对于相同的uuid只会存在一条数据。其实,只要类似“insert if not exist”的操作都可能,但需要保证查询跟添加的操作必须是原子性操作。例如:上面取款发短信的场景则可以借助redis的setnx实现。
public class SendServiceImpl implements SendService {
@Autowired
private JedisClient jedisClient;
@Value("channel")
private String channel;
@Override
public boolean sendMessage(Message message) {
String uuid = message.getUuid();
// 判断是否已经发送了
boolean send = jedisClient.setnx(channel, uuid) == 1;
if(send){
// TODO 开始发送短信
}
return true;
}
}
2. 设置前置条件
在更新的时候,可以通过设置一定的前置条件来保证数据幂等,比如给用户发送短信是非幂等操作,但可以添加前置条件,变成如果改用户未发送过短信,则给用户发送短信,此时的操作则是幂等性操作。但在实际上,对于一个问题如何获取前置条件往往比较复杂,此时可以通过设置版本号version,没修改一次则版本号+1,在更新时则通过判断两个数据的版本号是否一致。
UPDATE message SET m_status = #{status} WHERE uuid = #{uuid} AND version = #{version}
3. 通过全局ID实现
最后的方式就比较暴力也比较通用,通过设置全局Id去实现。实现的思路是,在发送消息时,给每条消息指定一个全局唯一的 ID(可以通过雪花算法去实现),消费时,先根据这个 ID 检查这条消息是否有被消费过,如果没有消费过,才更新数据。
虽然看起来好像不复杂,单机环境实现也比较简单,就是查询更新的思路,但在分布式环境上一点也不简单,因为必须保证查询跟更新是原子性的操作,不能查询完又有另外一个事务去更新了数据。当然,对于这种问题也可以通过分布式事务和分布式锁去实现,但与之的也降低了系统的性能。
小结
以上便是”如何避免消息队列的重复消费“的所有内容,整理了关于该方法解决的几种思路。如果您有什么疑问或者文章有什么问题,欢迎私信留言交流~
网友评论