1.各个组件启动源码、框架结构
1.1 NameServer启动
NamesrvStartup#main
- 1)NamesrvController controller = createNamesrvController(args);创建controller
1-1)检测命令行参数
1-2)创建两个核心配置:NamesrvConfig和NettyServerConfig
1-3)解析-c 和 -p 参数,赋值到上面两个配置中
1-4)ROCKETMQ_HOME环境变量检测
1-5)controller = new NamesrvController(namesrvConfig, nettyServerConfig);创建controller
1-6) controller.getConfiguration().registerConfig(properties);注册一下所有的配置 - 2)start(controller);启动controller
2-1)controller.initialize();初始化
A)this.kvConfigManager.load();加载KV配置
B)this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);创建NettyServer网络处理对象
C) this.remotingExecutor = Executors.newFixedThreadPool()Netty服务器的工作线程池
D)this.registerProcessor();注册NameServer的Processor 注册到RemotingServer中。
E)NamesrvController.this.routeInfoManager.scanNotActiveBroker();定时任务:每间隔10S扫描一次Broker,移除不活跃的Broker
F)NamesrvController.this.kvConfigManager.printAllPeriodically();定时任务:每间隔10min打印一次KV配置
2-2)Runtime.getRuntime().addShutdownHook()服务关闭钩子,在服务正常关闭时执行。
2-3) controller.start();启动服务
A)this.remotingServer.start();启动remotingServer。
B)不为null也启动this.fileWatchService.start();
NameServer的核心作用:
- 一是维护Broker的服务地址并进行及时的更新。
- 二是给Producer和Consumer提供服务获取Broker列表。
1.2 Broker启动
BrokerStartup#main
- 1)createBrokerController(args)创建controller
1-1)检测命令行参数
1-2)创建四个核心配置:BrokerConfig、NettyServerConfig、NettyClientConfig和MessageStoreConfig
1-3)解析-c 参数,赋值到上面四个配置中
1-4)ROCKETMQ_HOME环境变量检测
1-5)处理NamesrcAddr
1-6)通过brokerId判断主从:ASYNC_MASTER、SYNC_MASTER、SLAVE;Dledger集群的所有Broker节点ID都是-1
1-7)解析-p和-m参数,赋值到上面四个配置中
1-8) controller = new BrokerController(),创建核心的BrokerController
1-9)controller.getConfiguration().registerConfig(properties);注册配置
1-10)controller.initialize();初始化BrokerController。
A)加载磁盘上的配置信息(json)。topicConfigManager、consumerOffsetManager、subscriptionGroupManager、consumerFilterManager
B)构建消息存储管理组件DefaultMessageStore,外层还会包装插件AbstractPluginMessageStore
C)加载磁盘文件this.messageStore.load();
D)this.remotingServer = new NettyRemotingServer()
E)this.fastRemotingServer = new NettyRemotingServer()这个fastRemotingServer与RemotingServer功能基本差不多,处理VIP端口请求
F) this.sendMessageExecutor发送消息的线程池
G) this.pullMessageExecutor处理consumer的pull请求的线程池
H) this.replyMessageExecutor回复消息的线程池
I)this.queryMessageExecutor查询消息的线程池
J)this.adminBrokerExecutor
K)this.clientManageExecutor管理客户端的线程池
L) this.heartbeatExecutor心跳请求线程池
M)this.endTransactionExecutor
N) this.consumerManageExecutor
O)this.registerProcessor();Broker注册Processor
P)BrokerController.this.getBrokerStats().record();定时进行broker统计的任务
Q)BrokerController.this.consumerOffsetManager.persist();定时进行consumer消费Offset持久化到磁盘的任务
R)BrokerController.this.consumerFilterManager.persist();对consumer的filter过滤器进行持久化的任务。这里可以看到,消费者的filter是被下推到了Broker来执行的。
S)BrokerController.this.protectBroker();定时进行broker保护
T)BrokerController.this.printWaterMark();定时打印水位线 U)BrokerController.this.getMessageStore().dispatchBehindByte
s()定时进行落后commitlog分发的任务
V)this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());或者BrokerController.this.brokerOuterAPI.fetchNameServerAddr();设置NameServer的地址列表。可以从配置加载,也可以发远程请求加载
W)initialTransaction();
X)initialAcl(); 权限控制
Y)initialRpcHooks();
1-11)Runtime.getRuntime().addShutdownHook()服务关闭钩子,在服务正常关闭时执行。 - 2)start()启动controller
A)this.messageStore.start();存储组件,这里启动服务主要是为了将CommitLog的写入事件分发给ComsumeQueue和IndexFile
B)this.remotingServer.start();以及this.fastRemotingServer.start();
C)this.fileWatchService.start();
D) this.brokerOuterAPI.start();Broker的brokerOuterAPI可以理解为一个Netty客户端,往外发请求的组件。例如发送心跳
E)this.pullRequestHoldService.start();长轮询请求暂存服务
F)this.clientHousekeepingService.start();
G)this.filterServerManager.start();
H)BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());Broker核心的心跳注册任务
I)this.brokerStatsManager.start();
J) this.brokerFastFailure.start();
1.3 Netty网络框架
Broker端的NettyRemotingServer:
NettyRemotingServer#start
public void start() {
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
nettyServerConfig.getServerWorkerThreads(),
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
}
});
prepareSharableHandlers();
//K1 Netty服务启动的核心流程
ServerBootstrap childHandler =
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
//Netty的核心服务流程,encoder和decoder,二进制传输协议。
//RocketMQ中的二进制传输协议比较复杂,是否能按照JSON自定义二进制协议?
//serverHandler负责最关键的网络请求处理。
ch.pipeline()
.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
.addLast(defaultEventExecutorGroup,
encoder,
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
connectionManageHandler,
serverHandler
);
}
});
if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
}
//开始Socket监听
try {
ChannelFuture sync = this.serverBootstrap.bind().sync();
InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
this.port = addr.getPort();
} catch (InterruptedException e1) {
throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
}
if (this.channelEventListener != null) {
this.nettyEventExecutor.start();
}
//每秒清理过期的异步请求暂存结果。
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
NettyRemotingServer.this.scanResponseTable();
} catch (Throwable e) {
log.error("scanResponseTable exception", e);
}
}
}, 1000 * 3, 1000);
}
核心是NettyServerHandler
@ChannelHandler.Sharable
class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
processMessageReceived(ctx, msg);
}
}
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
final RemotingCommand cmd = msg;
if (cmd != null) {
switch (cmd.getType()) {
case REQUEST_COMMAND:
processRequestCommand(ctx, cmd);
break;
case RESPONSE_COMMAND:
processResponseCommand(ctx, cmd);
break;
default:
break;
}
}
}
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
核心的业务处理是在processorTable,其初始化是在BrokerController#registerProcessor
public void registerProcessor() {
/**
* SendMessageProcessor
*/
SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
sendProcessor.registerSendMessageHook(sendMessageHookList);
sendProcessor.registerConsumeMessageHook(consumeMessageHookList);
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
/**
* PullMessageProcessor
*/
this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);
this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);
所以关键的业务处理,后面只要去processorTable查找类型的对应Processor就行。
RocketMQ的同步结果推送与异步结果推送
RocketMQ的RemotingServer服务端,会维护一个responseTable,这是一个线程同步的Map结构。 key为请求的ID,value是异步的消息结果。ConcurrentMap<Integer , ResponseFuture> 。
处理同步请求(NettyRemotingAbstract#invokeSyncImpl)时,处理的结果会存入responseTable,通过ResponseFuture提供一定的服务端异步处理支持,提升服务端的吞吐量。 请求返回后,立即从responseTable中移除请求记录。
处理异步请求(NettyRemotingAbstract#invokeAsyncImpl)时,处理的结果依然会存入responsTable,等待客户端后续再来请求结果。但是他保存的依然是一个ResponseFuture,也就是在客户端请求结果时再去获取真正的结果。 另外,在RemotingServer启动时,会启动一个定时的线程任务,不断扫描responseTable,将其中过期的response清除掉。
1.4 Broker心跳注册过程
BrokerController#start
//K2 Broker核心的心跳注册任务,需要深入解读下。
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
} catch (Throwable e) {
log.error("registerBrokerAll Exception", e);
}
}
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
...
//这里才是比较关键的地方。先判断是否需要注册,然后调用doRegisterBrokerAll方法真正去注册。
if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
this.getBrokerAddr(),
this.brokerConfig.getBrokerName(),
this.brokerConfig.getBrokerId(),
this.brokerConfig.getRegisterBrokerTimeoutMills())) {
doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
}
}
//K2 Broker注册最核心的部分
private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
TopicConfigSerializeWrapper topicConfigWrapper) {
List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
this.brokerConfig.getBrokerClusterName(),
this.getBrokerAddr(),
this.brokerConfig.getBrokerName(),
this.brokerConfig.getBrokerId(),
this.getHAServerAddr(),
topicConfigWrapper,
this.filterServerManager.buildNewFilterServerList(),
oneway,
this.brokerConfig.getRegisterBrokerTimeoutMills(),
this.brokerConfig.isCompressedRegister());
if (registerBrokerResultList.size() > 0) {
RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0);
if (registerBrokerResult != null) {
//注册完保存主从节点的地址
if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {
this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
}
this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());
if (checkOrderConfig) {
this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());
}
}
}
}
会向所有NameServer进行注册:
public List<RegisterBrokerResult> registerBrokerAll(
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final String haServerAddr,
final TopicConfigSerializeWrapper topicConfigWrapper,
final List<String> filterServerList,
final boolean oneway,
final int timeoutMills,
final boolean compressed) {
//使用CopyOnWriteArrayList提升并发安全性
final List<RegisterBrokerResult> registerBrokerResultList = new CopyOnWriteArrayList<>();
List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
requestHeader.setBrokerAddr(brokerAddr);
requestHeader.setBrokerId(brokerId);
requestHeader.setBrokerName(brokerName);
requestHeader.setClusterName(clusterName);
requestHeader.setHaServerAddr(haServerAddr);
requestHeader.setCompressed(compressed);
RegisterBrokerBody requestBody = new RegisterBrokerBody();
requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
requestBody.setFilterServerList(filterServerList);
final byte[] body = requestBody.encode(compressed);
final int bodyCrc32 = UtilAll.crc32(body);
requestHeader.setBodyCrc32(bodyCrc32);
//通过CountDownLatch,保证在所有NameServer上完成注册后再一起结束。
final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
for (final String namesrvAddr : nameServerAddressList) {
brokerOuterExecutor.execute(new Runnable() {
@Override
public void run() {
try {
RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);
if (result != null) {
registerBrokerResultList.add(result);
}
log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
} catch (Exception e) {
log.warn("registerBroker Exception, {}", namesrvAddr, e);
} finally {
countDownLatch.countDown();
}
}
});
}
try {
countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
}
}
return registerBrokerResultList;
}
NameServer端的处理:
DefaultRequestProcessor#processRequest
//K2 NameServer处理请求的核心代码
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
if (ctx != null) {
log.debug("receive request, {} {} {}",
request.getCode(),
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
request);
}
switch (request.getCode()) {
case RequestCode.PUT_KV_CONFIG:
return this.putKVConfig(ctx, request);
case RequestCode.GET_KV_CONFIG:
return this.getKVConfig(ctx, request);
case RequestCode.DELETE_KV_CONFIG:
return this.deleteKVConfig(ctx, request);
case RequestCode.QUERY_DATA_VERSION:
return queryBrokerTopicConfig(ctx, request);
case RequestCode.REGISTER_BROKER: //Broker注册请求处理。版本默认是当前框架版本
Version brokerVersion = MQVersion.value2Version(request.getVersion());
if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
return this.registerBrokerWithFilterServer(ctx, request); //当前版本
} else {
return this.registerBroker(ctx, request);
}
最终会调用RouteInfoManager对Broker信息进行注册:
//K2 NameServer 实际处理Broker注册的地方
public RemotingCommand registerBrokerWithFilterServer(ChannelHandlerContext ctx, RemotingCommand request)
throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);
final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader();
final RegisterBrokerRequestHeader requestHeader =
(RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);
if (!checksum(ctx, request, requestHeader)) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("crc32 not match");
return response;
}
RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody();
if (request.getBody() != null) {
try {
registerBrokerBody = RegisterBrokerBody.decode(request.getBody(), requestHeader.isCompressed());
} catch (Exception e) {
throw new RemotingCommandException("Failed to decode RegisterBrokerBody", e);
}
} else {
registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setCounter(new AtomicLong(0));
registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setTimestamp(0);
}
//routeInfoManager就是管理路由信息的核心组件。
RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
requestHeader.getClusterName(),
requestHeader.getBrokerAddr(),
requestHeader.getBrokerName(),
requestHeader.getBrokerId(),
requestHeader.getHaServerAddr(),
registerBrokerBody.getTopicConfigSerializeWrapper(),
registerBrokerBody.getFilterServerList(),
ctx.channel());
responseHeader.setHaServerAddr(result.getHaServerAddr());
responseHeader.setMasterAddr(result.getMasterAddr());
byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
response.setBody(jsonValue);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
看看RouteInfoManager 管理的路由信息:
public class RouteInfoManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
private final ReadWriteLock lock = new ReentrantReadWriteLock();
//几个关键的Table
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
2.发送消息
2.1 普通消息发送DefaultMQProducer
DefaultMQProducer#start
public void start() throws MQClientException {
this.setProducerGroup(withNamespace(this.producerGroup));
this.defaultMQProducerImpl.start();
if (null != traceDispatcher) {
try {
traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
} catch (MQClientException e) {
log.warn("trace dispatcher start failed ", e);
}
}
}
DefaultMQProducerImpl#start()
public void start() throws MQClientException {
this.start(true);
}
//K2 消息生产者的启动方法
public void start(final boolean startFactory) throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
this.checkConfig();
//修改当前的instanceName为当前进程ID
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
this.defaultMQProducer.changeInstanceNameToPID();
}
//客户端核心的MQ客户端工厂 对于事务消息发送者,在这里面会完成事务消息的发送者的服务注册
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
//注册MQ客户端工厂示例
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
//启动示例 --所有客户端组件都交由mQClientFactory启动
if (startFactory) {
mQClientFactory.start();
}
log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
this.defaultMQProducer.isSendMessageWithVIPChannel());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The producer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
this.startScheduledTask();
}
总结:
- 1)RocketMQ的所有客户端实例,包括生产者和消费者,都是统一交由mQClientFactory组件来启动,也就是说,所有客户端的启动流程是固定的,不同客户端的区别只是在于他们在启动前注册的一些信息不同。例如生产者注册到producerTable,消费者注册到consumerTable,管理控制端注册到adminExtTable
- 2)MQClientInstance#start启动了很多服务
this.mQClientAPIImpl.fetchNameServerAddr();
this.mQClientAPIImpl.start();
this.startScheduledTask();
this.pullMessageService.start();
this.rebalanceService.start();
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
DefaultMQProducer#send(Message)
-> DefaultMQProducerImpl#send(Message)
-> DefaultMQProducerImpl#sendDefaultImpl
- 1)TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());获取Topic信息
- 2)MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);Producer根据发送者负载均衡策略,计算把消息发到哪个MessageQueue中(Producer在获取路由信息后,会选出一个MessageQueue去发送消息。这个选MessageQueue的方法就是一个索引自增然后取模的方式。.sendLatencyFaultEnable默认是关闭的,Broker故障延迟机制,表示一种发送消息失败后一定时间内不再往同一个Queue重复发送的机制)
- 3)sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);实际发送消息的方法(根据MessageQueue 获取对应的Broker地址)
Producer如何管理Borker路由信息?
Producer需要拉取Broker列表,然后跟Broker建立连接等等很多核心的流程,其实都是在发送消息时建立的。Send方法中,首先需要获得Topic的路由信息。这会从本地缓存中获取,如果本地缓存中没有,就从NameServer中去申请。
//找路由表的过程都是先从本地缓存找,本地缓存没有,就去NameServer上申请
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
//Producer向NameServer获取更新Topic的路由信息
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
//还是从本地缓存中寻找Topic的路由信息
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
return topicPublishInfo;
} else {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
}
2.2 事务消息TransactionMQProducer
//事务消息的启动过程。启动过程中会完成Processor的注册
@Override
public void start() throws MQClientException {
this.defaultMQProducerImpl.initTransactionEnv();
super.start();
}
public void initTransactionEnv() {
TransactionMQProducer producer = (TransactionMQProducer) this.defaultMQProducer;
if (producer.getExecutorService() != null) {
this.checkExecutor = producer.getExecutorService();
} else {
this.checkRequestQueue = new LinkedBlockingQueue<Runnable>(producer.getCheckRequestHoldMax());
this.checkExecutor = new ThreadPoolExecutor(
producer.getCheckThreadPoolMinSize(),
producer.getCheckThreadPoolMaxSize(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.checkRequestQueue);
}
}
这里唯一区别就是多了一个线程池checkExecutor。
public TransactionSendResult sendMessageInTransaction(final Message msg,
final Object arg) throws MQClientException {
if (null == this.transactionListener) {
throw new MQClientException("TransactionListener is null", null);
}
msg.setTopic(NamespaceUtil.wrapNamespace(this.getNamespace(), msg.getTopic()));
return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg);
}
DefaultMQProducerImpl#sendMessageInTransaction
- 1)TransactionListener transactionListener = getCheckListener();获取监听器
- 2)设置消息属性
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup()); - 3)发送消息sendResult = this.send(msg);
- 4)发送成功,执行本地事务localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
- 5)发送失败(各种错误原因),设置为回滚状态
- 6)根据状态进行处理this.endTransaction(msg, sendResult, localTransactionState, localException);这里会向broker发起提交或者回滚请求
客户端处理Broker回查请求:
ClientRemotingProcessor#processRequest
-> RequestCode.CHECK_TRANSACTION_STATE
ClientRemotingProcessor#checkTransactionState
-> DefaultMQProducerImpl#checkTransactionState
-> this.checkExecutor.submit(request);
-> localTransactionState = transactionListener.checkLocalTransaction(message);
3.消费消息
DefaultMQPushConsumer#start
public void start() throws MQClientException {
setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
this.defaultMQPushConsumerImpl.start();
if (null != traceDispatcher) {
try {
traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
} catch (MQClientException e) {
log.warn("trace dispatcher start failed ", e);
}
}
}
DefaultMQPushConsumerImpl#start
- 1)CLUSTERING模式,this.defaultMQPushConsumer.changeInstanceNameToPID();
- 2)this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);客户端示例工厂,生产者也是交由这个工厂启动的。
- 3)this.rebalanceImpl.setAllocateMessageQueueStrategy(
this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());负载均衡策略 - 4)this.pullAPIWrapper = new PullAPIWrapper();
- 5)广播模式与集群模式的最本质区别就是offset存储的地方不一样。
广播模式是在消费者本地存储offset:this.offsetStore = new LocalFileOffsetStore();
集群模式是在Broker远端存储offset:this.offsetStore = new RemoteBrokerOffsetStore(); - 6)消费者服务
顺序消费监听创建ConsumeMessageOrderlyService;
并发消费监听创建ConsumeMessageConcurrentlyService;
this.consumeMessageService.start(); - 7)注册消费者。与生产者类似,客户端只要按要求注册即可,后续会随mQClientFactory一起启动。
mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this); - 8) mQClientFactory.start();
- 9)this.updateTopicSubscribeInfoWhenSubscriptionChanged();
- 10)this.mQClientFactory.checkClientInBroker();
- 11)this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
- 12)this.mQClientFactory.rebalanceImmediately();
消费端负载均衡:
- AllocateMachineRoomNearby: 将同机房的Consumer和Broker优先分配在一起。
- AllocateMessageQueueAveragely:平均分配。将所有MessageQueue平均分给每一个消费者
- AllocateMessageQueueAveragelyByCircle: 轮询分配。轮流的给一个消费者分配一个MessageQueue。
- AllocateMessageQueueByConfig: 不分配,直接指定一个messageQueue列表。类似于广播模式,直接指定所有队列。
- AllocateMessageQueueByMachineRoom:按逻辑机房的概念进行分配。又是对BrokerName和ConsumerIdc有定制化的配置。
- AllocateMessageQueueConsistentHash。源码中有测试代码AllocateMessageQueueConsitentHashTest。这个一致性哈希策略只需要指定一个虚拟节点数,是用的一个哈希环的算法,虚拟节点是为了让Hash数据在换上分布更为均匀。
消费者服务的串联:
- DefaultMQPushConsumerImpl#start
- this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
- instance = new MQClientInstance()
- this.pullMessageService = new PullMessageService(this);
- PullMessageService#run
- PullMessageService#pullMessage
- DefaultMQPushConsumerImpl#pullMessage
- DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest()消费者消息服务处理消费到的消息
- ConsumeMessageConcurrentlyService#submitConsumeRequest
ConsumeMessageOrderlyService#submitConsumeRequest - this.consumeExecutor.submit(consumeRequest);并发和顺序消费的线程池线程数都为20
- ConsumeMessageConcurrentlyService.ConsumeRequest#run
ConsumeMessageOrderlyService.ConsumeRequest#run 需要加锁 - status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);这个listener就是自定义的业务逻辑。
4.Broker端
4.1 文件存储
DefaultMessageStore#putMessage
//K1 Broker典型的消息存储处理
//当前版本将默认的写入方式更改成了异步写入机制。
@Override
public PutMessageResult putMessage(MessageExtBrokerInner msg) {
try {
return asyncPutMessage(msg).get();
} catch (InterruptedException | ExecutionException e) {
return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, null);
}
}
DefaultMessageStore#asyncPutMessage
-> CommitLog#asyncPutMessage
- 1)迟消息的实现方式,就要修改一下msg的topic和queueID,改为系统默认创建的延迟队列。topic是固定的SCHEDULE_TOPIC_XXXX,queueId是根据延迟级别选择的。
- 2)加锁putMessageLock.lock();
- 3)找到最后一个CommitLog文件。最后一个就是当前写的文件
- 4)mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext); 以零拷贝的方式实现消息顺序写。 ByteBuffer.allocateDirect(fileSize)
- 5)putMessageLock.unlock();
- 6)submitFlushRequest(result, msg);提交刷盘请求
同步刷盘机制:向this.flushCommitLogService提交GroupCommitRequest请求,每10ms执行一次flush;
异步刷盘机制:flushCommitLogService.wakeup();或者commitLogService.wakeup(); - 7)submitReplicaRequest(result, msg);提交主从同步请求。
-> HAService#putRequest
-> this.groupTransferService.putRequest(request);
HAService有三个核心组件,与Master相关的是acceptSocketService和groupTransferService。其中acceptSocketService主要负责维护Master与Slave之间的TCP连接。groupTransferService主要与主从同步复制有关。而slave相关的则是haClient。
接下来看看ConsumeQueue和IndexFile的写入:
- DefaultMessageStore#start
- this.reputMessageService.start();
- DefaultMessageStore.ReputMessageService#run,Commit日志分发服务,每隔1毫秒,会检查是否需要(就是看有没有新数据)向ConsumeQueue和IndexFile中转发一次CommitLog写入的消息。
- DefaultMessageStore.ReputMessageService#doReput
- DefaultMessageStore.this.doDispatch(dispatchRequest);
CommitLogDispatcherBuildConsumeQueue#doDispatch
CommitLogDispatcherBuildIndex#doDispatch - 长轮询: 如果有消息到了主节点,并且开启了长轮询。就要通过长轮询机制通知消费者,新消息已经到了,可以消费了。DefaultMessageStore.this.messageArrivingListener.arriving(),实例是NotifyMessageArrivingListener
定时删除过期文件:
- DefaultMessageStore#start
- DefaultMessageStore#addScheduleTask
- 定时任务:DefaultMessageStore.this.cleanFilesPeriodically();
- this.cleanCommitLogService.run();定时删除过期commitlog
this.cleanConsumeQueueService.run();定时删除过期的consumequeue
4.2 长轮询机制
RocketMQ对消息消费者提供了Push推模式和Pull拉模式两种消费模式。但是这两种消费模式的本质其实都是Pull拉模式,Push模式可以认为是一种定时的Pull机制。但是这时有一个问题,当使用Push模式时,如果RocketMQ中没有对应的数据,那难道一直进行空轮询吗?如果是这样的话,那显然会极大的浪费网络带宽以及服务器的性能,并且,当有新的消息进来时,RocketMQ也没有办法尽快通知客户端,而只能等客户端下一次来拉取消息了。针对这个问题,RocketMQ实现了一种长轮询机制 long polling。
长轮询机制简单来说,就是当Broker接收到Consumer的Pull请求时,判断如果没有对应的消息,不用直接给Consumer响应(给响应也是个空的,没意义),而是就将这个Pull请求给缓存起来。当Producer发送消息过来时,增加一个步骤去检查是否有对应的已缓存的Pull请求,如果有,就及时将请求从缓存中拉取出来,并将消息通知给Consumer。
消费者拉取消息:
BrokerController#registerProcessor
-> this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);
-> PullMessageProcessor#processRequest()
-> ResponseCode.PULL_NOT_FOUND,消息长轮询1:消费者消费时,没有消息就会被缓存起来。brokerAllowSuspend 客户端初次请求消息时是指定的true。重新唤醒时指定为false,hasSuspendFlag默认都是true。
this.brokerController.getPullRequestHoldService().suspendPullRequest( topic, queueId, pullRequest);请求是PullRequest
生产者发送消息:
BrokerController#registerProcessor
public void registerProcessor() {
/**
* SendMessageProcessor
*/
SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
sendProcessor.registerSendMessageHook(sendMessageHookList);
sendProcessor.registerConsumeMessageHook(consumeMessageHookList);
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
/**
* PullMessageProcessor
*/
this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);
this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);
SendMessageProcessor#processRequest
- SendMessageProcessor#asyncProcessRequest()
- SendMessageProcessor#asyncSendMessage
- DefaultMessageStore#asyncPutMessage(参见4.1)
接下来看看关联的ReputMessageService
- DefaultMessageStore#start
- this.reputMessageService.start();
- DefaultMessageStore.ReputMessageService#run,Commit日志分发服务,每隔1毫秒,会检查是否需要(就是看有没有新数据)向ConsumeQueue和IndexFile中转发一次CommitLog写入的消息。
- DefaultMessageStore.ReputMessageService#doReput
- DefaultMessageStore.this.doDispatch(dispatchRequest);
CommitLogDispatcherBuildConsumeQueue#doDispatch
CommitLogDispatcherBuildIndex#doDispatch - 长轮询: 如果有消息到了主节点,并且开启了长轮询。就要通过长轮询机制通知消费者,新消息已经到了,可以消费了。DefaultMessageStore.this.messageArrivingListener.arriving(),实例是NotifyMessageArrivingListener
- NotifyMessageArrivingListener#arriving 长轮询:生产者发送消息后的监听事件
- this.pullRequestHoldService.notifyMessageArriving()
- PullRequestHoldService#notifyMessageArriving() 会去this.pullRequestTable.get(key);查找
网友评论