什么是消息的可靠性传递?
- 保障消息的成功发出。
- 保障MQ节点的成功接收。
- 发送端收到MQ节点(Broker)的应答。
- 完善的消息补偿机制。
生产端—可靠性投递解决方案
方案一:消息落库,対消息状态进行打标

方案二:消息的延迟投递,做二次确认,回调补偿

场景:下单 ---- 支付 ---- 活动奖励
- 完成订单的业务数据入库、这部分为了提高性能,也可以用MQ,结合其Confirm机制即可,成就是成失败就是失败
- 当用户支付完成,平台各种活动的业务,就要考虑保障消息100%的投递成功,因为支付都完成了,不可能把钱又退回给用户
- step1 支付成功后,生产端发送业务的消息到Broker
- step2 同时生产端发送一条相同业务的延迟消息(Second Send Delay Check)到Broker,需要设置延迟时间如1~2分钟
- step3 消费端对收到的业务消息进行逻辑处理
- step4 无论逻辑的处理是成功/失败,消费端处理完毕之后,再发送一条Confirm消息(不是ACK)到Broker
- step5 Callback service是一个单独的服务,它通过Broker去监听消费端发送的Confirm消息
这时有两种可能:- 收到消息,那么将消息持久化到DB当中(无论Confirm消息状态是成功的/失败的)
- 没收到消息,那么 Callback service 的 Listener Confirm 就不工作,即Msg DB 不会产生数据、如果Confirm消息是状态是成功的,那么Callback service就不能当失败来处理,因为未发出消息可能是网络、磁盘已满等情况导致的。因此Confirm消息状态是成功时,应记录到Redis中。
- step6 这时延迟消息已投递给Broker, Callback Service去监听延迟消息所对应的队列.收到之后去检查MSG DB中是否有这条消息
- 如果存在, 业务状态是处理成功的——>通过.
- 如果存在, 业务状态是处理失败的——>那么Callback Service就需要主动发起RPC通信给上游服务,告诉它延迟投递的这条消息没有找到,需要重新发送。生产端收到信息后就会重新查询业务消息然后将消息发送出去,循环第一步。
- 如果不存在,从延迟消息中取出关键信息如唯一标识,去Redis查,看有没数据,如果有则是成功的——>通过。如果Redis查不到,则说明消费端没有处理成功,也没成功发送消息,那么Callback Service就需要主动发起RPC通信给上游服务
- step7 如果Callback service对 Msg DB更新失败的 或者 期间有什么非业务的异常,而是错误。则可发送通知给到管理员。手动处理该次业务并修复错误!
生产端&消费端
https://github.com/Liwh-yami/RabbitMQ/tree/master/src/main/java/com/finlay/scaffold/reliable
Callback 服务
package com.quanwugou.mall.mq;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.quanwugou.mall.dao.model.BrokerMsgLog;
import com.quanwugou.mall.mq.model.ActivityResult;
import com.quanwugou.mall.service.BrokerMsgLogService;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Date;
@Component
public class ConfirmReceiver {
private static final String CONFIRM_QUEUE_NAME = "pay.confirm.queue";
@Autowired
private BrokerMsgLogService brokerMsgLogService;
@RabbitListener(queues = CONFIRM_QUEUE_NAME)
@RabbitHandler
public void rec(String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
System.err.println("Callback service Listener Confirm -----------begin--------");
/*step5 Callback service 消费 Confirm消息*/
ObjectMapper mapper = new ObjectMapper();
ActivityResult activityResult = mapper.readValue(msg, ActivityResult.class);
String activityId = activityResult.getActivityId();
BrokerMsgLog msgLog = brokerMsgLogService.getOne(Wrappers.<BrokerMsgLog>query().eq("activity_id", activityId));
Integer resultStatus = activityResult.getStatus();
if (msgLog == null) {
//首次
BrokerMsgLog brokerMsgLog = new BrokerMsgLog();
Date date = new Date();
brokerMsgLog.setMessage(msg)
.setStatus(resultStatus)
.setActivityId(activityId)
.setUpdateTime(date)
.setCreateTime(date);
brokerMsgLogService.save(brokerMsgLog);//不关心这个存储结果、延迟消息消费时,会判断Redis
} else {
//更新日志:最大重试次数
int i = msgLog.getTryCount();
System.err.println("Confirm check 最大重试次数:" + i);
msgLog.setTryCount(i + 1);
msgLog.setUpdateTime(new Date());
brokerMsgLogService.updateById(msgLog);
}
channel.basicAck(tag, false);
System.err.println("Callback service Listener Confirm -----------end--------");
}
}
package com.quanwugou.mall.mq;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.quanwugou.mall.dao.model.BrokerMsgLog;
import com.quanwugou.mall.mq.model.Activity;
import com.quanwugou.mall.mq.model.ActivityResult;
import com.quanwugou.mall.service.BrokerMsgLogService;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* @author: Finlay
* @description:
* @date: 2020-07-17 5:46 下午
*/
@Component
public class DelayedReceiver {
private static final String DELAYED_QUEUE_NAME = "pay.delayed.queue";
@Autowired
private BrokerMsgLogService brokerMsgLogService;
@Autowired
private RedisTemplate redisTemplate;
@RabbitListener(queues = DELAYED_QUEUE_NAME)
@RabbitHandler
public void rec(Message msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.err.println("接收到延迟消费时间:" + sf.format(new Date()));
ObjectMapper mapper = new ObjectMapper();
Activity activity = mapper.readValue(msg.getBody(), Activity.class);
String activityId = activity.getId();
BrokerMsgLog msgLog = brokerMsgLogService.getOne(Wrappers.<BrokerMsgLog>query().eq("activity_id", activityId));
if (msgLog != null) {
Integer status = msgLog.getStatus();
if (status == 1) {
channel.basicAck(tag, false);
} else if (status == 0) {
Integer tryCount = msgLog.getTryCount();
if (tryCount >= 3) {
System.err.println("业务处理失败。。。。。。。请手动补偿奖励业务!");
channel.basicAck(tag, false);
} else {
System.err.println("Delayed check 最大重试次数:" + tryCount);
//主动发起RPC通信给上游服务
System.err.println("主动发起RPC通信给上游服务---------Feign");
channel.basicAck(tag, false);
}
}
} else {
Object pop = redisTemplate.opsForSet().pop(activityId);
if (pop == null) {
//主动发起RPC通信给上游服务
System.err.println("主动发起RPC通信给上游服务---------Feign");
channel.basicAck(tag, false);
} else {
System.err.println("从redis中获取到业务成功通知---------更新日志---------业务结束!");
//说明成功:ack权重更高,写前面
channel.basicAck(tag, false);
//日志处理
ActivityResult result = new ActivityResult();
result.setActivityId(activityId);
result.setStatus(1);
String resultStr = mapper.writeValueAsString(result);
BrokerMsgLog brokerMsgLog = new BrokerMsgLog();
Date date = new Date();
brokerMsgLog.setMessage(resultStr)
.setStatus(1)
.setActivityId(activityId)
.setUpdateTime(date)
.setCreateTime(date);
boolean save = brokerMsgLogService.save(brokerMsgLog);
if (!save) {
System.err.println("业务处理成功,msgLog存储失败。。。。请手动补偿日志!");
}
}
}
}
}
测试结果
step6如果存在:失败
image.png
image.png
step6如果不存在:成功
image.png
网友评论