滚动查询

作者: OkGogogooo | 来源:发表于2022-06-08 14:19 被阅读0次

    1. 引言

      在微服务场景下,考虑这样一个例子。

    一个程序想调用一个微服务接口,读取并解析一个数据库表的数据,这个数据表有10万条数据。
    

      此时如果用一个微服务接口,将这10万条数据组成一个JSON格式,一次性返回,显然不合适。JSON是一种闭包形式的数据格式,有开始标记,有结束标记,通常是从头到尾解析完才能使用(现在已有库支持边解析,边回调消费,但这不是通常情况下使用JSON的方式)。服务端把这么庞大的数据构造成JSON格式-->网络传输-->客户端再解析,这过程消耗的时间和资源都是非常大的。
      我们想到JDBC是通过迭代器的思想解决这个问题的,即 “fetch -- >迭代消费-->fetch-->迭代消费-->...”。这个思路同样同样可以应用在微服务的场景下,但它是有状态的接口调用,而不是无状态的。所以当后台多实例,中间有负载均衡时,应该以适当的方式告诉负载均衡服务(例如在HTTP请求的header中加一个特殊的头),这一批调用要指向后台同一个服务实例,别分派到多个不同实例。

    2. 滚动查询

      在微服务场景下,基于迭代器思想实现的这种大规模数据查询的方式,我们将之称为“滚动查询”(ScrollQuery)。它的逻辑过程时这样的:


    image.png

    客户端不用主动调用关闭,如果数据获取完,服务端自动会把资源关闭;如果获取了部分就放弃继续获取,则服务端在发现资源超过指定时间没有被使用之后,就会自动关闭。这个idle时间通常是30秒或1分钟。所以滚动查询需要一次性连续使用完。

    3. 构件

    image.png

    实现:

    public class ScrollQuerySite
    {
        static ScrollQuerySite sInstance ;
        
        public static ScrollQuerySite getInstance()
        {
            if(sInstance == null)
                sInstance = new ScrollQuerySite() ;
            
            return sInstance ;
        }
        
        final AutoCleanHashMap<String , IScrollQuery<?>> mResMap = AutoCleanHashMap.withExpired_Idle(1, true) ;
        
        private ScrollQuerySite()
        {
        }
        
        public JSONObject scrollNext(String aHandle , int aMaxSize)
        {
            IScrollQuery<?> sq =  mResMap.remove(aHandle) ;
            Assert.notNull(sq, "查询句柄[%s]无效,可能已经过期,过期时间2分钟!" , aHandle) ;
            return sq.scrollNext(aMaxSize) ;
        }
        
        public void cacheScrollQuery(IScrollQuery<?> aScrollQuery)
        {
            if(aScrollQuery.getHandle() != null)
                mResMap.put(aScrollQuery.getHandle() , aScrollQuery) ;
        }
        
        
    }
    
    public interface IScrollQuery<T> extends Closeable
    {
        JSONObject scrollNext(int aMaxSize) ;
        
        String getHandle() ;
    }
    

    一种实现

    public class JScroll implements IScrollQuery<ResultSet>
    {
        Connection mConn ;
        PreparedStatement mPStm ;
        ResultSet mRs ;
        
        EFunction<JSONArray, Object, SQLException> mFac ;
        Comparator<Object> mComparator ;
    
        String mHandle ;
        int mMaxSize ;
        
        BiPredicate<ResultSet , JSONArray> mPred ;
        
        boolean mLookAhead = false ;
    
        
        public JScroll(Connection aConn , PreparedStatement aPstm , ResultSet aRs , int aMaxSize
                , BiPredicate<ResultSet , JSONArray> aPred
                , EFunction<JSONArray, Object, SQLException> aFac
                , Comparator<Object> aComparator)
        {
            mConn = aConn ;
            mPStm = aPstm ;
            mRs = aRs ;
            mMaxSize = aMaxSize<=0?500:aMaxSize ;
            mPred = aPred ;
            mFac = aFac ;
            mComparator = aComparator ;
        }
    
        @Override
        public void close()
        {
            StreamAssist.closeAll(mRs , mPStm , mConn) ; 
        }
    
        @Override
        public JSONObject scrollNext(int aMaxSize)
        {
            int count = 0 ;
            String handle = null ;
            if(aMaxSize<=0)
                aMaxSize = mMaxSize ;
            JSONArray ja = new JSONArray() ;
            try
            {
                while(mLookAhead || mRs.next())
                {
                    if(count++>=aMaxSize)
                    {
                        handle = UUID.randomUUID().toString() ;
                        mLookAhead = true ; 
                        break ;
                    }
                    mLookAhead = false ;
                    if(!mPred.test(mRs , ja))
                    {
                        mHandle = null ;
                        return null ;
                    }
                }
            }
            catch (Exception e)
            {
                WrapException.wrapThrow(e) ;
            }
            
            if(mComparator != null && ja.isNotEmpty())
                ja.sort(mComparator) ;
            
            Object data = ja ;
            JSONObject resultJo = null ;
            if(mFac != null)
            {
                try
                {
                    Object genObj = mFac.apply(ja) ;
                    if(!(genObj instanceof JSONObject))
                    {
                        data = genObj ;
                    }
                    else
                    {
                        resultJo = (JSONObject)genObj ;
                    }
                }
                catch (Exception e)
                {
                    WrapException.wrapThrow(e) ;
                }
            }
            
            mHandle = handle ;
            if(handle != null)
                ScrollQuerySite.getInstance().cacheScrollQuery(this) ;
            
            if(resultJo == null)
                resultJo = new JSONObject() ;
            
            return resultJo.put("data", data)
                    .put("returnAmount", ja.length())
                    .put("handle", mHandle)
                    .put("hasMore", mHandle != null) ;
        }
    
        @Override
        public String getHandle()
        {
            return mHandle ;
        }
    
    }
    

    相关文章

      网友评论

        本文标题:滚动查询

        本文链接:https://www.haomeiwen.com/subject/aqcmmrtx.html