1、前言
本地消息表是最简单实用的分布式事务方案,基本上所知的分布式事务场景,都可以拆成本地消息表的方式执行,最后达到最终一致性。
无论是 TCC 事务还是本地消息表(TCC 我还没看过一个实际的开源中间件),都依赖本地数据库事务(至少本地消息表是这样的),它将分布式事务拆分成一个个依赖于本地的数据库事务,然后以异步的方式达到最终一致性。
2、设计
本地消息表的分布式事务如图所示:

需要注意的是:消费方注意幂等性问题。
比如我们以传统的订单下发、扣除库存为例(下面代码中为了简略使用了一个订单消息表,实际上为了通用,应该设计通用的消息表,以类型区分是什么消息):
- 1.插入订单记录时,保存一条消息到本地消息表中,状态字段变为未发送,保证原子性。然后将消发送到 MQ(提供方要确保 插入订单记录 和 插入订单本地消息表 要在同一事务下面。)
- 2.消息发送成功,执行Confirm的回调,修改消息表中的状态为已发送。
- 3.消费方采用手动ack机制+重试机制,接收到消息后先进行幂等性判断,然后在处理自己的逻辑。
消费方异常,本地事务回滚,触发重试,重试失败,ack无法签收,本地消息表状态也无法修改, 保证一致性。- 4.为了避免订单系统消息发送不成功的情况,可以提供一个补偿的定时任务,轮询扫描订单的本地消息表,找出所有未发送的记录,进行消息发送。
订单(发送方)代码:
package com.tang.order.service;
import com.tang.order.client.ClientStock;
import com.tang.order.domain.Orders;
import com.tang.order.domain.OrdersMessage;
import com.tang.order.mapper.OrdersMapper;
import com.tang.order.mapper.OrdersMessageMapper;
import com.tang.order.utils.MapperUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
/**
* 创建订单、订单消息、发生消息到 MQ
*/
@Slf4j
@Service
public class OrderService {
@Resource
private ClientStock clientStock;
@Resource
private OrdersMapper ordersMapper;
@Resource
private OrdersMessageMapper ordersMessageMapper;
@Resource
private RabbitTemplate rabbitTemplate;
/**
* 插入订单记录 和 订单消息表 要保证原子性
* @param orderId
* @return
*/
@Transactional(rollbackFor = Exception.class)
public boolean createOrder(String orderId) {
Orders orders = new Orders();
orders.setName("蚂蚁课堂");
orders.setOrderCreatetime(new Date());
orders.setOrderState(0);
orders.setOrderMoney(10.0);
orders.setOrderId(orderId);
// 添加订单记录
int flage = ordersMapper.insert(orders);
OrdersMessage ordersMessage = null;
if (flage == 1) {
ordersMessage = new OrdersMessage();
ordersMessage.setId(0);
Map<String, Object> map = new HashMap<>();
map.put("orderId", orderId);
// 消息体的内容
ordersMessage.setMessageBody(MapperUtils.ObjectToJson(map));
// 状态为0表示,消息未发送
ordersMessage.setStatus(0);
ordersMessage.setCreateTime(new Date());
flage = ordersMessageMapper.insert(ordersMessage);
if (flage != 1) {
throw new RuntimeException("回滚事务!");
}
}
// 直接发送
if(flage == 1 && ordersMessage != null){
// 注册回调函数
this.rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String s) {
log.info("消息是否发送成功--->" + ack);
// 消息发送成功了
if (ack) {
log.info("消息发送成功,更改消息表中的状态....");
// 更改消息表的状态为已发送
String id = correlationData.getId();
OrdersMessage ordersMessage = new OrdersMessage();
ordersMessage.setId(Integer.valueOf(id));
ordersMessage.setStatus(1);
ordersMessageMapper.updateByPrimaryKeySelective(ordersMessage);
}
}
});
this.rabbitTemplate.setMandatory(true);
this.rabbitTemplate.convertAndSend("stockExchange","direct.key",ordersMessage.getMessageBody(),
new CorrelationData(ordersMessage.getId() + ""));
return true;
}
return false;
}
}
定时任务补偿(时间频率可以设长一些):
package com.tang.order.schedule;
import com.tang.order.domain.OrdersMessage;
import com.tang.order.mapper.OrdersMessageMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import tk.mybatis.mapper.entity.Example;
import javax.annotation.Resource;
import java.util.List;
/**
*
* 定时任务补偿,频次不要太高,主要是为了补偿发 MQ 失败的
*/
@Slf4j
@Component
public class MqSchedule{
@Resource
private OrdersMessageMapper ordersMessageMapper;
@Resource
private RabbitTemplate rabbitTemplate;
@Scheduled(cron = "0/2 * * * * ?")
public void scheduleScanLocalMessage() {
log.info("扫描消息表开始...."+System.currentTimeMillis());
// 查询所有等待发送的消息
Example example = new Example(OrdersMessage.class);
example.createCriteria().andEqualTo("status", 0);
List<OrdersMessage> ordersMessages = ordersMessageMapper.selectByExample(example);
ordersMessages.forEach(item -> {
log.info("开始发送本地消息表里面的数据.........");
// 注册回调函数
this.rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String s) {
log.info("消息是否发送成功--->" + ack);
// 消息发送成功了
if (ack) {
log.info("消息发送成功,更改消息表中的状态....");
// 更改消息表的状态为已发送
String id = correlationData.getId();
OrdersMessage ordersMessage = new OrdersMessage();
ordersMessage.setId(Integer.valueOf(id));
ordersMessage.setStatus(1);
ordersMessageMapper.updateByPrimaryKeySelective(ordersMessage);
}
}
});
this.rabbitTemplate.setMandatory(true);
this.rabbitTemplate.convertAndSend("stockExchange","direct.key",item.getMessageBody(),
new CorrelationData(item.getId() + ""));
});
}
}
库存(消费方)代码:
package com.tang.stock.listener;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.Channel;
import com.tang.stock.domain.Stock;
import com.tang.stock.mapper.StockMapper;
import com.tang.stock.utils.MapperUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import tk.mybatis.mapper.entity.Example;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.Map;
/**
* 库存,消费 MQ 消息
*/
@Component
@Slf4j
public class ListenerMessage {
@Resource
private StockMapper stockMapper;
@RabbitListener(bindings = {
@QueueBinding(
exchange = @Exchange(value = "stockExchange"),
value = @Queue(value = "stockQueue"),
key = "direct.key"
)
})
@Transactional(rollbackFor = Exception.class)
public void listenerStock(@Payload String messageBody, @Headers Map<String, Object> headers, Channel channel)
throws IOException {
log.info("接收到消息,进行手动签收...接收到的消息内容为----->" + messageBody);
Map map = MapperUtils.JsonToObject(messageBody, Map.class);
String orderId = map.get("orderId").toString();
// 考虑幂等性
Stock stock = new Stock();
stock.setOrderId(orderId);
Example example = new Example(Stock.class);
example.createCriteria()
.andEqualTo("orderId", orderId);
Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
// 重复消费问题
if (stockMapper.selectCountByExample(example) > 0) {
log.info("重复消费触发了....");
try {
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
try {
channel.basicNack(deliveryTag, false, false);
} catch (IOException ex) {
ex.printStackTrace();
}
}
return;
}
stock.setStock(100);
// 新增一条减库存记录
int flag = stockMapper.insert(stock);
// if (flag == 1) {
// throw new RuntimeException("手动错误开启....");
// }
if (flag == 1) {
log.info("新增一条减库存记录成功.....分布式事务完成....");
try {
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
try {
channel.basicNack(deliveryTag, false, false);
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
}
}
3、结论
很简单,依靠本地事务 + 消息表。包括 TCC 也是需要依靠本地事务,所以分布式事务不可能脱离本地事务而存在,毕竟各个微服务系统都是依靠本地事务的。
我还想说明的是,本地消息表使用有一定的局限性,它只适合多个数据库操作有联系的情况下来做,比如一个事务需要操作
网友评论