分布式消息系统作为实现分布式系统可扩展、可伸缩的关键组件,需要具有高吞吐量、高可用等特点。分布式消息系统需要实现的特性包括:
一、顺序消息
消息有序指的是一类消息消费时,能按照发送的顺序来消费。例如:一个订单产生了 3 条消息,分别是订单创建、订单付款、订单完成。消费时,要按照这个顺序消费才有意义。但同时订单之间又是可以并行消费的。
RocketMQ是通过将“相同ID的消息发送到同一个队列,而一个队列的消息只由一个消费者处理“来实现顺序消息。
这样对于同一个订单的创建、付款和完成消息,他们将保持这一顺序被发送和消费。
Producer端
Producer端确保消息顺序唯一要做的事情就是将消息路由到特定的分区,在RocketMQ中,通过MessageQueueSelector来实现分区的选择。
如下实现就可以保证相同的订单的消息被路由到相同的分区:
long orderId = ((Order) object).getOrderId;return mqs.get(orderId % mqs.size());
Consumer端
RocketMQ消费端有两种类型:MQPullConsumer和MQPushConsumer。
MQPullConsumer由用户控制线程,主动从服务端获取消息,每次获取到的是一个MessageQueue中的消息。PullResult中的List msgFoundList自然和存储顺序一致,用户需要再拿到这批消息后自己保证消费的顺序。
对于PushConsumer,消息是单线程存入MessageQueue的,因此是顺序的。每次有消息等待消费时ConsumeMessageOrderlyService保证每个线程在消费消息时需要拿到MessageQueue的锁,只有拿到锁的线程能够进行消费。由于MessageQueue里的消息是顺序的,因此每次消费时也是顺序的。
二、消息重复
果消费端收到两条一样的消息,应该怎样处理?RocketMQ的处理方式是:
1、消费端处理消息的业务逻辑保持幂等性
2、保证每条消息都有唯一编号且保证消息处理成功与去重表的日志同时出现
第1条很好理解,只要保持幂等性,不管来多少条重复消息,最后处理的结果都一样。第2条原理就是利用一张日志表来记录已经处理成功的消息的ID,如果新到的消息ID已经在日志表中,那么就不再处理这条消息。
我们可以看到第1条的解决方式,很明显应该在消费端实现,不属于消息系统要实现的功能。第2条可以消息系统实现,也可以业务端实现。正常情况下出现重复消息的概率不一定大,且由消息系统实现的话,肯定会对消息系统的吞吐量和高可用有影响,所以最好还是由业务端自己处理消息重复的问题,这也是RocketMQ不解决消息重复的问题的原因。
三、事务消息
RocketMQ实现事务消息的关键是事务回查。
我们以转账为例来学习RocketMQ事务消息。
RocketMQ第一阶段发送Prepared消息时,会拿到消息的地址,第二阶段执行本地事物,第三阶段通过第一阶段拿到的地址去访问消息,并修改状态。细心的你可能又发现问题了,如果确认消息发送失败了怎么办?RocketMQ会定期扫描消息集群中的事物消息,这时候发现了Prepared消息,它会向消息发送者确认,钱到底是减了还是没减呢?如果减了是回滚还是继续发送确认消息呢?RocketMQ会根据发送端设置的策略来决定是回滚还是继续发送确认消息。这样就保证了消息发送与本地事务同时成功或同时失败。
如果事务消息发送到MQ上后,事务消息是prepare状态,对消费者还不可见,需要本地事务执行器返回给MQ一个确认消息。事务消息是否对消费者可见完全有本地事务执行器返回的确认消息决定。如果迟迟收不到确认消息,MQ会使用事务回查机制。其实现原理是,事务消息开始是Prepare状态,RocketMQ会将其持久化到本地Mysql中,然后如果收到确认消息,就删除掉这条prepare消息,如果迟迟收不到确认消息,那么RMQ会定时的扫描prepare消息,发送给produce group进行回查确认。
四、负载均衡
Producer轮询某topic下的所有队列的方式来实现发送方的负载均衡。随机轮询队列(Roundbin),全局维护一个自增id,发送一次消息自增一次,与queueSize取模得出发送队列。
消费端会通过RebalanceService线程,10秒钟做一次基于topic下的所有队列负载:遍历Consumer下的所有topic,然后根据topic订阅所有的消息获取同一topic和Consumer Group下的所有Consumer,然后根据具体的分配策略来分配消费队列,分配的策略包含:平均分配、消费端配置等。
在rocketmq里,consumer被分为2类:MQPullConsumer和MQPushConsumer,其实本质都是拉模式(pull),即consumer轮询从broker拉取消息。区别是:push方式里,consumer把轮询过程封装了,并注册MessageListener监听器,取到消息后,唤醒MessageListener的consumeMessage()来消费,对用户而言,感觉消息是被推送过来的。pull方式里,取消息的过程需要用户自己写,首先通过打算消费的Topic拿到MessageQueue的集合,遍历MessageQueue集合,然后针对每个MessageQueue批量取消息,一次取完后,记录该队列下一次要取的开始offset,直到取完了,再换另一个MessageQueue。
网友评论