客户端管理Channel
因为客户端要主动发起请求,以consumer为例,在对topic进行消费时,不同的topic可能在不同的broker上,因此consumer端需要对连接的多个server的Channel进行管理
流程如下:consumer和nameserver通信,获取broker地址,根据地址查询是否存在Channel,若不存在就创建Channel,并在本地缓存,下次通信时从缓存中获取Channel。通过Channel和broker进行通信。
关于自动创建topic的逻辑
发送消息时,如果是新的topic,producer会先使用默认的topicKey(TBW102)去nameserver请求一个默认的TopicRoute,这个topicRoute会包含broker等信息,然后根据这个topicRoute再构建一个新的topic对应的TopicPublishInfo,将这个TopicPublishInfo放入producerImpl的缓存即topicPublishInfoTable中。这样做是为了后期发送消息的统一性(发送消息需要按照topic和broker的对应关系来做),这样在发送消息时就直接选择一个broker进行发送即可,broker端对于不存在的topic再进行创建。
nameserver返回的TopicRouteInfo中存有topic对应的所有broker的信息,以及broker中所有的master和slave,客户端会将其转化为自己使用的,例如producer会将其转换为TopicPublishInfo。
发送消息时根据brokerName找到brokerAddress
rocket集群相关
客户端做负载均衡,会在客户端选择broker进行通信,是在客户端将消息发送到broker,因此相当于在客户端将topic进行了分片处理,高可用场景下可以使用异步复制或者同步双写保证数据一致性
目前高可用模式是多master多slave,一个master对应一个slave,master负责读写,slave可以帮助读。在master挂掉后slave并不能升级成为master,但是可以继续从slave读。后期写时只会写到其他的master。
不同的master broker的brokerName不同,master和对应slave的brokerName相同,brokerId不同,brokerId为0为master,brokerId不为0的为slave。
一个master可以对应多个slave,但是只有brokerId为1的才参与读消息,别的slave还是可以帮忙存储消息的。。。
参考文档中这段话
注意:当前RocketMQ版本在部署架构上支持一Master多Slave,但只有BrokerId=1的从服务器才会参与消息的读负载。
高可用实现
master和slave两种同步模式
同步双写
异步复制
如果当前broker为slave,在将自己注册到nameserver时,若nameserver的注册列表中存在对应的master,将会返回master的信息,包括master的地址和ha地址。ha地址就是master配置的brokerIP2。随后slave会启动一个线程(是在HAService中的HAClient),不停的将当前broker的消息位置的最大物理偏移量发送给master,master接收后会返回这个位置之后的消息,slave接收后写入commitlog,达到异步复制的功能。
ConsumerGroup相关逻辑
如果两个consumer的group相同,在cluster模式下,会平均的消费消息。
如果两个consumer的group不同,一条消息会在两个consumer处各消费一次。
Consumer端负载均衡的核心设计理念:一个消息消费队列在同一时间只允许被同一消费组内的一个消费者消费,一个消息消费者能同时消费多个消息队列。
consumer端负载均衡的逻辑在RebalanceService。
cluster模式下
从broker获取同一个group下的consumer列表,按照平均分配的分页算法,找到当前consumer对应的MessageQueue列表,将该MessageQueue列表和当前的processQueueTable做比对,将不在processQueueTable中的MessageQueue构造PullRequest,发送给PullService。
此处的分页算法,是先将consumer和MessqgeQueueList排序,用consumer作为页码,将MessageQueueList分为记录,根据页码划分记录,并求出每一页的size和每一页的范围range。最后根据consumerId找到对应的MessageQueue。
消息存储
几个重要的类
- CommitLog
- MappedFile
- MappedFileQueue
主要用到的jdk中的类
- RandomAccessFile
- MappedByteBuffer
- FileChannel
主要用到的技术,内存映射
消息落盘之后,ReputMessageService线程会更新ConsumeQueue和Index文件。
网友评论