数据库事务
断电了,该怎么处理?通过日志的方式!在执行事务的时候数据库首先会记录下这个事务的redo操作日志,然后才开始真正操作数据库,在操作之前首先会把日志文件写入磁盘,那么当突然断电的时候,即使操作没有完成,在重新启动数据库时候,数据库会根据当前数据的情况进行undo回滚或者是redo前滚,这样就保证了数据的强一致性。
CAP定理
- 一致性(Consistency) : 客户端知道一系列的操作都会同时发生(生效);
- 可用性(Availability) : 每个操作都必须以可预期的响应结束;
- 分区容错性(Partition tolerance) : 即使出现单个组件无法可用,操作依然可以完成;
在分布式系统中,在任何数据库设计中,一个Web应用至多只能同时支持上面的两个属性。显然,任何横向扩展策略都要依赖于数据分区。因此,设计人员必须在一致性与可用性之间做出选择。
一般情况下往往为了可用性和分区容错性,忍痛放弃强一致支持,转而追求最终一致性。大部分业务场景下,是可以接受短暂的不一致的。
BASE理论
在分布式系统中,我们往往追求的是可用性,它的重要程序比一致性要高,那么如何实现高可用性呢? 前人已经给我们提出来了另外一个理论,就是BASE理论,它是用来对CAP定理进行进一步扩充的。BASE理论指的是:
- Basically Available(基本可用)
- Soft state(软状态)
- Eventually consistent(最终一致性)
BASE理论是对CAP中的一致性和可用性进行一个权衡的结果,理论的核心思想就是:我们无法做到强一致,但每个应用都可以根据自身的业务特点,采用适当的方式来使系统达到最终一致性(Eventual consistency)。
分布式事务
两段式提交(2PC)
两阶段提交就是使用XA协议的原理:
- 第一阶段:事务协调器要求每个涉及到事务的数据库预提交(precommit)此操作,并反映是否可以提交.
- 第二阶段:事务协调器要求每个数据库提交数据。
其中,如果有任何一个数据库否决此次提交,那么所有数据库都会被要求回滚它们在此事务中的那部分信息。在CAP中,影响可用性。
优点: 尽量保证了数据的强一致,适合对数据强一致要求很高的关键领域(其实也不能100%保证强一致)。
缺点: 实现复杂,牺牲了可用性,对性能影响较大,不适合高并发高性能场景,如果分布式系统跨接口调用,目前 .NET 界还没有实现方案。
补偿事务(TCC)
CC 其实就是采用的补偿机制,其核心思想是:针对每个操作,都要注册一个与其对应的确认和补偿(撤销)操作。它分为三个阶段:
- Try阶段:主要是对业务系统做检测及资源预留;
- Confirm阶段:主要是对业务系统做确认提交,Try阶段执行成功并开始执行Confirm阶段时,默认Confirm阶段是不会出错的。即:只要Try成功,Confirm一定成功。
- Cancel阶段:主要是在业务执行错误,需要回滚的状态下执行的业务取消,预留资源释放。
举个例子,假入 Bob 要向 Smith 转账,思路大概是:
我们有一个本地方法,里面依次调用
1、首先在 Try 阶段,要先调用远程接口把 Smith 和 Bob 的钱给冻结起来。
2、在 Confirm 阶段,执行远程调用的转账的操作,转账成功进行解冻。
3、如果第2步执行成功,那么转账成功,如果第二步执行失败,则调用远程冻结接口对应的解冻方法 (Cancel)。
优点: 跟2PC比起来,实现以及流程相对简单了一些,但数据的一致性比2PC也要差一些。
缺点: 缺点还是比较明显的,在2,3步中都有可能失败。TCC属于应用层的一种补偿方式,所以需要程序员在实现的时候多写很多补偿的代码,在一些场景中,一些业务流程可能用TCC不太好定义及处理。
本地消息表(异步确保)
本地消息表这种实现方式应该是业界使用最多的,其核心思想是将分布式事务拆分成本地事务进行处理,这种思路是来源于ebay。
消息生产方,需要额外建一个消息表,并记录消息发送状态。消息表和业务数据要在一个事务里提交,也就是说他们要在一个数据库里面。然后消息会经过MQ发送到消息的消费方。如果消息发送失败,会进行重试发送。
消息消费方,需要处理这个消息,并完成自己的业务逻辑。此时如果本地事务处理成功,表明已经处理成功了,如果处理失败,那么就会重试执行。如果是业务上面的失败,可以给生产方发送一个业务补偿消息,通知生产方进行回滚等操作。
生产方和消费方定时扫描本地消息表,把还没处理完成的消息或者失败的消息再发送一遍。如果有靠谱的自动对账补账逻辑,这种方案还是非常实用的。
优点: 一种非常经典的实现,避免了分布式事务,实现了最终一致性。在 .NET中 有现成的解决方案。
缺点: 消息表会耦合到业务系统中,如果没有封装好的解决方案,会有很多杂活需要处理。
MQ事务消息
有一些第三方的MQ是支持事务消息的,比如RocketMQ,他们支持事务消息的方式也是类似于采用的二阶段提交,但是市面上一些主流的MQ都是不支持事务消息的,比如 RabbitMQ 和 Kafka 都不支持。
以阿里的 RocketMQ 中间件为例,其思路大致为:
第一阶段Prepared消息,会拿到消息的地址。
第二阶段执行本地事务,第三阶段通过第一阶段拿到的地址去访问消息,并修改状态。
Sagas事务模型
该模型其核心思想就是拆分分布式系统中的长事务为多个短事务,或者叫多个本地事务,然后由 Sagas 工作流引擎负责协调,如果整个流程正常结束,那么就算是业务成功完成,如果在这过程中实现失败,那么Sagas工作流引擎就会以相反的顺序调用补偿操作,重新进行业务回滚。
比如我们一次关于购买旅游套餐业务操作涉及到三个操作,他们分别是预定车辆,预定宾馆,预定机票,他们分别属于三个不同的远程接口。可能从我们程序的角度来说他们不属于一个事务,但是从业务角度来说是属于同一个事务的。
案例
以上只是介绍了分布式事务的几种不同解决方案,但上面只是笼统的介绍,每一种方案都有很多的细节,这里以本地消息表这种方式进行详细讲解,并提供案例以便更深入的了解。
假设现在有三个系统:系统A、消息中间件M、系统B,在A 和 B 之间存在分布式事务的需求。
根据分布式事务这篇文章上方案二的理解,大概是这么个流程:
- A向M 发送一条消息,告诉M它准备干活了。
- M向A回应一条消息,告诉A说:我收到你的消息了,你干吧!(并不一定就真的要回应一条消息给A,可以通过判断等方式达到目的)。
- A开始干活,即处理该分布式事务中的A部分业务。这里要分两种情况考虑:
(1) A处理业务的过程中出项异常,干活失败。
(2) A处理业务的过程中表现良好,干活成功。
我们要知道的是,不管是哪种情况,A都需要向M发送一条指令:如果A干活失败,A就Rollback,然后向M发送一条Rollback指令,这时候M就会将这条消息从消息中间件中删除,这种情况就不需要和B打交道了,整个流程就相当于结束了;如果A干活成功,就向M发送一条Confirm指令,可以认为这个指令的作用就是改变消息的状态,比如改成Confirm等,只有消息是这个状态,M才能向B投递消息,这是后话,不多说了。 - 如果A发送的是Confirm指令,M就向B投递该消息,B收到消息后,就开始干活了。
- 如果B干活成功,就向M回应,M这时候可以将这条消息删除或者作废,至此整个分布式事务完成;如果B干活失败,可能就需要调用A的回滚接口,上面没有讨论这种情况,应该挺麻烦吧?
上这种情况,我们都是假设A B 和 M之间不会丢失消息,如果在上面的 3 、5 步骤中发生丢失消息的情况就会出现问题,针对以上情况,有如下解决方案:
-
针对步骤3
当M收到一条事务型消息后便开始计时,如果到了超时时间也没收到系统A发来的Confirm或Rollback指令的话,就会主动调用系统A提供的事务询问接口询问该系统目前的状态。该接口会返回三种结果:
提交:若获得的状态是“提交”,则将该消息投递给系统B。
回滚:若获得的状态是“回滚”,则直接将条消息丢弃。
处理中:若获得的状态是“处理中”,则继续等待。 -
针对步骤5
M在等待确认应答超时之后就会重新向B投递消息,直到B返回消费成功响应为止。当然,一般M可以设置消息重试的次数和时间间隔,比如:当第一次投递失败后,每隔五分钟重试一次,一共重试3次。如果重试3次之后仍然投递失败,那么这条消息就需要人工干预。
准备
系统A、系统B、消息中间件使用RabbitMQ、测试工具Jmeter。
不过这个案例并不是完全按照上面说到的那样,主要区别在于:
- A向M发一条消息,并没有写一条消息到RabbitMQ,而仅仅是向event里面写了一条记录
- 为了防止A向M提交Confirm和Cancle指令时失败,M需要定时去event表里查看哪些消息创建时间大于3s并且还是unfinished状态。然后再根据这些消息的uuid去A的相关业务表查找记录,如果找到了,就置为Confirm,没找到就置为Cancel,然后再根据这两种状态分别进行不同的操作,Cancel就将消息删除;Confirm就发送一条消息到MQ,从而触发系统B的逻辑。
- 当B中的业务代码出现问题时,A并没有提供相应的回滚接口。
消息表
CREATE TABLE `event` (
`uuid` varchar(50) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
`status` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '状态',
`create_time` bigint(20) NOT NULL COMMENT 'event的创建时间',
`publisher_service_id` varchar(50) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '发布事件的服务id',
`type` varchar(30) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT 'event的类型',
`json_messages` text CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '要发送给消息队列的消息集合',
PRIMARY KEY (`uuid`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
系统A
public boolean doA(TradeDto tradeDto) throws Exception {
//1. 生成全局uuid
String uuid = EventTools.generateUuid();
//2. 创建消息
Event event = new Event();
event.setUuid(uuid);
//本服务的服务名
event.setPublisherServiceId("hap-event-demo-trade-service");
//message包含消息队列的队列名和数据
//payload必须含有uuid字段!!!
TradeAmqp payload = new TradeAmqp(uuid, tradeDto.getAmount(), tradeDto.getBuyerId(), tradeDto.getSellerId());
Event.Message message = new Event.Message("trade2", mapper.writeValueAsString(payload));
event.setMessages(Collections.singletonList(message));
event.setType(EVENT_TYPE_TRADE);
// 消息发送之后回调执行业务逻辑
boolean result = eventTemplate.execute(() -> {
TradeRecord record = new TradeRecord();
record.setBuyerId(tradeDto.getBuyerId());
record.setSellerId(tradeDto.getSellerId());
record.setUuid(uuid);
record.setCreateTime(new Timestamp(System.currentTimeMillis()));
if (tradeRecordMapper.insert(record) != 1){
throw new RuntimeException("init event failed");
}
},event);
if (!result){
throw new RuntimeException("error.trade.create");
}
return true;
}
public boolean execute(EventCallback eventCallback, Event event){
// 开启事物
DefaultTransactionDefinition def = new DefaultTransactionDefinition();
def.setPropagationBehavior(DefaultTransactionDefinition.PROPAGATION_REQUIRED);
TransactionStatus status = transactionManager.getTransaction(def);
String eventId;
// 发送事件创建消息
try {
// 向event表里写一条记录
eventId = eventClient.createEvent(event);
}catch (Exception e){
logger.info("create event failed {}",e);
return false;
}
if(eventId == null){
return false;
}
try {
// 执行具体业务,通过Feign调用M的接口
doSomething(eventCallback);
// 提交事务
transactionManager.commit(status);
} catch (Exception e) {
logger.info("execute failed {}",e);
// 异常回滚
transactionManager.rollback(status);
try{
// 从event表里面删除消息,通过Feign调用M的接口
eventClient.cancelEvent(eventId);
}catch (Exception e1){
logger.info("cancel event failed {}",e1);
}
return false;
}
try{
// 该表event表里的消息状态为confirm,同时向RabbitMQ发送一条消息,通过Feign调用M的接口
eventClient.confirmEvent(eventId);
}catch (Exception e1){
logger.info("confirm event failed {}",e1);
}
return true;
}
// Feign服务调用
@FeignClient(value = "hap-event-store-service")
public interface EventClient {
@PostMapping("/v1/events")
String createEvent(@RequestBody Event event);
@PutMapping("/v1/events/{eventId}/confirm")
void confirmEvent(@PathVariable("eventId") String eventId);
@PutMapping("/v1/events/{eventId}/cancel")
void cancelEvent(@PathVariable("eventId") String eventId);
}
消息中间件M
@Service
@Transactional(noRollbackFor = EventNotExistException.class)
public class EventServiceImpl extends BaseServiceImpl<Event> implements EventService {
// 向event表里写一条消息,该消息的初始状态为 unfinished
public String createEvent(Event event) {
beanTools.setJsonMessages(event);
event.setStatus(Event.STATUS_INIT);
event.setCreateTime(System.currentTimeMillis());
if (insert(event) != 1) {
throw new HapException("error.event.create");
}
return event.getUuid();
}
// 系统A提交事务之后,改变event表里的消息状态为confirm,并向RabbitMQ发送一条消息
public void confirmEvent(String uuid) {
Event event = selectByPrimaryKey(uuid);
if (event == null) {
throw new EventNotExistException(uuid);
}
Event temp = new Event();
temp.setUuid(uuid);
temp.setStatus(Event.STATUS_CONFIRM);
if (updateByPrimaryKeySelective(temp) != 1) {
throw new HapException("error.event.confirm");
}
beanTools.setMessageList(event);
// 这里仅仅是向队列里面插入一个元素,真正向RabbitMQ发送消息的逻辑是通过定时任务完成得
MsgPublishExecuter.getQueue().offer(event);
}
// 从event表删除消息
public void cancelEvent(String uuid) {
if (deleteByPrimaryKey(uuid) != 1) {
if (selectByPrimaryKey(uuid) == null) {
throw new EventNotExistException(uuid);
} else {
throw new HapException("error.event.cancel");
}
}
}
}
有关于MsgPublishExecuter代码
@Component
public class MsgPublishExecuter {
private static final long QUERY_INTERVAL = 100;
// 该队列线程安全,FIFO
private static ConcurrentLinkedQueue<Event> queue = new ConcurrentLinkedQueue<>();
private MsgPublishExecTaskImpl msgPublishExecTask;
@Autowired
public MsgPublishExecuter(MsgPublishExecTaskImpl msgPublishExecTask) {
this.msgPublishExecTask = msgPublishExecTask;
}
static ConcurrentLinkedQueue<Event> getQueue() {
return queue;
}
// 每100毫秒执行一次
@Scheduled(fixedDelay = QUERY_INTERVAL)
public void scheduledQueryJob() {
while (!queue.isEmpty()) {
// 发送消息到RabbitMQ,最后通过RabbitTemplate发送
msgPublishExecTask.publishMsg(queue.poll());
}
}
}
有关于向RabbitMQ发送消息和防系统A到M消息丢失措施
@Configuration
@EnableKafka
public class EventConfig {
private static final int MAX_PUBLISH_THREAD_NUM = 2;
private static final int MAX_QUERY_THREAD_NUM = 2;
public static final String PUBLISHER_STATUS_FINISHED = "finished";
public static final String PUBLISHER_STATUS_CANCEL = "canceled";
// 该线程池负责维护 发送消息到MQ的线程
@Bean(name = "publish-executor")
public AsyncTaskExecutor publishTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setThreadNamePrefix("publish-executor");
executor.setMaxPoolSize(MAX_PUBLISH_THREAD_NUM);
return executor;
}
// 该线程池负责维护 根据消息向上游系统查询消息状态的线程,主要是从msg_record表查询
@Bean(name = "queryStatus-executor")
public AsyncTaskExecutor queryStatusTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setThreadNamePrefix("queryStatus-executor");
executor.setMaxPoolSize(MAX_QUERY_THREAD_NUM);
return executor;
}
}
@EnableBinding
@Component
public class MsgPublishExecTaskImpl implements PublishMsgListener{
private PublishEventService publishEventService;
@Autowired
public MsgPublishExecTaskImpl(PublishEventService publishEventService) {
this.publishEventService = publishEventService;
}
// 异步执行,publish-executor是一个AsyncTaskExecutor对象
@Async("publish-executor")
public void publishMsg(Event event) {
publishEventService.publishMsg (event,this);
}
@Override
public void onFailure(Event event) {
MsgPublishExecuter.getQueue().offer(event);
}
}
// 这个其实就相当于是上面提到的补救措施,即系统A向M发送消息Confirm和Cancel指令失败的情况
@Async("queryStatus-executor")
void execQuery(Event event) {
StatusQueryExecuter.eventNum.decrementAndGet();
logger.info("execQuery event: {}", event);
EventRecord result = null;
try {
StatusQuery query= HystrixFeign
.builder()
.client(client)
.decoder(new JsonDecoder())
.target(StatusQuery.class, "http://" + event.getPublisherServiceId());
result = query.getEventRecord(event.getUuid(), event.getType());
} catch (Exception e) {
logger.warn("execQuery error, event.publisherServiceId: {} event.uuid: {} event.type: {}",
event.getPublisherServiceId(), event.getUuid(), event.getType());
}
if (result == null) {
return;
}
logger.info("execQuery result: {}", result);
if (EventConfig.PUBLISHER_STATUS_FINISHED.equals(result.getStatus())) {
eventService.confirmEvent(event.getUuid());
}
if (EventConfig.PUBLISHER_STATUS_CANCEL.equals(result.getStatus())) {
eventService.cancelEvent(event.getUuid());
}
}
系统B
系统B主要是监听RabbitMQ中的消息,要做的也很简单, 每条消息中包含了买方和卖方的用户ID、金额,每次交易向买方账户见钱、卖方账户加钱
@Topic(value = "trade2", retryTimes = 5, retryInterval = 10000)
@Override
public void receive(String msg) throws Exception{
logger.info("=== 消息队列接收信息 : {}", msg);
TradeAmqp tradeAmqp = mapper.readValue(msg, TradeAmqp.class);
User seller = userMapper.selectByPrimaryKey(tradeAmqp.getSellerId());
seller.setAccount(seller.getAccount() + tradeAmqp.getTradeAmount());
logger.info("=== 用户 : {} 账户增加 : {}", seller.getName(), tradeAmqp.getTradeAmount());
userMapper.updateByPrimaryKeySelective(seller);
User buyer = userMapper.selectByPrimaryKey(tradeAmqp.getBuyerId());
buyer.setAccount(buyer.getAccount() - tradeAmqp.getTradeAmount());
logger.info("=== 用户 : {} 账户减少 : {}", buyer.getName(), tradeAmqp.getTradeAmount());
userMapper.updateByPrimaryKeySelective(buyer);
logger.info("AtomicInteger {}", a.incrementAndGet());
}
测试
在测试前,先看看表里面的初始化信息
消息表 系统A业务表 系统B业务表
从上图中可以看出,初始化的时候系统B中买方和卖方的账户余额都是0。
接下来通过Jmeter进行测试:
单线程测试
只开一个线程,循环执行1000次请求,查看结果
单线程测试 RabbitMQ
但系统A后台报错了,太长了,所以删掉了一些。从日志中可以看出,好像在服务间调用的时候,即系统A调用系统M接口,向消息表写数据的时候,好像出现了问题!
2018-08-26 16:39:54.377 INFO [hap-event-demo-trade-service,6dda6aa15fe370c8,6dda6aa15fe370c8,false] 10044 --- [nio-9021-exec-6] com.hand.hap.cloud.event.EventTemplate : confirm event failed {}
com.netflix.hystrix.exception.HystrixRuntimeException: EventClient#confirmEvent(String) timed-out and no fallback available.
at com.netflix.hystrix.AbstractCommand$22.call(AbstractCommand.java:822) ~[hystrix-core-1.5.10.jar:1.5.10]
at com.netflix.hystrix.AbstractCommand$22.call(AbstractCommand.java:807) ~[hystrix-core-1.5.10.jar:1.5.10]
at rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4.onError(OperatorOnErrorResumeNextViaFunction.java:140) ~[rxjava-1.1.10.jar:1.1.10]
at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onError(OnSubscribeDoOnEach.java:87) ~[rxjava-1.1.10.jar:1.1.10]
at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onError(OnSubscribeDoOnEach.java:87) ~[rxjava-1.1.10.jar:1.1.10]
at com.netflix.hystrix.AbstractCommand$DeprecatedOnFallbackHookApplication$1.onError(AbstractCommand.java:1472) ~[hystrix-core-1.5.10.jar:1.5.10]
at com.netflix.hystrix.AbstractCommand$FallbackHookApplication$1.onError(AbstractCommand.java:1397) ~[hystrix-core-1.5.10.jar:1.5.10]
Caused by: java.util.concurrent.TimeoutException: null
at com.netflix.hystrix.AbstractCommand.handleTimeoutViaFallback(AbstractCommand.java:997) ~[hystrix-core-1.5.10.jar:1.5.10]
at com.netflix.hystrix.AbstractCommand.access$500(AbstractCommand.java:60) ~[hystrix-core-1.5.10.jar:1.5.10]
at com.netflix.hystrix.AbstractCommand$12.call(AbstractCommand.java:609) ~[hystrix-core-1.5.10.jar:1.5.10]
at com.netflix.hystrix.AbstractCommand$12.call(AbstractCommand.java:601) ~[hystrix-core-1.5.10.jar:1.5.10]
at rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4.onError(OperatorOnErrorResumeNextViaFunction.java:140) ~[rxjava-1.1.10.jar:1.1.10]
... 16 common frames omitted
2018-08-26 16:39:55.393 INFO [hap-event-demo-trade-service,3e17e98019352cb8,3e17e98019352cb8,false] 10044 --- [nio-9021-exec-9] com.hand.hap.cloud.event.EventTemplate : create event failed {}
注意上面两条非常重要的日志:confirm event failed {}、create event failed {} ,都是调用系统M接口时出了问题。这时候再观察数据库里面的记录
SELECT count(0) FROM event; -- 1000条
SELECT count(0) FROM trade_record; -- 913条
SELECT * FROM user;
1 xiaohong 29970
2 xiaoming -29970
从上面我们可以发现两个问题
- 消息表有1000条记录,系统A却只有913条记录,正确的结果应该是系统A和消息表都是1000条记录。
- 从user表中的账户余额来看,1000次请求,每次30,正确的结果应该是30000 和 -30000,结果却少了30,也就是少了一次计算。
下面来分析来分析一下这两个问题:
- 针对对1个问题:
try {
eventId = eventClient.createEvent(event);
}catch (Exception e){
logger.info("create event failed {}",e);
return false;
}
消息表里面的记录是1000,说明系统M的针对系统A发过来的每个创建消息的请求都执行成功了;系统A少了87条记录,说明系统M在成功插入消息后将结果返回给系统A的时候出问题了,原因可能就是日志上显示的那样,timed-out and no fallback available,导致系统A接下来的逻辑不能正常执行,结果A丢了一些记录。
-
针对对2个问题:
消息表里有1000条记录,每次30,正确结果应该是30000和-30000,实际上却是是29970和-29970,买方和卖方的钱对不上,少计算了一次。买卖双方账户余额完全是由系统B的业务逻辑控制,而触发系统B执行该段业务逻辑的条件是系统M中的消息状态是confirm,现在系统M有1000条记录,系统B却只计算了999次,并且观察系统B的后台日志并没有异常发生,说明有可能系统 M中存在一条消息的状态值不是confirm
SELECT * FROM event WHERE status != 'confirm';
256012c6df01473692ae92f7f058e8a9 unfinished 1535272794404 hap-event-demo-trade-service trade [{"topicName":"trade2","payload":"{\"uuid\":\"256012c6df01473692ae92f7f058e8a9\",\"tradeAmount\":30.0,\"buyerId\":2,\"sellerId\":1}"}]
从上图中可以看出,确实有这么一条消息,它的状态为unfinished,说明系统M只向RabbitMQ发送了999条消息,因为只有状态为confirm的消息记录才会publish到RabbitMQ,从而导致系统B只消费了999条消息。而之所以系统M只向RabbitMQ发送999条消息,主要原因在于系统A在向系统M发送Confirm指令的时候出了问题,导致M中的消息状态不能正常变为confirm,从而导致M无法向RabbitMQ推送消息。
经过这么一个简单测试,我们就发现了这个系统存在非常大的漏洞,这还仅仅是在单线程情况下。如果你看的仔细一些,就会发现一些问题,系统A在向系统M发送Confirm指令的时候出了问题,这不是我们上面考虑到的一种情况吗?消息丢失本来就可能发生,这时M根据A提供的接口,在系统A找到记录然后将M中的那条消息记录更新为confirm不就可以吗?这样想是没有问题的,但问题的关键在于,系统M根据那条状态为unfinished消息的uuid去系统A里面找,根本找不到啊!不信我们可以看看系统M中的后台日志,每5s一次的查询:
可以看到,日志中提示消息丢失了,所以问题的关键还是在于系统A,我觉得 在往系统M插一条消息和往系统A里面插入一条记录这两个操作应该要在一个事务里面完成,但现在A和M是两个系统,这又涉及到分布式事务的问题,所以是否可以将消息表放到系统A中?然后这两个操作在一个事务中提交。
实际情况可能会比这复杂很多吧,回到上面的报错日志:
com.netflix.hystrix.exception.HystrixRuntimeException: EventClient#confirmEvent(String) timed-out and no fallback available.
这个错误可能是因为服务调用超时导致的,基本是出现在Hystrix熔断器,在系统M中添加了以下的配置,不过这一块我也并不是很清楚
ribbon:
ReadTimeout: 60000
ConnectTimeout: 60000
hystrix:
command:
default:
execution:
timeout:
enabled: false
isolation:
thread:
timeoutInMilliseconds: 10000
修改之后,重修做了几次测:1000、3000、5000,还是照样会有这个问题,todo。
多线程测试
这里开50个线程,每个线程循环请求20次
多线程测试
结果好像惨不忍睹,大量的 Hystrix circuit short-circuited and is OPEN
Caused by: java.lang.RuntimeException: Hystrix circuit short-circuited and is OPEN
at com.netflix.hystrix.AbstractCommand.handleShortCircuitViaFallback(AbstractCommand.java:979) ~[hystrix-core-1.5.10.jar:1.5.10]
at com.netflix.hystrix.AbstractCommand.applyHystrixSemantics(AbstractCommand.java:557) ~[hystrix-core-1.5.10.jar:1.5.10]
at com.netflix.hystrix.AbstractCommand.access$200(AbstractCommand.java:60) ~[hystrix-core-1.5.10.jar:1.5.10]
at com.netflix.hystrix.AbstractCommand$4.call(AbstractCommand.java:419) ~[hystrix-core-1.5.10.jar:1.5.10]
at com.netflix.hystrix.AbstractCommand$4.call(AbstractCommand.java:413) ~[hystrix-core-1.5.10.jar:1.5.10]
at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:46) ~[rxjava-1.1.10.jar:1.1.10]
... 90 common frames omitted
在网上找了一下资料记一次feign的问题排查,引起这个问题的原因是在一个滚动窗口内,失败了二十个(默认),就会发生短路,短路时间默认为5秒,5秒之内拒绝所有的请求,之后开始运行
解决办法:设置熔断器失败的个数,默认为20个,这里我给了1000个,只有超过1000个才会发生短路
hystrix.command.default.circuitBreaker.requestVolumeThreshold=1000
配置之后,重新测试,发现又报这种错
com.netflix.hystrix.exception.HystrixRuntimeException: EventClient#createEvent(Event) could not be queued for execution and no fallback available
Caused by: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@3931cca2 rejected from java.util.concurrent.ThreadPoolExecutor@13ed04ed[Running, pool size = 10, active threads = 10, queued tasks = 0, completed tasks = 116]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047) ~[na:1.8.0_101]
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823) [na:1.8.0_101]
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369) [na:1.8.0_101]
at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112) ~[na:1.8.0_101]
不过这个RejectedExecutionException好像是线程池里面的抛出的一个异常,后面那些就是线程池的参数。
Fegin默认的核心线程数是10个,默认的阻塞队列个数maxQueueSize是-1,这里我们开了50个线程,它的最大线程数应该是小于50,所以会抛RejectedExecutionException,添加以下配置
hystrix.threadpool.default.coreSize=50
但是我这里有个疑问,根据线程池的特点,在coreSize不够的情况下,不是可以放到阻塞队列吗?但是我在这里配置阻塞队列的个数好像作用不大,例如,我以下的配置是没用的,还是会报错
hystrix.threadpool.default.coreSize=30
hystrix.threadpool.default.maxQueueSize=20
发现又会报以下错:
com.netflix.hystrix.exception.HystrixRuntimeException: EventClient#confirmEvent(String) could not be queued for execution and no fallback available.
at com.netflix.hystrix.AbstractCommand$22.call(AbstractCommand.java:822) ~[hystrix-core-1.5.10.jar:1.5.10]
at com.netflix.hystrix.AbstractCommand$22.call(AbstractCommand.java:807) ~[hystrix-core-1.5.10.jar:1.5.10]
at rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4.onError(OperatorOnErrorResumeNextViaFunction.java:140) ~[rxjava-1.1.10.jar:1.1.10]
at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onError(OnSubscribeDoOnEach.java:87) ~[rxjava-1.1.10.jar:1.1.10]
at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onError(OnSubscribeDoOnEach.java:87) ~[rxjava-1.1.10.jar:1.1.10]
at com.netflix.hystrix.AbstractCommand$DeprecatedOnFallbackHookApplication$1.onError(AbstractCommand.java:1472) ~[hystrix-core-1.5.10.jar:1.5.10]
at com.netflix.hystrix.AbstractCommand$FallbackHookApplication$1.onError(AbstractCommand.java:1397) ~[hystrix-core-1.5.10.jar:1.5.10]
Caused by: java.util.concurrent.RejectedExecutionException: Rejected command because thread-pool queueSize is at rejection threshold.
这时候再添加一个配置,表示即使maxQueueSize没有达到,达到queueSizeRejectionThreshold该值后,请求也会被拒绝
hystrix.threadpool.default.queueSizeRejectionThreshold = 50
所以我最终的配置如下:
ribbon:
ReadTimeout: 60000
ConnectTimeout: 60000
hystrix:
threadpool:
default:
coreSize: 50
# maxQueueSize: 0
queueSizeRejectionThreshold: 50
command:
default:
circuitBreaker:
requestVolumeThreshold: 1000
execution:
timeout:
enabled: false
isolation:
thread:
timeoutInMilliseconds: 10000
计算机配置
50个线程,每个线程循环请求20次
因为这里请求的是系统A,所以图上的5s完成请求仅仅是指系统A完成请求,系统M发送消息到RabbitMQ是一个异步的过程,所以5s仅仅代表系统A完成业务处理,并不代表系统B也完成了业务处理
后台无异常,表数据正确:1000、1000、30000 和 -30000
50个线程,每个线程循环请求60次
后台无异常,表数据正确:4000、4000、120000 和 -120000
50个线程,每个线程循环请求100次
后台无异常,表数据正确:9000、9000、270000 和 -270000
网友评论