Background
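- This chapter walks through how a message in PMQ travels from the producer that submits it to the consumer that consumes it
- Along the way we can see the stages a message system needs: sending, storage, consumption, and retry
- Some of the coding styles and techniques used here are also worth learning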
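Overview of the message production and consumption architecture
[Figure: PMQ architecture diagram, PMQ end-to-end message flow]
How the client publishes a message
Overview
[Figure: client message sending flowchart]
Client code flow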
- (1) Code that publishes a message from the client
//topicName = name of the message topic; ProducerDataDto = content of the message body
MqClient.publish(topicName, "",new ProducerDataDto("kantlin"+String.valueOf(i)));
- (2) Implementation of publish in the client (MqResource.class)
public boolean publish(PublishMessageRequest request, int retryTimes) {
......
//CAT monitoring is integrated here: every published message is traced end to end (the key to troubleshooting)
Transaction transaction = Tracer.newTransaction("mq-client-publish", request.getTopicName());
Timer.Context timer1 = MetricSingleton.getMetricRegistry()
.timer("mq.client.publish.time?topic=" + request.getTopicName()).time();
try {
//Endpoint on the PMQ broker that receives published messages
String url = MqConstanst.CONSUMERPRE + "/publish";
//Send the message to the broker with a POST request (see (3))
PublishMessageResponse response = post(request, url, retryTimes, PublishMessageResponse.class, true);
......
transaction.setStatus(Transaction.SUCCESS);
//The broker handled the request but the publish did not succeed: report it to CAT
if (!response.isSuc()) {
......
addCat(request2);
}
//Return the publish result to the local caller
return response.isSuc();
} catch (Exception e) {
//For an unhandled exception, report to CAT and also send an alert email
......
addCat(request2);
......
sendMail(mailRequest);
return false;
} finally {
transaction.complete();
timer1.stop();
}
}
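With CAT tracing and metrics set aside, the publish path above is a bounded retry that reports failures and never throws to the caller. A minimal sketch of that shape, assuming a hypothetical `Sender` interface in place of the real `post(...)` call (in PMQ the `retryTimes` value is passed into `post`, so the retry loop lives there) and a plain counter in place of `addCat(...)` and `sendMail(...)`:

```java
import java.util.concurrent.atomic.AtomicInteger;

final class PublishRetrySketch {
    /** Hypothetical stand-in for post(request, url, retryTimes, ...): true means the broker accepted it. */
    interface Sender {
        boolean send() throws Exception;
    }

    /** Counts reported failures; stands in for addCat(...) and sendMail(...). */
    static final AtomicInteger reportedFailures = new AtomicInteger();

    /** Tries at most retryTimes + 1 times; returns false instead of throwing if every attempt fails. */
    static boolean publish(Sender sender, int retryTimes) {
        for (int attempt = 0; attempt <= retryTimes; attempt++) {
            try {
                if (sender.send()) {
                    return true;
                }
                // The broker answered but rejected the message: report, then retry.
                reportedFailures.incrementAndGet();
            } catch (Exception e) {
                // Network or unexpected error: report, then retry.
                reportedFailures.incrementAndGet();
            }
        }
        return false;
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        // Fails on the first two attempts and succeeds on the third.
        boolean ok = publish(() -> calls.incrementAndGet() >= 3, 2);
        System.out.println(ok + " after " + calls.get() + " calls, " + reportedFailures.get() + " reported");
    }
}
```

Returning `false` rather than throwing keeps a broker outage from propagating into the producer's business code; the caller decides whether a lost message matters.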
- (3) Implementation of post in the client (HttpClient.class)
public String post(String url, Object reqObj) throws IOException, BrokerException {
......
Response response = null;
//Trace the call with CAT
Transaction transaction = Tracer.newTransaction("mq-http", url);
try {
......
//Build the request and send it with the HTTP client; any exception is recorded in CAT and rethrown without further handling
Request request=requestbuilder.build();
response = client.newCall(request).execute();
transaction.setStatus(Transaction.SUCCESS);
......
}
......
finally {
transaction.complete();
......
}
}
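The REST side receives published messages
Overview
- The REST side receives the message published by the client and first validates it
- It converts the message into an entity bean suitable for storage
- It saves the message and notifies consumers to consume it
- It looks up all subscribers of the queue the message was written to and sends each eligible subscriber a notification that data is ready to pull
Code flow
- (1) The REST endpoint the client publishes to (ConsumerController.java)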
@PostMapping("/publish")
public PublishMessageResponse publish(@RequestBody PublishMessageRequest request) {
setSubEnv(request);
//see (2)
PublishMessageResponse response = consumerService.publish(request);
return response;
}
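- (2) The publish method on the REST side (ConsumerServiceImpl.java). The key methods here are getAllLocatedTopicWriteQueue(), which returns the writable queues (a queue can be thought of as a database table), and getAllLocatedTopicQueue(), which returns all queues. Both caches are refreshed by a long-running thread started in QueueServiceImpl.start().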
public PublishMessageResponse publish(PublishMessageRequest request) {
......
try {
Map<String, List<QueueEntity>> queueMap = queueService.getAllLocatedTopicWriteQueue();
Map<String, List<QueueEntity>> topicQueueMap = queueService.getAllLocatedTopicQueue();
if (queueMap.containsKey(request.getTopicName()) || topicQueueMap.containsKey(request.getTopicName())) {
List<QueueEntity> queueEntities = queueMap.get(request.getTopicName());
if (queueEntities == null || queueEntities.size() == 0) {
......
//The topic has no writable queue but does appear among all queues, so the caches may be out of sync: refresh the queue cache
if (topicQueueMap.containsKey(request.getTopicName()) && soaConfig.getPublishMode() == 1) {
queueEntities = topicQueueMap.get(request.getTopicName());
updateQueueCache(request.getTopicName());
} else {
updateQueueCache(request.getTopicName());
return response;
}
}
//Save the message only if the topic has at least one queue to write to
if (queueEntities.size() > 0) {
//see (3)
saveMsg(request, response, queueEntities);
}
} else {
......
}
} catch (Exception e) {
......
} finally {
......
}
return response;
}
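The queue-selection branch in publish() can be isolated as a pure function. This sketch assumes queues are represented by plain `Long` ids and that a `refreshed` set records what `updateQueueCache(topic)` would do; `QueueLookupSketch` and `queuesFor` are hypothetical simplifications, not PMQ types.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class QueueLookupSketch {
    /** Topics whose queue cache was asked to refresh; stands in for updateQueueCache(topic). */
    static final Set<String> refreshed = new HashSet<>();

    /**
     * Returns the queue ids a message for `topic` may be written to, or an empty list if it
     * should not be saved. publishMode == 1 allows falling back to all queues of the topic
     * when the writable-queue cache has no entry for it.
     */
    static List<Long> queuesFor(String topic, Map<String, List<Long>> writable,
                                Map<String, List<Long>> all, int publishMode) {
        if (!writable.containsKey(topic) && !all.containsKey(topic)) {
            return Collections.emptyList(); // unknown topic: handled by the elided else branch
        }
        List<Long> queues = writable.get(topic);
        if (queues == null || queues.isEmpty()) {
            refreshed.add(topic); // the two caches disagree, so refresh either way
            if (all.containsKey(topic) && publishMode == 1) {
                return all.get(topic);
            }
            return Collections.emptyList();
        }
        return queues;
    }

    public static void main(String[] args) {
        Map<String, List<Long>> writable = Map.of("orders", List.of(1L, 2L));
        Map<String, List<Long>> all = Map.of("orders", List.of(1L, 2L, 3L), "audit", List.of(7L));
        System.out.println(queuesFor("orders", writable, all, 1)); // [1, 2]
        System.out.println(queuesFor("audit", writable, all, 1));  // [7]
        System.out.println(queuesFor("audit", writable, all, 0));  // []
        System.out.println(refreshed);                             // [audit]
    }
}
```

Keeping the lookup free of I/O makes the stale-cache fallback easy to reason about: the only side effect is the refresh request.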
- (3) Convert the request into message entities and save them (ConsumerServiceImpl.java)
protected void saveSynMsg1(PublishMessageRequest request, PublishMessageResponse response,
List<QueueEntity> queueEntities) {
......
Map<String, PartitionInfo> partitionMap = new HashMap<>();
Map<Long, List<Message01Entity>> msgQueueMap = new HashMap<>();
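//msgQueueMap and partitionMap are filled in place while building the message entities; if the request specifies no partitionInfo, the messages are grouped in msgQueueMap under the default key Long.MAX_VALUE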
createMsg(request, msgQueueMap, partitionMap);
for (Map.Entry<Long, List<Message01Entity>> entry : msgQueueMap.entrySet()) {
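//Save the messages; there are three cases: a specific available queue, no partition (Long.MAX_VALUE), or a requested partition whose queue is unavailable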
if (queueMap.containsKey(entry.getKey())) {
doSaveMsg(request, response, Arrays.asList(queueMap.get(entry.getKey())), entry.getValue());
} else if (entry.getKey() == Long.MAX_VALUE) {
//see (4)
doSaveMsg(request, response, queueEntities, entry.getValue());
} else {
entry.getValue().forEach(t1 -> {
if (partitionMap.containsKey(t1.getTraceId())) {
if (partitionMap.get(t1.getTraceId()).getStrictMode() == 0) {
doSaveMsg(request, response, queueEntities, Arrays.asList(t1));
}
}
});
}
}
}
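The three cases in saveSynMsg1() reduce to one routing decision per message group. A simplified sketch, assuming the requested partition is expressed directly as a queue id and that strict mode is a single boolean (in the original it is read per message from `partitionMap` via the trace id); `route` and its parameters are hypothetical:

```java
import java.util.List;
import java.util.Set;

final class PartitionRoutingSketch {
    /** Default key used when the request carries no partition info. */
    static final long NO_PARTITION = Long.MAX_VALUE;

    /**
     * key: queue id requested by the partition info, or NO_PARTITION.
     * availableQueueIds: queues that can take the requested partition (stands in for queueMap).
     * writable: the topic's writable queues (stands in for queueEntities).
     * strict: partition strict mode; strict messages are not redirected to other queues.
     */
    static List<Long> route(long key, boolean strict, Set<Long> availableQueueIds, List<Long> writable) {
        if (availableQueueIds.contains(key)) {
            return List.of(key);                 // case 1: the requested queue is available
        }
        if (key == NO_PARTITION) {
            return writable;                     // case 2: no partition, any writable queue
        }
        return strict ? List.of() : writable;    // case 3: requested queue unavailable
    }

    public static void main(String[] args) {
        Set<Long> available = Set.of(10L, 11L);
        List<Long> writable = List.of(10L, 11L);
        System.out.println(route(11L, true, available, writable));           // [11]
        System.out.println(route(NO_PARTITION, false, available, writable)); // [10, 11]
        System.out.println(route(99L, true, available, writable));           // []
        System.out.println(route(99L, false, available, writable));          // [10, 11]
    }
}
```

Strict mode trades availability for ordering: a strict message is dropped from this save rather than written to a queue other than the one its partition maps to.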
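- (4) Save the messages to one of the writable queues, picked in round-robin order; if saving fails, the next queue is tried (ConsumerServiceImpl.java)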
private void doSaveMsg(PublishMessageRequest request, PublishMessageResponse response,
List<QueueEntity> queueEntities, List<Message01Entity> message01Entities) {
int tryCount = 0;
int queueSize = queueEntities.size();
......
int count = counterTemp.get(key).incrementAndGet();
while (tryCount <= queueSize) {
try {
QueueEntity temp = queueEntities.get(count % queueEntities.size());
count++;
......
//Key logic: save the messages to the writable queue picked in this round
//see (5)
doSaveMsg(message01Entities, request, response, temp);
......
} catch (Exception e) {
......
}
}
if (last != null) {
......
}
......
}
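A hedged sketch of the round-robin-with-failover pattern the loop above appears to implement: a per-topic counter picks the starting queue, and each queue is tried at most once. `saveRoundRobin` and its `Predicate` callback are hypothetical; the original loop bound (`tryCount <= queueSize`) and its success check are partly elided, so the exact number of attempts may differ.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

final class RoundRobinSaveSketch {
    /** Per-topic rotating counter; plays the role of counterTemp in the original. */
    private static final Map<String, AtomicInteger> counters = new ConcurrentHashMap<>();

    /**
     * Starts at the next queue in round-robin order and tries each queue at most once.
     * save returns true if the write succeeded and may throw if the queue's database fails.
     * Returns the queue that accepted the messages, or null if all queues failed.
     */
    static String saveRoundRobin(String topic, List<String> queues, Predicate<String> save) {
        int start = counters.computeIfAbsent(topic, t -> new AtomicInteger()).getAndIncrement();
        for (int i = 0; i < queues.size(); i++) {
            String queue = queues.get(Math.floorMod(start + i, queues.size()));
            try {
                if (save.test(queue)) {
                    return queue;
                }
            } catch (RuntimeException e) {
                // This queue's database is failing; move on to the next queue.
            }
        }
        return null;
    }

    public static void main(String[] args) {
        List<String> queues = List.of("q0", "q1", "q2");
        System.out.println(saveRoundRobin("orders", queues, q -> true));            // q0
        System.out.println(saveRoundRobin("orders", queues, q -> true));            // q1
        System.out.println(saveRoundRobin("orders", queues, q -> !q.equals("q2"))); // q0 (q2 failed)
    }
}
```

`Math.floorMod` keeps the index non-negative even after the `int` counter overflows, which a plain `count % size` does not guarantee.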
- (5) Save the messages and notify the relevant clients to pull them (ConsumerServiceImpl.java)
protected void doSaveMsg(List<Message01Entity> message01Entities, PublishMessageRequest request,
PublishMessageResponse response, QueueEntity temp) {
//Dynamically switch the database the service connects to
message01Service.setDbId(temp.getDbNodeId());
......
try {
......
//Batch-insert the messages into the queue's table
message01Service.insertBatchDy(request.getTopicName(), temp.getTbName(), message01Entities);
if (soaConfig.getMqPushFlag() == 1) {
//Notify the corresponding clients to pull the data and consume it; see (6)
notifyClient(temp);
}
......
return;
} catch (Exception e) {
......
} finally {
......
}
}
- (6) Notify clients to pull messages (ConsumerServiceImpl.java)
public void notifyClient(QueueEntity queueEntity) {
try {
......
//Get the details of all subscribers of this queue (one offset record per consumer group)
List<QueueOffsetEntity> queueOffsetList = queueIdQueueOffsetMap.get(queueEntity.getId());
......
Map<String, List<MsgNotifyDto>> notifyMap = new HashMap<>();
//Iterate over the subscribers
for (QueueOffsetEntity queueOffset : queueOffsetList) {
//Notify right away only if the subscriber's consumer group accepts push notifications and the rate limit allows it
if (consumerGroupMap.get(queueOffset.getConsumerGroupName()).getPushFlag() == 1
&& speedLimit(queueEntity.getId())) {
//Build the callback address (ip and port) of the subscriber's client
String clienturl = "http://" + consumerVo.ip + ":" + consumerVo.port;
if (!notifyMap.containsKey(clienturl)) {
notifyMap.put(clienturl, new ArrayList<>());
}
//Build the callback payload; it carries only the consumer group name and the queue ID
MsgNotifyDto msgNotifyDto = new MsgNotifyDto();
msgNotifyDto.setConsumerGroupName(queueOffset.getConsumerGroupName());
msgNotifyDto.setQueueId(queueEntity.getId());
notifyMap.get(clienturl).add(msgNotifyDto);
}
}
if (notifyMap.size() == 0) {
return;
}
......
for (String url : notifyMap.keySet()) {
//With the client address built, wrap the payload and send it to the client endpoint
try {
MsgNotifyRequest request = new MsgNotifyRequest();
request.setMsgNotifyDtos(notifyMap.get(url));
if (notifyFailTentativeLimit(url)) {
//Call /mq/client/notify on the client asynchronously
httpClient.postAsyn(url + "/mq/client/notify", request, new ConsumerServiceImpl.NotifyCallBack(url));
}
} catch (Exception e) {
log.error("Failed to send pull notification to client:", e);
}
}
......
} catch (Exception e) {
}
}
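The grouping step in notifyClient() batches every consumer group that lives on the same client into a single HTTP call. A sketch of just that grouping, with hypothetical `Subscriber` and `Notify` records standing in for the subscriber/offset information and `MsgNotifyDto`, and a `LongPredicate` standing in for `speedLimit(queueId)`:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.LongPredicate;

final class NotifyGroupingSketch {
    /** Hypothetical subscriber view: consumer group, its client address, and its push flag. */
    record Subscriber(String consumerGroup, String ip, int port, boolean pushEnabled) {}

    /** Hypothetical payload: like MsgNotifyDto, only the group name and the queue id. */
    record Notify(String consumerGroup, long queueId) {}

    /** Groups notifications for one queue by client URL; rateLimit stands in for speedLimit(queueId). */
    static Map<String, List<Notify>> group(long queueId, List<Subscriber> subscribers, LongPredicate rateLimit) {
        Map<String, List<Notify>> byUrl = new LinkedHashMap<>();
        for (Subscriber s : subscribers) {
            if (s.pushEnabled() && rateLimit.test(queueId)) {
                String url = "http://" + s.ip() + ":" + s.port();
                byUrl.computeIfAbsent(url, k -> new ArrayList<>()).add(new Notify(s.consumerGroup(), queueId));
            }
        }
        return byUrl;
    }

    public static void main(String[] args) {
        List<Subscriber> subs = List.of(
                new Subscriber("g1", "10.0.0.1", 8080, true),
                new Subscriber("g2", "10.0.0.1", 8080, true),
                new Subscriber("g3", "10.0.0.2", 8080, false));
        Map<String, List<Notify>> byUrl = group(5L, subs, queueId -> true);
        // One request per client URL, each carrying every group to notify on that client.
        byUrl.forEach((url, list) -> System.out.println(url + " -> " + list));
    }
}
```

The notification carries no message bodies, only "queue X has data for group Y"; the client still pulls the messages itself, so a lost notification costs latency, not data.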
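How the client pulls and consumes messages
[Figure: client message consumption process]
- (1) Once the client has started, the rebalance listener thread asks the broker, via long polling, which queues are assigned to the current instance. This is the rebalance process; it was covered in the previous chapter and is not repeated here.
- (2) After rebalancing, the consumer instance may be assigned some queues (such as the queue in the figure above), and the client consumes according to this queue information.
- (3) Queue processing is the core of consumption in the PMQ client. The following describes how a single queue is processed; multiple queues are handled the same way.
- (4) After rebalancing, the broker returns metadata for the assigned queues to the client, such as the queue id and the queue offset. The client starts one thread per queue, which periodically pulls messages from the broker based on this metadata. Note that every queue has its own independent pull thread.
- (5) When the thread pulls messages, it caches them in the buffer queue belonging to that queue. If the buffer queue is full, pulling pauses until it is no longer full. After caching, a new pull round starts. If a pull returns no messages, the pull thread sleeps 50 ms before pulling again; if the next pull is still empty, the wait is increased, up to a maximum. As soon as new messages arrive, the wait resets and the loop starts over.
- (6) After rebalancing, each queue also starts a consumption scheduling thread. It periodically reads the buffer queue and computes the number of worker threads from the number of buffered messages and the batch size. In PMQ, the batch size and thread count can be adjusted from the admin console and take effect immediately.
- (7) Once this is computed, the corresponding worker threads are started to consume the messages. Note that these threads come from a thread pool. Each worker takes up to one batch of messages from the buffer queue and calls the consumer instance's local method.
- (8) After messages are consumed, the in-memory offset of the queue is updated. An offset-commit thread commits the offsets periodically.
- (9) If consumption fails, the failed messages are sent to the corresponding failure topic and consumed again. The failure topic is consumed with the same logic as a normal topic.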
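The consumption overview describes an adaptive wait for empty pulls. The text gives only the 50 ms starting point and says the wait grows up to a maximum; the doubling rule and the 400 ms cap used below are assumptions for illustration, and `PullBackoffSketch` is a hypothetical name.

```java
final class PullBackoffSketch {
    private final long initialMs;
    private final long maxMs;
    private long currentMs; // 0 means "pull again immediately"

    PullBackoffSketch(long initialMs, long maxMs) {
        this.initialMs = initialMs;
        this.maxMs = maxMs;
    }

    /** Returns how long the pull thread should sleep after a pull that returned pulledCount messages. */
    long afterPull(int pulledCount) {
        if (pulledCount > 0) {
            currentMs = 0;                               // got data: reset and pull again right away
        } else if (currentMs == 0) {
            currentMs = initialMs;                       // first empty pull: wait the initial interval
        } else {
            currentMs = Math.min(currentMs * 2, maxMs);  // still empty: back off, capped at maxMs
        }
        return currentMs;
    }

    public static void main(String[] args) {
        PullBackoffSketch backoff = new PullBackoffSketch(50, 400);
        StringBuilder waits = new StringBuilder();
        for (int n : new int[] {0, 0, 0, 0, 0, 3, 0}) {
            waits.append(backoff.afterPull(n)).append(' ');
        }
        System.out.println(waits.toString().trim()); // 50 100 200 400 400 0 50
    }
}
```

The cap bounds the worst-case delay for an idle queue that suddenly receives data, while the growth keeps idle queues from hammering the broker.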
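[Figure: core architecture of client-side consumption]
Client pull code flow
- (1) After the broker saves a message, if the client can be notified immediately and is open, the broker calls back the client endpoint below (MqClientStatController.java)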
@RequestMapping("/mq/client/notify")
public void notify(@RequestBody MsgNotifyRequest request) {
//Only if the client is open (a blacklisted client and a client with consumption disabled are different states)
if (isOpenFlag()) {
......
try {
//see (2)
msgNotifyService.notify(request);
......
} catch (Exception e) {
......
}
}
}
- (2) The client handles the pull notification (MsgNotifyService.java)
public void notify(MsgNotifyRequest request) {
//Get the consumer-group executors, which the polling service keeps in sync with the latest group details from the broker
IConsumerPollingService consumerPollingService = MqClient.getMqFactory().createConsumerPollingService();
Map<String, IMqGroupExcutorService> groups = consumerPollingService.getMqExcutors();
if (groups != null && request != null && request.getMsgNotifyDtos() != null) {
request.getMsgNotifyDtos().forEach(msgNotifyDto -> {
if (groups.containsKey(msgNotifyDto.getConsumerGroupName())) {
//Get this consumer group's executor and its queues
IMqGroupExcutorService iMqGroupExcutorService = groups.get(msgNotifyDto.getConsumerGroupName());
Map<Long, IMqQueueExcutorService> queues = iMqGroupExcutorService.getQueueEx();
if (queues.containsKey(msgNotifyDto.getQueueId())) {
//Consume only if this consumer group owns the queue that has new messages (a check before consuming); see (3)
queues.get(msgNotifyDto.getQueueId()).notifyMsg();
}
}
});
}
}
- (3) The client pulls messages from the broker (MqQueueExcutorService.java)
protected boolean doPullingData() {
if (pullFlag.compareAndSet(false, true)) {
......
if (consumerQueueDto != null) {
......
try {
......
if (checkOffsetVersion(consumerQueueDto)) {
......
PullDataResponse response = null;
if (checkOffsetVersion(consumerQueueDto)) {
//Call the broker's pullData endpoint to fetch messages
response = mqResource.pullData(request);
}
......
if (response != null && response.getMsgs() != null && response.getMsgs().size() > 0) {
//If messages were returned, cache them; see (4)
cacheData(response, consumerQueueDto);
......
}
}
......
} catch (Exception e) {
......
} finally {
......
}
}
return false;
} else {
return true;
}
}
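The `pullFlag.compareAndSet(false, true)` guard in doPullingData() ensures that only one pull per queue runs at a time, so a burst of broker notifications does not turn into a burst of concurrent pulls. A minimal sketch of the guard, assuming (since the `finally` block is elided) that the flag is released once the pull finishes; `SinglePullGuardSketch` is a hypothetical name.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

final class SinglePullGuardSketch {
    private final AtomicBoolean pulling = new AtomicBoolean(false);
    /** How many pulls actually ran. */
    final AtomicInteger pulls = new AtomicInteger();

    /** Runs the pull unless one is already in flight; returns true if this call was skipped. */
    boolean tryPull(Runnable pull) {
        if (!pulling.compareAndSet(false, true)) {
            return true; // another pull is running; this notification is absorbed
        }
        try {
            pulls.incrementAndGet();
            pull.run();
            return false;
        } finally {
            pulling.set(false); // always release, even if the pull throws
        }
    }

    public static void main(String[] args) {
        SinglePullGuardSketch guard = new SinglePullGuardSketch();
        boolean[] innerSkipped = new boolean[1];
        // A notification that arrives while a pull is running is skipped, not queued.
        boolean outerSkipped = guard.tryPull(() -> innerSkipped[0] = guard.tryPull(() -> { }));
        System.out.println(outerSkipped + " " + innerSkipped[0] + " " + guard.pulls.get()); // false true 1
    }
}
```

Skipping rather than queuing is presumably safe here because the per-queue pull thread keeps pulling on its own schedule, so data behind a skipped notification is still fetched.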
- (4) Put the messages from the response into the blocking queue messages (MqQueueExcutorService.java)
protected void cacheData(PullDataResponse response, ConsumerQueueDto pre) {
if (checkOffsetVersion(pre)) {
for (MessageDto t1 : response.getMsgs()) {
......
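//Put each fetched message into the blocking queue messages (put blocks while it is full) and log it
while (checkOffsetVersion(pre)) {
try {
messages.put(t1);
addPullLog(t1);
break;
} catch (Exception e) {
//put failed or was interrupted: retry while the offset version is still current
}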
......
}
......
}
}
}
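`messages.put(t1)` is what provides the back-pressure described in the consumption overview (a full buffer pauses pulling): a bounded blocking queue makes the pull thread wait until the consumer frees space. This sketch shows the effect with `java.util.concurrent.ArrayBlockingQueue`; the capacity, message type, and `BoundedBufferSketch.run` are assumptions, since PMQ's actual buffer type is not shown in the source.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

final class BoundedBufferSketch {
    /**
     * Moves `count` messages through a buffer with `capacity` slots. The puller thread blocks
     * in put() whenever the buffer is full, which pauses pulling until the consumer frees a slot.
     */
    static List<Integer> run(int capacity, int count) {
        BlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(capacity);
        Thread puller = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    buffer.put(i); // waits while the buffer is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt instead of swallowing it
            }
        });
        puller.start();
        List<Integer> consumed = new ArrayList<>();
        try {
            while (consumed.size() < count) {
                consumed.add(buffer.take()); // waits while the buffer is empty
            }
            puller.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while consuming", e);
        }
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println(run(2, 10)); // [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    }
}
```

Restoring the interrupt flag, rather than catching and retrying silently, means a shutdown request reaching the pull thread is not lost.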
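Client consumption code flow
- From the flow above, the client has pulled messages into the blocking queue; the client must also run consumer threads that process the data in messages. These threads are started when the client initializes.
- (1) After the client starts, it calls the start() method of consumerPollingService (MqClient.java)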
private static boolean doRegisterConsumerGroup(Map<String, ConsumerGroupVo> groups) {
......
//The consumer polling service's start method runs when the client initializes
consumerPollingService = mqFactory.createConsumerPollingService();
//see (2)
consumerPollingService.start();
......
}
- (2) Fetch the consumer group list and start a MqGroupExcutorService for each group (ConsumerPollingService.java)
@Override
public void start() {
if (startFlag.compareAndSet(false, true)) {
......
executor.execute(new Runnable() {
@Override
public void run() {
while (!isStop) {
......
//Long-running thread that refreshes consumer-group information every second
longPolling();
......
}
}
});
}
}
protected void longPolling() {
if (mqContext.getConsumerId() > 0 && mqContext.getConsumerGroupVersion() != null
&& mqContext.getConsumerGroupVersion().size() > 0) {
......
//Ask the broker for the latest consumer-group information
GetConsumerGroupResponse response = mqResource.getConsumerGroup(request);
//Handle the response
handleGroup(response);
......
} else {
Util.sleep(1000);
}
}
protected void handleGroup(GetConsumerGroupResponse response) {
......
//Iterate over the consumer groups and create an executor for each new group
response.getConsumerGroups().entrySet().forEach(t1 -> {
if (!isStop) {
if (!mqExcutors.containsKey(t1.getKey())) {
mqExcutors.put(t1.getKey(), mqFactory.createMqGroupExcutorService());
}
log.info("consumer_group_data_change, consumer group " + t1.getKey() + " was rebalanced or its meta was updated");
// Rebalance or update the metadata
mqExcutors.get(t1.getKey()).rbOrUpdate(t1.getValue(), response.getServerIp());
mqContext.getConsumerGroupVersion().put(t1.getKey(), t1.getValue().getMeta().getVersion());
}
});
......
mqExcutors.values().forEach(t1 -> {
//Call start() on each MqGroupExcutorService; see (3)
t1.start();
});
}
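- (3) The consumer group instance starts a consumer instance for each of its queues (MqGroupExcutorService.java)
//Start consuming the group's queues only after the rebalance version has stayed unchanged for getRbTimes() consecutive polls (for example, three)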
public void start() {
if (!isRunning) {
versionCount++;
......
if (versionCount >= mqContext.getConfig().getRbTimes()) {
//Start the queues
doStartQueue();
isRunning = true;
}
}
}
protected void doStartQueue() {
......
if (localConsumerGroup != null && localConsumerGroup.getQueues() != null
&& localConsumerGroup.getQueues().size() > 0) {
//Get the queues of the consumer group and start a separate consumer instance for each
localConsumerGroup.getQueues().values().forEach(t1 -> {
IMqQueueExcutorService mqQueueExcutorService = mqFactory
.createMqQueueExcutorService(localConsumerGroup.getMeta().getName(), t1);
mqEx.put(t1.getQueueId(), mqQueueExcutorService);
//Start the listener and consumer threads of each queue; see (4)
mqQueueExcutorService.start();
});
}
......
}
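The `versionCount` check in start() delays consumption until rebalancing has settled. The source shows only the increment, so the reset-on-change rule in this sketch is an assumption; `RebalanceStabilitySketch` and `observe` are hypothetical names.

```java
final class RebalanceStabilitySketch {
    private final int requiredRounds;
    private Long lastVersion;   // null until the first poll
    private int sameCount;      // consecutive polls that returned lastVersion
    private boolean running;

    RebalanceStabilitySketch(int requiredRounds) {
        this.requiredRounds = requiredRounds;
    }

    /** Called once per poll with the group's meta version; returns true once queues should be started. */
    boolean observe(long version) {
        if (running) {
            return true;
        }
        if (lastVersion == null || lastVersion != version) {
            lastVersion = version; // version changed: rebalance still in progress, restart the count
            sameCount = 1;
        } else {
            sameCount++;
        }
        running = sameCount >= requiredRounds;
        return running;
    }

    public static void main(String[] args) {
        RebalanceStabilitySketch s = new RebalanceStabilitySketch(3);
        StringBuilder out = new StringBuilder();
        for (long v : new long[] {1, 1, 2, 2, 2, 3}) {
            out.append(s.observe(v)).append(' ');
        }
        System.out.println(out.toString().trim()); // false false false false true true
    }
}
```

Waiting for a stable version avoids starting consumers on queues that the next rebalance round would immediately take away.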
- (4) Listen for messages on the queue and consume them in separate threads (MqQueueExcutorService.java)
//At this point we have a concrete queue to consume
@Override
public void start() {
if (this.iSubscriber != null || this.iAsynSubscriber != null) {
if (isStart.compareAndSet(false, true)) {
//Pull pending messages once at startup; later pulls are triggered by the broker (if push notification is enabled)
executor.execute(new Runnable() {
@Override
public void run() {
pullingData();
}
});
executor.execute(new Runnable() {
@Override
public void run() {
while (!isStop) {
if (isRunning) {
//Handle buffered messages. Do not lock here: some consumers are slow, and a lock would block this loop
handleData();
} else {
Util.sleep(50);
}
}
}
});
}
}
}
protected void handleData() {
......
//Number of messages in the blocking queue messages
int msgSize = messages.size();
//Refresh the subscription: find the handler class for this consumer group and topic and store it in iSubscriber
refreshSubscriber();
if (temp != null && msgSize > 0 && temp.getThreadSize() + 2 - executor.getActiveCount() > 0
&& (iSubscriber != null|| iAsynSubscriber != null)
&& (temp.getTimeout() == 0 || (temp.getTimeout() > 0 && timeOutCount.get() == 0))) {
......
//Start handling the data
doHandleData(temp, msgSize);
......
} else {
Util.sleep(10);
}
}
private void doHandleData(ConsumerQueueDto pre, int msgSize) {
// Key code for the batching and thread-count logic
......
CountDownLatch countDownLatch = new CountDownLatch(startThread);
//Consume the messages in batches
batchExcute(pre, startThread, batchRecorderId, countDownLatch);
......
}
private void batchExcute(ConsumerQueueDto pre, int startThread, long batchRecorderId,
CountDownLatch countDownLatch) {
for (int i = 0; i < startThread; i++) {
if (executor != null) {
//Each worker thread runs a MsgThread that consumes one batch; see (5)
executor.execute(new MqQueueExcutorService.MsgThread(pre, batchRecorderId, countDownLatch, timeOutCount));
}
}
}
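doHandleData() elides how `startThread` is computed. Going by the consumption overview, which derives the thread count from the number of buffered messages and the batch size, one plausible rule is one thread per batch, capped by the configured thread size. The sketch below assumes that rule and, as the original does, waits on a `CountDownLatch` for all workers of a round; `threadsFor` and `consumeRound` are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

final class BatchDispatchSketch {
    /** One thread per batch, capped by maxThreads. Assumes batchSize > 0. */
    static int threadsFor(int buffered, int batchSize, int maxThreads) {
        int batches = (buffered + batchSize - 1) / batchSize; // ceiling division
        return Math.min(batches, maxThreads);
    }

    /** Each worker drains at most one batch; the call returns after every worker of this round finishes. */
    static List<Integer> consumeRound(BlockingQueue<Integer> buffer, int batchSize, int maxThreads,
                                      ExecutorService pool) {
        int threads = threadsFor(buffer.size(), batchSize, maxThreads);
        List<Integer> consumed = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(threads);
        for (int i = 0; i < threads; i++) {
            pool.execute(() -> {
                try {
                    List<Integer> batch = new ArrayList<>();
                    buffer.drainTo(batch, batchSize); // take at most one batch
                    consumed.addAll(batch);           // stands in for onMessageReceived(batch)
                } finally {
                    done.countDown();                 // count down even if consumption fails
                }
            });
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the batch round", e);
        }
        return consumed;
    }

    public static void main(String[] args) {
        BlockingQueue<Integer> buffer = new LinkedBlockingQueue<>();
        for (int i = 0; i < 10; i++) buffer.add(i);
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Integer> consumed = consumeRound(buffer, 3, 2, pool);
            System.out.println(consumed.size() + " consumed, " + buffer.size() + " left"); // 6 consumed, 4 left
        } finally {
            pool.shutdown();
        }
    }
}
```

Capping the thread count leaves messages in the buffer for the next round instead of spawning unbounded work, which is why batch size and thread size together set the throughput.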
- (5) Dispatch to the handler class (MqQueueExcutorService.java)
public class MsgThread implements Runnable {
......
@Override
public void run() {
......
if (isRunning && checkOffsetVersion(pre)) {
//Consume the messages
maxId = threadExcute(pre);
//Update the offset
updateOffset(pre, maxId);
}
......
}
}
protected long threadExcute(ConsumerQueueDto pre) {
if (isRunning && (iSubscriber != null || iAsynSubscriber != null)) {
......
if (messageMap.size() > 0) {
......
//Record the messages and hand them to the handler class for processing
List<Long> failIds = invokeMessage(pre, messageMap);
......
}
......
}
return 0;
}
protected List<Long> invokeMessage(ConsumerQueueDto temp, Map<Long, MessageDto> messageMap) {
List<MessageDto> dtos = new ArrayList<>(messageMap.values());
......
//Consume the messages
failIds = doMessageReceived(dtos);
......
}
protected List<Long> doMessageReceived(List<MessageDto> dtos) throws Exception {
if (consumerQueueRef.get().getTimeout() > 0) {
return new MessageInvokeCommandForThreadIsolation(consumerGroupName, consumerQueueRef.get(), dtos,
iSubscriber, iAsynSubscriber).execute();
} else {
//In this branch, the handler iSubscriber resolved in (4) processes the messages; see (6)
return MessageInvokeCommandForThreadIsolation.invoke(dtos, iSubscriber, iAsynSubscriber,
consumerQueueRef.get());
}
}
- (6) The dispatch method (MessageInvokeCommandForThreadIsolation.java)
public static List<Long> invoke(List<MessageDto> dtos, ISubscriber iSubscriber, IAsynSubscriber iAsynSubscriber,
ConsumerQueueDto pre) throws Exception {
......
if (iSubscriber != null) {
//Hand control back to the client's consumer implementation
failIds = iSubscriber.onMessageReceived(dtos);
......
}else if (iAsynSubscriber != null) {
......
}
return failIds;
}
- (7) Define a consumer implementation class
public class TestSub implements ISubscriber {
@Override
public List<Long> onMessageReceived(List<MessageDto> messages) {
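for (MessageDto message : messages) {
System.out.println(message.getBody());
}
//Return the IDs of failed messages; an empty list means every message was consumed
return new ArrayList<>();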
}
}
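The contract between the framework and a subscriber is the returned list of failed IDs. This sketch, using hypothetical `Message` and `Subscriber` types in place of `MessageDto` and `ISubscriber`, shows how a batch result could feed steps (8) and (9) of the consumption overview: failed messages are copied to a failure topic, and the offset still advances to the highest id in the batch. PMQ's own `updateOffset` and failure-topic publishing are not shown in the source.

```java
import java.util.ArrayList;
import java.util.List;

final class FailIdsSketch {
    /** Hypothetical minimal stand-in for MessageDto. */
    record Message(long id, String body) {}

    /** Hypothetical stand-in for ISubscriber: returns the ids of messages that failed. */
    interface Subscriber {
        List<Long> onMessageReceived(List<Message> messages);
    }

    /** Example subscriber: treats empty bodies as failures, everything else as consumed. */
    static final Subscriber NON_EMPTY_ONLY = messages -> {
        List<Long> failed = new ArrayList<>();
        for (Message m : messages) {
            if (m.body() == null || m.body().isEmpty()) {
                failed.add(m.id());
            }
        }
        return failed;
    };

    /** Consumes one batch: failed messages go to failTopic; returns the new offset (max id in the batch). */
    static long consumeBatch(List<Message> batch, Subscriber sub, List<Message> failTopic) {
        List<Long> failIds = sub.onMessageReceived(batch);
        long maxId = 0;
        for (Message m : batch) {
            if (failIds.contains(m.id())) {
                failTopic.add(m); // retried later through the failure topic
            }
            maxId = Math.max(maxId, m.id());
        }
        return maxId;
    }

    public static void main(String[] args) {
        List<Message> batch = List.of(new Message(1, "a"), new Message(2, ""), new Message(3, "c"));
        List<Message> failTopic = new ArrayList<>();
        long offset = consumeBatch(batch, NON_EMPTY_ONLY, failTopic);
        System.out.println("offset=" + offset + ", retry=" + failTopic);
    }
}
```

Advancing past failed messages keeps one bad message from blocking the whole queue; the failure topic takes over responsibility for retrying it.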
- (8) The mapping between consumer groups and topics is configured in XML and in the web UI
<?xml version="1.0" encoding="UTF-8" ?>
<messageQueue>
<consumer groupName="test1sub">
<topics>
<topic name="test1" receiverType="com.ppdai.infrastructure.demo.TestSub"></topic>
<topic name="test4" receiverType="com.ppdai.infrastructure.demo.TestSub"></topic>
</topics>
</consumer>
</messageQueue>
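To illustrate what this XML carries, the sketch below reads it with the JDK's DOM parser (`javax.xml.parsers.DocumentBuilderFactory`) into a `group/topic -> receiverType` map, the kind of lookup refreshSubscriber() needs. PMQ's real loader is not shown in the source; `ConsumerConfigSketch.parse` and the `group/topic` key format are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

final class ConsumerConfigSketch {
    /** Returns "group/topic" -> receiverType for every <topic> under every <consumer>. */
    static Map<String, String> parse(String xml) {
        try {
            Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
                    .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
            Map<String, String> result = new LinkedHashMap<>();
            NodeList consumers = doc.getElementsByTagName("consumer");
            for (int i = 0; i < consumers.getLength(); i++) {
                Element consumer = (Element) consumers.item(i);
                String group = consumer.getAttribute("groupName");
                NodeList topics = consumer.getElementsByTagName("topic");
                for (int j = 0; j < topics.getLength(); j++) {
                    Element topic = (Element) topics.item(j);
                    result.put(group + "/" + topic.getAttribute("name"), topic.getAttribute("receiverType"));
                }
            }
            return result;
        } catch (Exception e) {
            throw new IllegalArgumentException("invalid consumer config", e);
        }
    }

    public static void main(String[] args) {
        String xml = "<messageQueue><consumer groupName=\"test1sub\"><topics>"
                + "<topic name=\"test1\" receiverType=\"com.ppdai.infrastructure.demo.TestSub\"></topic>"
                + "<topic name=\"test4\" receiverType=\"com.ppdai.infrastructure.demo.TestSub\"></topic>"
                + "</topics></consumer></messageQueue>";
        System.out.println(parse(xml));
    }
}
```

With this mapping, a single subscriber class such as TestSub can serve several topics of the same consumer group, as in the configuration above.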