美文网首页
RocketMQ源码解析-消息是如何写入Broker服务器(客户

RocketMQ源码解析-消息是如何写入Broker服务器(客户

作者: 一语长情 | 来源:发表于2022-08-10 22:36 被阅读0次

RocketMQ整体物理架构如下:

本篇文件主要是介绍一条普通的消息时如何写入到Broker服务器的。

消息的写入在Producer生产者经历了哪些步骤,在发送到Broker服务器之后,又经历了哪些步骤,才最终写入到磁盘上面的。下面将会以解析源码的方式,剖析这一流程,解开这一神秘的面纱。

源码解析对应的RockeMQ的版本为目前4.9.4,为4.x系列最新的版本,消息发送的解析默认选用了同步(sync)发送的方式。

用户在创建 DefaultMQProducer 并设置好对应的NameServer、Group、topic、messageBody的设置之后,在同步发送消息时,会调用到DefaultMQProducer的send方法进行同步发送消息。

defaultMQProducerImpl是在创建DefaultMQProducer对象时创建出来的,也就是一个DefaultMQProducer对像对应一个defaultMQProducerImpl对象,继续深挖 defaultMQProducerImpl的send方法,最终调用的是sendDefaultImpl方法,这个方法是defaultMQProducerImpl的一个内部私有方法。

sendDefaultImpl的核心功能就是根据指定的Topic来获取Topic对应的PublishInfo,也就是broker关于这个topic注册到NameServer的信息,然后根据PublishInfo来选择一个队列继续调用sendKernelImpl方法进行调用。

同步发送模式下会发送3次,在选择要发送的消息队列时,会规避上一次发送有问题的BorkerName。

selectOneMessageQueue方法是在发送消息时根据Topic选取要发生的队列的方法,我们对这个方法进行解析

sendLatencyFaultEnable表示是否开启发送延迟故障机制,这个机制默认是关闭的,这个机制意思指在向Broker发送完成一条消息时,都会统计一下发送所耗费的时,因为成功并不意味着足够健康。

这意味着如果有两个Broker,并且您向它们发送了大量消息,并且所有消息都成功发送,但是其中一个Broker处理速度很慢,因此延迟很高 对于这个高延迟的Broker,我们应该将其更新为故障项,并将其隔离一段时间,以防坏的代理变得更糟,然后客户端将享受良好的发送体验。

若没有开启的话,则默认取余算法获取一个Topic下面的队列,其中需要规避lastBrokerName。

在继续跟进sendKernelImpl方法

sendKernelImpl方法较长,总结下来主要干了这么几件事情

1.根据BrokerName获取BrokerName的IP和端口地址

2.前置属性的设置,非批量消息设置唯一id、压缩、事务标记

3.执行hook函数

4.创建SendMessageRequestHeader 设置消息请求头属性,请求体为msg的body字节数组

5.根据不同的请求方式,继续调用发送消息的方法,这一层的方法是通过 mQClientFactory 的 MQClientAPIImpl调用的,mQClientFactory表示一个JVM进程中只有一个这个对象,消费者生产着都依赖这个实例,主要处理与Borker的通信以及调度任务,MQClientAPIImpl作为mQClientFactory的一个内部实例,主要负责与外部的通信

更加详细的可以看代码的注释

MQClientAPIImpl的 sendMessage方法则继续对消息进行加工。

这段代码主要就是对于即将要发送的消息进行精简,减少发送报文的大小,加快序列化与反序列化。

最终会将SendMessageRequestHeader转换为SendMessageRequestHeaderV2,并最终创建RemotingCommand对象,这个对象代表了一次真正意义上的与外部通信的抽象,也是Netty编解码操作的对象。

继续调用sendMessageSync方法,此处remotingClient是基于Netty封装的通信客户端,这个方法的后续操作,就是基于Netty的网络通信的操作了,也可以代表进入了RocketMQ操作的网络通信层。

此代码片段的核心点就在于调用Netty Channel对象的writeAndFlush方法,将request对象发往Broker服务器。因为是同步的方法,所以需要在未超时的时间内阻塞等待Broker响应的结果

我们在来看一下RemotingCommand对象在被写入Netty的调用链Pipeline之后,需要经过的一个关键ChannelHandler-->NettyEncoder,这个ChannelHandler主要负责请求对象的编码,将对象编码为字节流之后以便在网络上面传输。

NettyEncoder具体的编码规范,默认JSON序列化。

RocketMQ的解码器采用的是Netty提供的LengthFieldBasedFrameDecoder(基于长度属性的解码器),以上的编码规则也是为了适用于这个解码器,避免了粘包拆包问题。

至此,一条普通的消息已经发往Broker服务器去了,客户端的处理流程讲解完成。后续,将继续本篇文章的下一篇,Broker服务器是如何处理一条写入的消息的。

相关文章

网友评论

      本文标题:RocketMQ源码解析-消息是如何写入Broker服务器(客户

      本文链接:https://www.haomeiwen.com/subject/mhntgrtx.html