Grails本身提供了一个异步支持的插件grails-async
,提供异步特性。最近正好手头某个项目需要异步支持,自然而然想到这个插件,但是遇到了不少坑,特地写此文记录下来,以便以后查阅。
插件使用
插件使用上没有太大问题,比较简单,参考官方文档即可。在grails的build.gradle
配置文件中加入grails-async的依赖即可。
dependencies {
runtime "org.grails.plugins:async"
// 需要用到eventbus才需要引入这个依赖
runtime "org.grails.plugins:events"
}
可以不加版本号,因为在grails的buildscript
的classpath已经引入了grails的maven bom,会自动根据当前使用的Grails版本选择合适的版本号,grails自带的插件基本都满足这个套路,这一点可以在脚手架生成的build.gradle
看出来。
Async
插件主要提供了两个异步支持的功能——Promise
和Event
。基本上用过异步框架的开发者对这两种模型都不陌生,具体使用方法就不说了,官方文档代码一看就懂,说几个主要需要注意的知识点吧。
Promise和Event的实现
Grails的Promise
默认的实现方式是 CachedThreadPoolPromiseFactory,其实就是Java自带的Thread Pool(java.util.concurrent.Executors.newCachedThreadPool()
),同时还支持的实现方式有GPars
和RxJava
,只要在runtime
依赖级别中添加对应的依赖项就会自动使用对应的实现方式。Event
也一样,默认使用Promise
的默认实现方式,即 CachedThreadPoolPromiseFactory,同样也支持GPars
和RxJava
,切换实现的方法也一样,不再赘述。
关于测试
这个官方文档只用了一小段话简单带过了,后面会说到还有坑的。
为了便于测试,Grails Async插件提供了一个同步实现的方式org.grails.async.factory.SynchronousPromiseFactory
,该类会将异步类库进行阻塞,从而转换为同步方式,这样就可以在测试中比较容易写出测试代码了,异步的测试一直都是一个难点。
import org.grails.async.factory.*
import grails.async.*
Promises.promiseFactory = new SynchronousPromiseFactory()
为了在自动化测试中满足每个场景都是一个干净的测试环境的原则,也可以保存原来的Promise实现,以便测试完毕后还原环境:
// spock测试框架中的部分片段
void 'such a method within promise'() {
setup:
PromiseFactory old = Promises.promiseFactory
Promises.promiseFactory = new SynchronousPromiseFactory()
... SNIP ...
when:
... SNIP ...
then:
... SNIP ...
cleanup:
Promises.promiseFactory = old
}
基本上使用时需要注意的也就这么多了,这个插件总体来说使用上还是没什么难度的。接下来说说项目中遇到的坑了。
Async插件的坑——Hibernate Session
这个严格来说也不算是坑,只不过官方文档对这个问题提之甚少,以致于可能让上手的人忽略了这个问题,造成浪费大量的时间去填坑。接下来我们举一个简单的例子说明这个问题。先来看看一个最基本的Promise使用:
Promise task = Promises.task {
// 期望异步执行的代码块
}
task.onComplete { List result ->
// 成功的回调内容,可以根据result.size()判断成功了多少
}
task.onError { Throwable t->
// 错误的回调,在这里可以处理task中的异常,不需要异常处理可以不要这个block
}
乍一看没什么,几乎所有的异步编程框架都满足这个套路。可是Grails的异步处理中如果包含了Domain的写入或修改时,就很容易踩到坑了,这个坑官方文档中却提之甚少,甚至连解决方案都言语不详。
我们接下来看一个实际的例子:
class MyService {
@Transactional
void myMethod(List<Domain> domainList, Logrecord record) {
PromiseList<Domain> tasks = new PromiseList<>()
domainList.each { Domain domain ->
tasks << { modifyDomain(domain) }
}
tasks.onComplete { List<Domain> result ->
record.message = "Domain list size {result.size()} finished."
record.save()
}
}
@Transactional
Domain modifyDomain(Domain domain) {
// 一些修改动作
domain.save()
}
}
Promises.task中的坑
我们期望对List<Domain>
进行异步并行处理,将每个domain修改过的结果写回数据库,所有任务完成之后我们希望在Logrecord
中记录一笔。发现这其中的问题了么?
Grails对Hibernate封装的太好了,以致于大部分情况下让人忽视了Hibernate Session的存在。Domain要想修改,必须工作在Hibernate的Persistence Session之下,而Hibernate Session是thread local绑定的,不在线程中共享。
所以这里的由于Promise的task
部分和onComplete
会在新的线程执行,所以根本没有工作在当前Hibernate Session环境下,以致于执行到domain.save()
这一步的时候方法会卡住,onComplete
方法永远也得不到执行。在官方文档一处不起眼的地方发现了解决方案:
tasks << {
Domain.withNewSession {
modifyDomain(domain.attach())
}
}
其实这里已经是踩了半天坑之后才搞定的了,直接把domain
传进去是不行的(modifyDomain(domain)
这种写法是不行的),即使加了withNewSession
你会发现数据也没有持久化到数据库。加上hibernate的sql打印会发现虽然方法都执行了,onComplete方法能调用,但是save之后并没有期望中的update
语句产生,这又是为什么呢?
还是那个容易被忽略的Hibernate Session问题,domain
这个对象工作在先前的Hibernate Session中,自然直接save不了(用明朝的剑斩清朝的官??)。那么此时肯定有不少人都想到应对策略了,只要我想办法刷新一下Session不就可以了吗?于是我评估了以下几种方式
session.get()
session.load()
domain.refresh()
....
经过我的评估,其实session.merge
比较合适,这样会减少数据库的读取,而Grails提供了attach()方法,效果是一样的(注意Grails的domain.merge()方法和hibernate的session.merge()方法方向是反的,只有domain.attach()才是hibernate的session.merge()的行为),换成session.merge(domain)
也没问题。这样重新拿到了当前的Persistence Session,于是modifyDomain(domain)
这个方法得以顺利执行。
Promises.onComplete中的坑
但是对于onComplete
来说,更坑的来了,由于onComplete
这里执行的是结束回调,到这里之前的session已经end了,根本无法通过session.merge()
或者domain.attach()
接管到当前的Session中,导致onComplete卡住。如果你不想重新读取数据库获取新的Session(session.get()或者Domain.get(),其实是一样的),基本只能通过HQL做了:
tasks.onComplete { List<Domain> result ->
Logrecord.withNewTransaction {
Logrecord.executeUpdate('UPDATE Logrecord SET message = :message WHERE id = :recordId', [message: "Domain list size {result.size()} finished.", recordId: record.id])
}
}
由于之前的session早已end,我们只能通过withNewSession
或withNewTransaction
方法重新开启一个新的Hibernate Session完成执行HQL的操作,不过这里就没有乐观锁了,想要实现乐观锁就得自己加上version
的where条件,不如save()
方法那么方便了。如果期望直接利用GORM做,我目前没有找到太好的不用重新读库的方法(Domian.get()会重新读库)。
Lazy loading卡住
Grails默认的ORM模式是lazy loading,即domain中有其他domian的关联,那么不是一开始就fetch过来,而是读取对应的属性的时候才会触发数据库select操作。还是上面的问题,如果在modifyDomain(Domain domain)
方法中有lazy loading的行为:
Domain modifyDomain(Domain domain) {
if (domain.otherDomain.prop == 'some prop') {
// 一些逻辑处理
}
domain.save()
}
执行到lazy loading就会卡住。解决方案其实有两种,一种是将fetch方式由Lazy fetch改为Eager fetch:
static mapping = {
otherDomain lazy: false
}
另外一种解决方案是在task闭包之外手工fetch一次,这样这个otherDomain就会进入session:
String otherDomainProp = domain.otherDomain.prop
tasks << {
Domain.withNewSession {
modifyDomain(domain.attach())
}
}
SynchronousPromiseFactory的坑
当我们自认为解决了Promise的坑,兴奋的去跑自动化测试的时候,发现之前的异步测试全挂了。打开日志,发现了Hibernate抛出了这样的异常: row was deleted or updated by another transaction.
,简直要晕掉了。
稍微一想就很容易明白原因,由于SynchronousPromiseFactory
会阻塞异步实现,转变为同步实现以便于测试。所以等于我们的task
和onPromise
全是同步代码了,这样一来等于原来的session还没有end呢,我们又新开了一个session去修改了domain的内容,于是hibernate就报错了。
这里我还没想到其他解决方案,所以只好写了一段判断:
@Transactional
void myMethod(List<Domain> domainList, Logrecord record) {
PromiseList<Domain> tasks = new PromiseList<>()
domainList.each { Domain domain ->
tasks << {
if (Promises.promiseFactory instanceof SynchronousPromiseFactory) {
modifyDomain(domain)
} else {
Domain.withNewSession {
modifyDomain(domain.attach())
}
}
}
}
tasks.onComplete { List<Domain> result ->
if (Promises.promiseFactory instanceof SynchronousPromiseFactory) {
record.message = "Domain list size {result.size()} finished."
record.save()
} else {
Logrecord.withNewTransaction {
Logrecord.executeUpdate('UPDATE Logrecord SET message = :message WHERE id = :recordId', [message: "Domain list size {result.size()} finished.", recordId: record.id])
}
}
}
}
判断当前的Promise实现是不是同步实现,如果是同步实现就不开启新的session,避免Hibernate的报错。于是测试终于通过了。
结语
Grails的Async插件使用确实非常方便,让人感觉上手很容易。实际体验之后发现这里的异步Session问题是一个非常大的坑,官方文档却提得非常少。这些解决方案差不多花了我一整天的时间研究,觉得值得记录下来以便以后翻阅,隧成此文。
网友评论