Topic、MessageQueue以及Broker之间的关系
假设现在有一个Topic,我们为它指定创建了4个MessageQueue,那么这个Topic的数据在Broker集群中是如何分布的?其实每个Topic的数据都是分布式存储在多个Broker中的。但是Topic的哪些数据存储在哪些broker上呢?所以在这里RocketMQ引入了MessageQueue的概念,本质上就是一个数据分片的机制。在这个机制中,假设你的Topic有1万条数据 ,然后你的Topic有4个MessageQueue,那么大致可以认为会在每个MessageQueue中放入2500条数据。当然这个不是绝对的,有可能有的MessageQueue数据多,有的数据少。那么这些MessageQueue放在哪里呢?答案是放在Broker上。也就是说可能就是每个broker上放两个MessageQueue,我们看下面的示意图:
topic1.jpg
在上图中,MessageQueue将一个Topic的数据拆分为了很多个数据分片,然后在每个Broker机器上都存储一些MessageQueue,通过这个方法,就可以实现Topic数据的分布式存储。
我们都知道生产者会跟NameServer进行通信获取Topic的路由数据。所以生产者从NameServer中就会知道,一个Topic有几个MessageQueue,哪些MessageQueue在哪台Broker机器上。我们暂时认为生产者会均匀的把消息写入各个MessageQueue,这样就可以让生产者把写入请求分散给多个Broker,让每个Broker都均匀分摊到一定的写入请求压力。
如果某个Master Broker出现故障了,此时正在等待的其它Slave Broker自动热切换为Master Broker,那么在切换的过程中这一组Broker就没有Master Broker可以写入了。如果你还是按照之前的策略来均匀把数据写入各个Broker上的MessageQueue,那么会导致你在一段时间内,每次访问这个挂掉的Master Broker都会访问失败,这个似乎不是我们想要的样子,对于这个问题,通常来说建议大家在Producer中开启一个开关,就是SendLatencyFaultEnable,一旦打开这个开关,它会有一个自动容错机制,比如如果某次访问一个Broker发现网络延迟有500ms还无法访问,那么就会在一段时间内自动回避访问这个Broker,比如接下来3000ms内就不会访问这个Broker了。
这样的话就可以避免一个Master Broker故障之后,短时间内生产者频繁的发送消息到这个故障的Master Broker上去,出现较多次数的异常。过段时间再去访问它,这个时候,可能这个Master Broker就已经恢复好了,比如它的Slave Broker切换为了Master Broker就可以让别人访问了。
Broker持久化消息存储
当生产者的消息发送到一个Broker上的时候,它会把这个消息直接顺序写入磁盘上的一个日志文件,叫做commitLog,如下图:
commitLog1.jpg
这个commitLog是很多磁盘文件,每个文件限定最多1GB,Broker收到消息之后就直接追加写入这个文件的末尾,如果一个文件写满了1GB,就会创建一个新的commitLog文件。
在Broker中对Topic下的每个MessageQueue都会有一系列的ConsumeQueue文件,它的格式如下:
$HOME/store/consumequeue/{topic}/{queueId}/{fileName}
{topic}指代的就是某个Topic,{queueId}指代的就是某个MessageQueue,然后对存储在这台Broker机器上的Topic下的一个MessageQueue,它有很多的ConsumeQueue文件,这个ConsumeQueue文件里存储的是一条消息对应在CommitLog文件中的offset偏移量。也就是说,ConsumeQueue中存储的是一个消息在CommitLog文件中的物理位置,也就是offset。
基于Dledger技术的高可用架构
如果基于Dledger技术来实现Broker高可用架构,实际上就是用Dledger先替换掉原来Broker自己管理的CommitLog,由Dledger来管理CommitLog,那么这样的话,就是每个Broker上都有一个Dledger组件。现在假设我们有3台机器,Dledger是如何从3台机器里选举出来一个Leader的?实际上是基于Raft协议来进行Leader选举的,至于选举的过程这里就不再描述。
那么Leader Broker收到消息之后,是如何基于Dledger把数据同步给Slave Broker的?简单来说,数据同步会分为两个阶段,一个是uncommitted阶段,一个是committed阶段。首先Leader Broker收到一条数据之后,会标记为uncommitted状态,然后它会通过自己的DledgerServer组件把这个uncommitted数据发送给Slave Broker的DledgerServer。接着Slave Broker的DledgerServer收到uncommitted数据之后,必须返回一个ack给Leader Broker的DledgerServer,如果Leader Broker收到超过半数的Slave Broker返回ack,就会将消息标记为committed状态。然后Leader Broker上的DledgerServer就会发送committed消息给Slave Broker的DledgerServer,让他们也把消息标记为committed状态,这个就是基于Raft协议实现的两阶段完成的数据同步机制。
消费者
首先我们需要了解一个概念,就是消费者组,意思就是给一组消费者起一个名字,类似于玩游戏分组一样。当一条消息进入Broker之后,每个消费者组都会接受到这条消息,但是每个组只能有一台机器能收到这条消息。
这是什么原因呢?原因就是RocketMQ集群默认的消息消费模式就是集群模式,集群模式就是一个消费者组获取到消息之后只能由其中一台机器消费。但是我们可以通过如下设置来改变为广播模式:
consumer.setMessageModel(MessageModel.BROADCASTING);
在广播模式下,组内的每台机器就都可以消费这条消息了。
网友评论