美文网首页DTeam团队日志
Grails Async踩坑记——Hibernate Sessi

Grails Async踩坑记——Hibernate Sessi

作者: 冯宇Ops | 来源:发表于2018-09-02 01:12 被阅读47次

    Grails本身提供了一个异步支持的插件grails-async,提供异步特性。最近正好手头某个项目需要异步支持,自然而然想到这个插件,但是遇到了不少坑,特地写此文记录下来,以便以后查阅。

    插件使用

    插件使用上没有太大问题,比较简单,参考官方文档即可。在grails的build.gradle配置文件中加入grails-async的依赖即可。

    dependencies {
        runtime "org.grails.plugins:async"
        // 需要用到eventbus才需要引入这个依赖
        runtime "org.grails.plugins:events"
    }
    

    可以不加版本号,因为在grails的buildscript的classpath已经引入了grails的maven bom,会自动根据当前使用的Grails版本选择合适的版本号,grails自带的插件基本都满足这个套路,这一点可以在脚手架生成的build.gradle看出来。

    Async插件主要提供了两个异步支持的功能——PromiseEvent。基本上用过异步框架的开发者对这两种模型都不陌生,具体使用方法就不说了,官方文档代码一看就懂,说几个主要需要注意的知识点吧。

    Promise和Event的实现

    Grails的Promise默认的实现方式是 CachedThreadPoolPromiseFactory,其实就是Java自带的Thread Pool(java.util.concurrent.Executors.newCachedThreadPool()),同时还支持的实现方式有GParsRxJava,只要在runtime依赖级别中添加对应的依赖项就会自动使用对应的实现方式。Event也一样,默认使用Promise的默认实现方式,即 CachedThreadPoolPromiseFactory,同样也支持GParsRxJava,切换实现的方法也一样,不再赘述。

    关于测试

    这个官方文档只用了一小段话简单带过了,后面会说到还有坑的。

    为了便于测试,Grails Async插件提供了一个同步实现的方式org.grails.async.factory.SynchronousPromiseFactory,该类会将异步类库进行阻塞,从而转换为同步方式,这样就可以在测试中比较容易写出测试代码了,异步的测试一直都是一个难点。

    import org.grails.async.factory.*
    import grails.async.*
    
    Promises.promiseFactory = new SynchronousPromiseFactory()
    

    为了在自动化测试中满足每个场景都是一个干净的测试环境的原则,也可以保存原来的Promise实现,以便测试完毕后还原环境:

    // spock测试框架中的部分片段
    void 'such a method within promise'() {
        setup:
        PromiseFactory old = Promises.promiseFactory
        Promises.promiseFactory = new SynchronousPromiseFactory()
        ... SNIP ...
        when:
        ... SNIP ...
        then:
        ... SNIP ...
        cleanup:
        Promises.promiseFactory = old
    }
    

    基本上使用时需要注意的也就这么多了,这个插件总体来说使用上还是没什么难度的。接下来说说项目中遇到的坑了。

    Async插件的坑——Hibernate Session

    这个严格来说也不算是坑,只不过官方文档对这个问题提之甚少,以致于可能让上手的人忽略了这个问题,造成浪费大量的时间去填坑。接下来我们举一个简单的例子说明这个问题。先来看看一个最基本的Promise使用:

    Promise task = Promises.task {
        // 期望异步执行的代码块
    }
    
    task.onComplete { List result ->
        // 成功的回调内容,可以根据result.size()判断成功了多少
    }
    
    task.onError { Throwable t->
        // 错误的回调,在这里可以处理task中的异常,不需要异常处理可以不要这个block
    }
    

    乍一看没什么,几乎所有的异步编程框架都满足这个套路。可是Grails的异步处理中如果包含了Domain的写入或修改时,就很容易踩到坑了,这个坑官方文档中却提之甚少,甚至连解决方案都言语不详。

    我们接下来看一个实际的例子:

    class MyService {
        @Transactional
        void myMethod(List<Domain> domainList, Logrecord record) {
            PromiseList<Domain> tasks = new PromiseList<>()
            domainList.each { Domain domain ->
                tasks << { modifyDomain(domain) }
            }
    
            tasks.onComplete { List<Domain> result ->
                record.message = "Domain list size {result.size()} finished."
                record.save()
            }
        }
    
        @Transactional
        Domain modifyDomain(Domain domain) {
            // 一些修改动作
            domain.save()
        }
    }
    

    Promises.task中的坑

    我们期望对List<Domain>进行异步并行处理,将每个domain修改过的结果写回数据库,所有任务完成之后我们希望在Logrecord中记录一笔。发现这其中的问题了么?

    Grails对Hibernate封装的太好了,以致于大部分情况下让人忽视了Hibernate Session的存在。Domain要想修改,必须工作在Hibernate的Persistence Session之下,而Hibernate Session是thread local绑定的,不在线程中共享。

    所以这里的由于Promise的task部分和onComplete会在新的线程执行,所以根本没有工作在当前Hibernate Session环境下,以致于执行到domain.save()这一步的时候方法会卡住,onComplete方法永远也得不到执行。在官方文档一处不起眼的地方发现了解决方案

    tasks << {
        Domain.withNewSession {
            modifyDomain(domain.attach())
        }
    }
    

    其实这里已经是踩了半天坑之后才搞定的了,直接把domain传进去是不行的(modifyDomain(domain)这种写法是不行的),即使加了withNewSession你会发现数据也没有持久化到数据库。加上hibernate的sql打印会发现虽然方法都执行了,onComplete方法能调用,但是save之后并没有期望中的update语句产生,这又是为什么呢?

    还是那个容易被忽略的Hibernate Session问题,domain这个对象工作在先前的Hibernate Session中,自然直接save不了(用明朝的剑斩清朝的官??)。那么此时肯定有不少人都想到应对策略了,只要我想办法刷新一下Session不就可以了吗?于是我评估了以下几种方式

    session.get()
    session.load()
    domain.refresh()
    ....
    

    经过我的评估,其实session.merge比较合适,这样会减少数据库的读取,而Grails提供了attach()方法,效果是一样的(注意Grails的domain.merge()方法和hibernate的session.merge()方法方向是反的,只有domain.attach()才是hibernate的session.merge()的行为),换成session.merge(domain)也没问题。这样重新拿到了当前的Persistence Session,于是modifyDomain(domain)这个方法得以顺利执行。

    Promises.onComplete中的坑

    但是对于onComplete来说,更坑的来了,由于onComplete这里执行的是结束回调,到这里之前的session已经end了,根本无法通过session.merge()或者domain.attach()接管到当前的Session中,导致onComplete卡住。如果你不想重新读取数据库获取新的Session(session.get()或者Domain.get(),其实是一样的),基本只能通过HQL做了:

            tasks.onComplete { List<Domain> result ->
                Logrecord.withNewTransaction {
                    Logrecord.executeUpdate('UPDATE Logrecord SET message = :message WHERE id = :recordId', [message: "Domain list size {result.size()} finished.", recordId: record.id])
                }
            }
    

    由于之前的session早已end,我们只能通过withNewSessionwithNewTransaction方法重新开启一个新的Hibernate Session完成执行HQL的操作,不过这里就没有乐观锁了,想要实现乐观锁就得自己加上version的where条件,不如save()方法那么方便了。如果期望直接利用GORM做,我目前没有找到太好的不用重新读库的方法(Domian.get()会重新读库)。

    Lazy loading卡住

    Grails默认的ORM模式是lazy loading,即domain中有其他domian的关联,那么不是一开始就fetch过来,而是读取对应的属性的时候才会触发数据库select操作。还是上面的问题,如果在modifyDomain(Domain domain)方法中有lazy loading的行为:

    Domain modifyDomain(Domain domain) {
        if (domain.otherDomain.prop == 'some prop') {
            // 一些逻辑处理
        }
    
        domain.save() 
    }
    

    执行到lazy loading就会卡住。解决方案其实有两种,一种是将fetch方式由Lazy fetch改为Eager fetch:

        static mapping = {
            otherDomain lazy: false
        }
    

    另外一种解决方案是在task闭包之外手工fetch一次,这样这个otherDomain就会进入session:

    String otherDomainProp = domain.otherDomain.prop
    tasks << {
        Domain.withNewSession {
            modifyDomain(domain.attach())
        }
    }
    

    SynchronousPromiseFactory的坑

    当我们自认为解决了Promise的坑,兴奋的去跑自动化测试的时候,发现之前的异步测试全挂了。打开日志,发现了Hibernate抛出了这样的异常: row was deleted or updated by another transaction.,简直要晕掉了。

    稍微一想就很容易明白原因,由于SynchronousPromiseFactory会阻塞异步实现,转变为同步实现以便于测试。所以等于我们的taskonPromise全是同步代码了,这样一来等于原来的session还没有end呢,我们又新开了一个session去修改了domain的内容,于是hibernate就报错了。

    这里我还没想到其他解决方案,所以只好写了一段判断:

        @Transactional
        void myMethod(List<Domain> domainList, Logrecord record) {
            PromiseList<Domain> tasks = new PromiseList<>()
            domainList.each { Domain domain ->
                tasks << {
                    if (Promises.promiseFactory instanceof SynchronousPromiseFactory) {
                        modifyDomain(domain)
                    } else {
                        Domain.withNewSession {
                            modifyDomain(domain.attach())
                        }
                    }
                }
            }
    
            tasks.onComplete { List<Domain> result ->
                if (Promises.promiseFactory instanceof SynchronousPromiseFactory) {
                    record.message = "Domain list size {result.size()} finished."
                    record.save()
                } else {
                    Logrecord.withNewTransaction {
                        Logrecord.executeUpdate('UPDATE Logrecord SET message = :message WHERE id = :recordId', [message: "Domain list size {result.size()} finished.", recordId: record.id])
                    }
                }
            }
        }
    

    判断当前的Promise实现是不是同步实现,如果是同步实现就不开启新的session,避免Hibernate的报错。于是测试终于通过了。

    结语

    Grails的Async插件使用确实非常方便,让人感觉上手很容易。实际体验之后发现这里的异步Session问题是一个非常大的坑,官方文档却提得非常少。这些解决方案差不多花了我一整天的时间研究,觉得值得记录下来以便以后翻阅,隧成此文。

    相关文章

      网友评论

        本文标题:Grails Async踩坑记——Hibernate Sessi

        本文链接:https://www.haomeiwen.com/subject/ujdpwftx.html