RocketMQ是阿里巴巴捐赠给appache的MQ开源组件,从架构上我们分析一下。
高可用
![](https://img.haomeiwen.com/i6301972/cd9c2ddc562fb6b2.png)
集群选举
kafka是依靠Zookeeper进行集群选举的,在rocketMQ的同样位置上是NameServer,这个Nameserver仅仅是注册服务,没有选举能力。每个broker都和NameServer进行连接,通过心跳维持状态。
producer和consumer定时到Nameserver拉取broker信息,并且和自己所消费的broker建立连接。这就和微服务的体系一模一样了。
那么rocketMQ的集群选举怎么实现的呢,通过集成了Dledge实现,Dledge是个jar包,实现了raft算法。
![](https://img.haomeiwen.com/i6301972/dc6b77aa5da2fb31.png)
分片
![](https://img.haomeiwen.com/i6301972/9b6aafd311ca35f9.png)
如图,topic可在多个broker上形成分片,producer可写数据到不通的分片,分片信息也可以由不同的group进行消费。
主备
如下介绍存储,rocketMQ可配置主备,形成主备复制。
消息存储
![](https://img.haomeiwen.com/i6301972/b6f828eddfd81288.png)
http://rocketmq.apache.org/rocketmq/how-to-support-more-queues-in-rocketmq/介绍了rocketMQ存储设计初衷,和kafka存储不同,kafka在每个partition中存储了数据,而RocketMQ将实际消息集中存储,在messageQueue中存储的是元数据信息,通过元数据信息可以索引到CommitLog。
对于保存的数据,每天会删除数据;如果磁盘满,超过设置阈值,则不允许写入数据。
业务相关
顺序消息
RocketMQ的设计确保了消息的并发处理能力,但是有时候,消息是有状态的,即有顺序,RocketMQ怎么实现呢?
- 生产者使用单线程发送消息,确保发送有序。
- 生产者路由选择一个相同的MessageQueue发送消息
- 消费者使用单线程收取消息。
事务消息
![](https://img.haomeiwen.com/i6301972/d90f5bedfdb05e59.png)
- 正常流程
- 发送未确认消息给broker
- broker返回收到未确认信息
- producer执行本地事务
- 执行成功则给broker发送消息确认信息
- 异常流程
- broker未收到消息确认
- 查询本地事务
- 根据本地事务发送commit or rollback信息
延迟消息
![](https://img.haomeiwen.com/i6301972/5808078d2c9473e6.png)
发送到临时缓存,到达延迟时间后由delay service路由给topic。
重试
如果消费返回了consumer_later,则如上述延迟消息一样,会延迟一段时间,进入死信队列,消费死信队列,重新处理。
MQ组件对比
对比项 | RabbitMQ | RocketMQ | Kafka |
---|---|---|---|
社区 | 活跃 | 活跃 | 活跃 |
依赖 | erlang | jdk | jdk、zookeeper |
支持协议 | AMQP、MQTT、STOMP、XMPP | JMS | 自定义 |
消费模式 | push、pull | pull | pull |
消息过滤 | topic | tag | 不支持 |
事务性消息 | 支持 | 支持 | 支持 |
顺序性消息 | 不支持 | 支持 | 不支持,消息可能有一定的丢失 |
延迟队列 | 支持 | 支持 | 不支持 |
持久化 | 内存、磁盘 | 磁盘 | 磁盘 |
高可用 | 普通集群,依赖HAProxy和KeepAlive | 队列和副本 | 分区和副本 |
如何选型
如果业务规模小,不会改源码,就选用RabbitMQ;如果业务规模大,不允许丢消息,追求效率高,用RocketMQ;如果业务规模大,运行少量丢消息,吞吐量大,用Kafka;如果用于大数据,毫无疑问选kafka。
网友评论