美文网首页程序员
分布式锁防KAFKA抖动

分布式锁防KAFKA抖动

作者: 小云海 | 来源:发表于2018-12-26 09:43 被阅读7次

    双十一期间遇到一个灾难级别的问题:一个kafka消息被不同的jboss消费了两次,导致用户产生重复数据,直接影响了正常的业务流程。

    最终我们调查发现,由于zk集群压力过大,我们的四个partition有一个直接断连了,jboss集群的消费能力直线下降,截止发现的时候,已经积压了数百个消息。我们猜测,在网络抖动的情况下,也许offset未能及时提交,导致一条kafka消息被消费了两次。

    事后我们调整了zk相关的参数,但是我们代码的隐患也需要解决。我决定通过分布式锁来限定消息的消费次数,我们的每条消息的有唯一标识taskId,那么我给taskId加上一个锁就可以了。

    分布式锁的实现方案有很多种,由于我们只需要防止kafka在抖动的那几秒,我决定用redis的单线程特性实现这个分布锁,那么我只要保证我的redis操作是原子的就可以了。

    早期的redis并没有设计实现分布锁的方案,只有setnx(如果该键不存在则创建)这样的方法,想要销毁这个锁只能通过expire()设置超时时间。但是这是两个命令操作,不能保证原子性,只能把这两个命令写在lua脚本里面才能实现这个分布锁。后来redis作者为了解决这种乱象,给set命令做了增强处理,上面的两个命令都可以通过set命令实现了。

    从 Redis 2.6.12 版本开始, SET 命令的行为可以通过一系列参数来修改:

    • EX second :设置键的过期时间为 second 秒。 SET key value EX second 效果等同于 SETEX key second value 。

    • PX millisecond :设置键的过期时间为 millisecond 毫秒。 SET key value PX millisecond 效果等同于 PSETEX key millisecond value 。

    • NX :只在键不存在时,才对键进行设置操作。 SET key value NX 效果等同于 SETNX key value 。

    • XX :只在键已经存在时,才对键进行设置操作。

    那么现在实现该分布锁的方式已经很简单了,当一个kafka消息到来的时候,我们通过jedis.set(String key, String value, String nxxx, String expx, int time)
    设置jedis.set([taskId],””,”nx”,”ex”,10)

    当该消息未被消费时,系统将会在redis中记录这个消息,并在10s后销毁它。假如这10s内kafka发生了抖动,系统会探知到该消息已经被消费过了,从而不会对该消息做任何处理。

    相关文章

      网友评论

        本文标题:分布式锁防KAFKA抖动

        本文链接:https://www.haomeiwen.com/subject/osjqlqtx.html