目录
- 设计重点
- 流程图
- 伪代码
2.1. PublishEvent
2.2. SubscribeEvent
2.3. Publisher
2.4. Subscriber - 微服务 强一致性
3.1 Publisher
3.2 Subscriber - 事件总线 - 跨服务 最终一致性
4.1 Publisher & Subscriber 都开启了本地事务,保证了强一致性
4.2 问题场景一:当 ③ 发布失败怎么办?
4.3 问题场景二:当 ③ 发布成功,但 ④ 更新事件状态失败怎么办?
4.4 问题场景三:Publisher 端Ok,Subscriber 消费出错
0. 设计重点
- Publisher 本地化 PublishEvent 保证事件发布可靠性
- Subscriber 本地化 SubscribeEvent 保证事件订阅可靠性
- SubscribeEvent 通过 EventId & HandlerType 组合约束 保证不重复消费事件
- 事件中央控制台 处理 Publisher & Subscriber 事件重试
1. 执行流程图
执行流程.png2. 伪代码
2.1 PublishEvent
public abstract class Event
{
public Event()
{
Id = Guid.NewGuid();
CreationTime = DateTime.UtcNow;
}
public Guid Id { get; set; }
public DateTime CreationTime { get; set; }
}
public class PublishEvent : Event
{
public PublishEvent(Event @event)
{
Id = @event.Id;
CreationTime = @event.CreationTime;
Type = @event.GetType().FullName;
Data = JsonConvert.SerializeObject(@event);
Status = PublishEventStatus.NotPublished;
}
public String Type { get; set; }
public String Data { get; set; }
public PublishEventStatus Status { get; set; }
}
public enum PublishEventStatus
{
NotPublished = 0,
Published = 1,
PublishedFailed = 2
}
2.2 SubscribeEvent
public class SubscribeEvent
{
public SubscribeEvent(Event @event, IEventHandler handler)
{
EventId = @event.Id;
EventCreationTime = @event.CreationTime;
EventType = @event.GetType().FullName;
EventData = JsonConvert.SerializeObject(@event);
HandlerType = handler.GetType().FullName;
HandlingStatus = HandlingStatus.HandleSucceeded;
HandlingTime = DateTime.Now;
}
public Guid EventId { get; set; }
public String EventType { get; set; }
public String EventData { get; set; }
public DateTime EventCreationTime { get; set; }
public String HandlerType { get; set; }
public DateTime HandlingTime { get; set; }
public HandlingStatus HandlingStatus { get; set; }
}
public enum HandlingStatus
{
HandleSucceeded = 0,
HandleFailed = 1
}
2.3 Publisher
try
{
BeginTransaction(); // ①
//Biz Flow
EventRepository.PubilshEvent(@event);// ②
CommitTransaction();
}
catch(Exception ex){
RollbackTransaction();
throw ex;
}
EventBus.Publish(@event); // ③
EventResitory.EventPublished(@event.ToString()); // ④
2.4 Subscriber
try
{
BeginTransaction();
//Biz Flow
EventRepository.SubscribeEvent(@event , eventHandler); // ⑤
CommitTransaction();
}
catch(Exception ex){
RollbackTransaction();
throw ex;
}
3. 微服务 强一致性
3.1 Publisher
- 开启本地事务达到强一致性
- 执行本地业务代码
- 本地事务内部保存事件 预发布 状态 ②
- 发布事件到事件总线 ③
- 修改事件发布状态为已发布 ④
3.2 Subscriber
- 开启本地事务达到强一致性
- 执行本地业务代码
- 保存订阅事件到本地仓库
4 事件总线 - 跨服务 最终一致性
4.1 Publisher & Subscriber 都开启了本地事务,保证了强一致性
4.2 问题场景一:当 ③ 发布失败怎么办?
- ③ 发布失败,意味着抛出异常,则 ④ 不执行,那么事件状态依然保持 预发布状态
- 后续 事件重试 重新发布该事件,并更新事件状态为 已发布
4.3 问题场景二:当 ③ 发布成功,但 ④ 更新事件状态失败怎么办?
4.3.1 场景二·一 Subscriber 订阅成功
- ③ 发布成功,但 ④ 更新事件状态失败,事件状态依然是 预发布状态
- Subscriber 订阅到该事件后成功执行完业务代码
-
Subscriber 将订阅事件保存到本地订阅事件仓库 ⑤
该场景存在的问题: Publisher 会通过 事件重试 再次发布 预发布 状态的事件,那么此时Subscriber 将重复消费该事件
方案:该问题我们可以通过将 SubscribeEvent EventId & HandlerType 组合唯一约束,来避免重复消费
4.3.2 场景二·二 Subscriber 订阅失败
- ③ 发布成功,但 ④ 更新事件状态失败,事件状态依然是 预发布状态
- Subscriber 执行消费失败
-
Subscriber 回滚本地事务
该场景不存在任何问题,因为 Publisher 会通过 事件重试 再次发布 预发布 状态的事件 。
4.4 问题场景三:Publisher 端Ok,Subscriber 消费出错
- Publisher 端处理顺利
-
Subscriber 消费失败,回滚本地事务,此时 SubscribeEvent 未存储到本地仓库
该场景存在的问题:
Publisher 发送成功,并且本地 PublishEvent 事件为已发布,那么意味着从Publisher端是无法知道Subscriber消费失败需要重新消费
解决方案: - 通过检测 PublishEvent & SubscribeEvent 获得需要 事件重试 的 PublishEvent
- 将 PublishEvent 重新发布 到 Subscriber
5. 通过Nuget安装组件支持以上编程模型
Install-Package SmartEventBus.RabbitMQImpl
Install-Package SmartEventBus.Repository
6. ORM:SmartSql 广而告之
SmartSql = Dapper + MyBatis + Cache(Memory | Redis) + ZooKeeper + R/W Splitting + ......
网友评论