美文网首页方案Java 程序员面试精选
如何保证消息队列里的数据顺序执行?

如何保证消息队列里的数据顺序执行?

作者: 马小莫QAQ | 来源:发表于2021-10-09 13:27 被阅读0次

    原文链接:https://mp.weixin.qq.com/s/b_2OZkTIViA4IbfsfwwbxA

    使用MQ的时候,经常会有按顺序消费的需求,比如大数据团队为了做数据分析,会把数据库里的数据同步到其他系统做一些数据统计分析。同步MySQL的时候,为了保证数据同步的实时性,会在中间加一个MQ,多个线程来消费MQ里的数据。

    这种同步一般是读取binlog数据,你在MySQL里增改删了数据,对应出来就是3条增改删binlog日志发送到MQ里面,消费的时候肯定必须要按照增改删的顺序执行。如果你换成删除、修改、增加,就导致数据乱套了。

    图1 binlog同步

    我们以kafka举例,看下哪些环节会出现数据顺序不一致情况,又怎么解决。

    假设kafka分配了3个partition,kafka的一个特性就是,能保证写入一个partition中的数据一定是有顺序的。

    生产者写的时候,可以指定一个key,比如是订单id作为key,这个id对应的数据一定会写到同一个partition中去,而且这个partition中的数据都是有顺序的。

    图2 kafka partition

    kafka的消费者开始消费partition中的数据,一个消费消费一个partition,一个partition只能被一个消费者消费,不会出现一个消费者同时消费多个partition的情况。假如现在有3个partition,你启动4个消费者,那么就会有一个消费者消费不到数据。

    图3 一个消费者消费一个partition

    到目前为止,每个消费者消费到的数据都是有顺序性的。但消费者内部如果是单线程的,效率就会比较低,如果生产者写入kafka的数据量比较大,消费不及时,就会出现消息堆积的情况,所以消费者需要多线程的方式运行。

    假如消费者里启动了3个线程,并发的来消费数据,线程之间如果不做同步控制,还是会导致数据乱掉。

    图4 消费者多线程消费MQ

    那如何保证kafka消费者多线程按顺序消费数据呢?

    多个线程不能直接拿数据去处理,此时我们可以在同步系统中搞多个内存队列,消费者拿到数据之后,根据每条数据的key做hash取模,把相同id的数据分配到同一个内存队列中去。

    每个内存队列里的数据都是有顺序性的,给每个内存队列都对应一个线程,去消费内存队列中的数据。

    假如有3条增改删的数据,都是对同一个id的处理,那么hash取模后就会写入到同一个内存队列里去,由同一个线程去消费,然后按顺序写入数据库中。

    如果消费者按照单线程消费处理,一条数据耗费几十毫秒,1秒钟只能处理十几条数据,吞吐量就会非常低。如果开启多线程的方式处理,就会几倍地提高吞吐量,同时也保证了数据的顺序性。

    整个流程按这样的设计方案来处理,就可以保证数据的顺序性。

    相关文章

      网友评论

        本文标题:如何保证消息队列里的数据顺序执行?

        本文链接:https://www.haomeiwen.com/subject/bajqoltx.html