美文网首页JavaJava实战
如何避免消息队列的重复消费

如何避免消息队列的重复消费

作者: 不孤独的字符串 | 来源:发表于2020-05-24 16:58 被阅读0次

    消息队列在数据传输的过程中,为了保证消息传递的可靠性,一般会对消息采用ack确认机制,如果消息传递失败,消息队列会进行重试,此时便可能存在消息重复消费的问题。比如,用户到银行取钱后会收到扣款通知短信,如果用户收到多条扣款信息通知则会有困惑。

    取款的基本流程

    消费失败有很多种可能性,除开业务本身存在的问题来讲,消费失败有可能是因为网络延迟,消息队列还没收到消费者返回的ack,也有可能是消费者处理消息的时间比较久,来不及返回消费结果给到消息队列,也可能是消费者挂掉了。

    与消息队列相关的协议和规范有JMS、AMQP、MQTT以及OpenMessaging,而MQTT中规定了三种传递标准:

    At most once:至多一次。消息传递时,最多会被送达一次,没有什么可靠性,允许消息丢失。
    At least once:至少一次。消息传递时,至少会被送打一次,保证消息可靠性,但存在多次消费的可能。
    Exactly once:恰好一次。消息传递时,只会被送达一次,不允许丢失也不允许重复。

    依照以上标准,似乎我们只要保证消息队列符合Exactly once的标准,就可以在保证消息可靠性的前提下解决重复消费的问题。但按我的理解,阴阳调和才是平衡的,采用的无非就是时间和空间的切换,采用Exactly once标准固然符合要求,但也势必会带来一定的性能损耗,就跟分布式锁类似,而对于At least once,我们则可在业务层面保证数据不会重复消费。

    用幂等性解决消息重复

    所谓幂等性,就是数据无论操作多少次,所产生的影响跟执行一次是一样的,比如对于读操作来说,无论读取多少次数据,都跟读取一次的数据是一样的,所以读操作是一个幂等性操作,而添加操作,添加多次会有多条记录,因而写操作则是非幂等性操作。因而对于以上场景,只要保证消息消费的幂等性,就能解决重复消费的问题。

    常见的几种设计幂等的方法:

    1. 利用数据库唯一约束实现幂等

    可以通过给消息的某一些属性设置唯一约束,比如增加唯一uuid,添加的时候查询是否存对应的uuid,存在不操作,不存在则添加,那样对于相同的uuid只会存在一条数据。其实,只要类似“insert if not exist”的操作都可能,但需要保证查询跟添加的操作必须是原子性操作。例如:上面取款发短信的场景则可以借助redis的setnx实现。

    public class SendServiceImpl implements SendService {
    
        @Autowired
        private JedisClient jedisClient;
        @Value("channel")
        private String channel;
    
        @Override
        public boolean sendMessage(Message message) {
            String uuid = message.getUuid();
            // 判断是否已经发送了
            boolean send = jedisClient.setnx(channel, uuid) == 1;
            if(send){
                // TODO 开始发送短信
            }
            return true;
        }
    }
    
    2. 设置前置条件

    在更新的时候,可以通过设置一定的前置条件来保证数据幂等,比如给用户发送短信是非幂等操作,但可以添加前置条件,变成如果改用户未发送过短信,则给用户发送短信,此时的操作则是幂等性操作。但在实际上,对于一个问题如何获取前置条件往往比较复杂,此时可以通过设置版本号version,没修改一次则版本号+1,在更新时则通过判断两个数据的版本号是否一致。

    UPDATE message SET m_status = #{status} WHERE uuid = #{uuid} AND version = #{version}
    
    3. 通过全局ID实现

    最后的方式就比较暴力也比较通用,通过设置全局Id去实现。实现的思路是,在发送消息时,给每条消息指定一个全局唯一的 ID(可以通过雪花算法去实现),消费时,先根据这个 ID 检查这条消息是否有被消费过,如果没有消费过,才更新数据。

    虽然看起来好像不复杂,单机环境实现也比较简单,就是查询更新的思路,但在分布式环境上一点也不简单,因为必须保证查询跟更新是原子性的操作,不能查询完又有另外一个事务去更新了数据。当然,对于这种问题也可以通过分布式事务和分布式锁去实现,但与之的也降低了系统的性能。

    小结

    以上便是”如何避免消息队列的重复消费“的所有内容,整理了关于该方法解决的几种思路。如果您有什么疑问或者文章有什么问题,欢迎私信留言交流~

    相关文章

      网友评论

        本文标题:如何避免消息队列的重复消费

        本文链接:https://www.haomeiwen.com/subject/flejahtx.html