KillBill框架是一个开源的支付和计费SaaS平台,作为一款分布式的平台,我们今天主要是从它身上学习如何处理分布式事务问题的。
众所周知,采用分布式设计之后我们需要解决几大问题:
- 数据一致性分发问题
- 数据聚合join问题
- 分布式事务问题
解决数据一致性分发有几种方案:
- 事务性发件箱机制(本地消息事务机制)
- CDC技术
数据库聚合join问题依赖于数据一致性分发技术,分库分表之后,数据查询变得非常棘手,要想把数据聚合起来,可以采用下面几种方式:
- 采用全局表方式,就是每个数据库存储一份需要的数据
- 业务设计时做数据冗余,这是一种反设计模式,即在需要的地方冗余一份业务数据从而避免join
- 采用程序聚合的方式,可以采用的技术主要有两种,第一种是分布式代理中间件,比如以ShardingSphere为代表,第二种是CQRS技术
而对于分布式事务的解决方案主要有:
- 2PC(只停留在理论方面)
- TCC
- Saga
在分布式事务解决方面,典型的技术框架有:
KillBill使用的技术介绍
KillBill的使用的技术有
KillBill主要的启动项目是profiles,对于Jersey和Guice框架的整合可以参考这篇文章,整合之后,我们以Accout为例进行原理介绍,AccountResource为账号服务的RESTful服务入口,其中有个CreateAccount方法来进行Accout账号创建,最终通过DefaultAccountDao调用Create方法进行账号创建,KillBill的数据存储底层采用的jdbi技术实现,但是对事务的封装采用AOP技术实现,主要逻辑在EntitySqlDaoTransactionalJdbiWrapper这个类,JDBI技术可以参考这篇文章,KillBill对JDBI进行了扩展,采用模板技术来生成SQL代码,具体可以参考其commons项目的实现。
KillBill自己开发了事务总线,用于分布式事务的消息传递,其主要是用来执行远程消息,其主要是通过DefaultPersistentBus来实现事务消息的消费。
KillBill的服务总线采用微内核架构实现,其主要的实现逻辑在DefaultLifecycle,其主要是找到需要运行的各种服务,然后分阶段调用其每个阶段的方法执行,主要逻辑在KillbillPlatformGuiceListener的startLifecycle方法
protected void startLifecycle() {
this.startLifecycleStage1();
this.killbillLifecycle.fireStartupSequencePriorEventRegistration();
this.startLifecycleStage2();
this.killbillLifecycle.fireStartupSequencePostEventRegistration();
this.startLifecycleStage3();
}
startLifecycleStage2用来服务监听注册,startLifecycleStage3启动服务,同样以DefaultServerService为例,其注册方法是registerForNotifications,启动方法是start
@LifecycleHandlerType(LifecycleLevel.INIT_SERVICE)
public void registerForNotifications() throws NotificationQueueAlreadyExists {
try {
bus.register(pushNotificationListener);
} catch (final EventBusException e) {
log.warn("Failed to register PushNotificationListener", e);
}
pushNotificationRetryService.initialize();
}
@LifecycleHandlerType(LifecycleLevel.START_SERVICE)
public void start() {
pushNotificationRetryService.start();
}
服务总线的主要接口是PersistentBus,我们可以看到其中不但有注册监听的方法,也有发送事件的方法,我们同样以DefaultAccountDao为例,其中的postBusEventFromTransaction方法用来向事务性发件箱发送事件。
@Override
protected void postBusEventFromTransaction(final AccountModelDao account, final AccountModelDao savedAccount, final ChangeType changeType,
final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory, final InternalCallContext context) throws BillingExceptionBase {
// This is only called for the create call (see update below)
switch (changeType) {
case INSERT:
break;
default:
return;
}
final Long recordId = savedAccount.getRecordId();
// We need to re-hydrate the callcontext with the account record id
final InternalCallContext rehydratedContext = internalCallContextFactory.createInternalCallContext(savedAccount, recordId, context);
final AccountCreationInternalEvent creationEvent = new DefaultAccountCreationEvent(new DefaultAccountData(savedAccount), savedAccount.getId(),
rehydratedContext.getAccountRecordId(), rehydratedContext.getTenantRecordId(), rehydratedContext.getUserToken());
try {
eventBus.postFromTransaction(creationEvent, entitySqlDaoWrapperFactory.getHandle().getConnection());
} catch (final EventBusException e) {
log.warn("Failed to post account creation event for accountId='{}'", savedAccount.getId(), e);
}
}
Kill Bill基层用的AOP技术分析
底层其使用的是cglib技术,可以从SqlObject找到答案
static <T> T buildSqlObject(final Class<T> sqlObjectType, final HandleDing handle) {
Factory f;
if (factories.containsKey(sqlObjectType)) {
f = (Factory)factories.get(sqlObjectType);
} else {
Enhancer e = new Enhancer();
e.setClassLoader(sqlObjectType.getClassLoader());
List<Class> interfaces = new ArrayList();
interfaces.add(CloseInternalDoNotUseThisClass.class);
if (sqlObjectType.isInterface()) {
interfaces.add(sqlObjectType);
} else {
e.setSuperclass(sqlObjectType);
}
e.setInterfaces((Class[])interfaces.toArray(new Class[interfaces.size()]));
final SqlObject so = new SqlObject(buildHandlersFor(sqlObjectType), handle);
e.setCallback(new MethodInterceptor() {
public Object intercept(Object o, Method method, Object[] objects, MethodProxy methodProxy) throws Throwable {
return so.invoke(o, method, objects, methodProxy);
}
});
T t = e.create();
T actual = factories.putIfAbsent(sqlObjectType, (Factory)t);
if (actual == null) {
return t;
}
f = (Factory)actual;
}
final SqlObject so = new SqlObject(buildHandlersFor(sqlObjectType), handle);
return f.newInstance(new MethodInterceptor() {
public Object intercept(Object o, Method method, Object[] objects, MethodProxy methodProxy) throws Throwable {
return so.invoke(o, method, objects, methodProxy);
}
});
}
通过代理其底层实际的执行类是BasicHandle,接着调用的是EntitySqlDaoTransactionalJdbiWrapper,最终调用的是AccountSqlDao,具体的调用过程读者可以自己通过搭建调试环境分析。
总体来说,KillBill有很多技术值得我们学习,作为一个10年的平台,同时作为一款有多年的生产实践经验的框架需要读者根据文档和源代码进行更多的挖掘。
网友评论