美文网首页JavaWeb 知识点
flowable源码解读之LRU缓存设计

flowable源码解读之LRU缓存设计

作者: Acamy丶 | 来源:发表于2019-02-23 18:17 被阅读25次

    流程数据的定义在flowable中是比较复杂的, 涉及到多张数据库表关联关系,这些在一个流程引擎中也是最为核心的数据,并且需要进行频繁的读取,所以为了实现读取的高效性,flowable将流程定义的数据放在内存中,需要时直接从内存中获取。本文就简单看一看flowable是怎么实现缓存的。

    1. 用户定义的入口

    先回顾一下,如果我们要在Spring环境中使用flowable必须采用类似方式定义一个bean

    <bean id="processEngineConfiguration"
          class="org.flowable.spring.SpringProcessEngineConfiguration">
    

    SpringProcessEngineConfiguration继承于ProcessEngineConfigurationImpl,在ProcessEngineConfigurationImpl源码中可以看到:

        protected int processDefinitionCacheLimit = -1; // By default, no limit
        protected DeploymentCache<ProcessDefinitionCacheEntry> processDefinitionCache;
    
        protected int processDefinitionInfoCacheLimit = -1; // By default, no limit
        protected ProcessDefinitionInfoCache processDefinitionInfoCache;
    
        protected int knowledgeBaseCacheLimit = -1;
        protected DeploymentCache<Object> knowledgeBaseCache;
    
        protected int appResourceCacheLimit = -1;
        protected DeploymentCache<Object> appResourceCache;
    

    因此我们可以在bean定义中可以设置processDefinitionCacheLimit等属性的值即可控制缓存的容量,如果没有进行设置将不会对其限制,为以防止OOM异常,建议可以设置一个,当超出容量时flowable引擎将会通过LRU算法进行移除。

    2. 缓存的初始化

    flowable流程引擎在Spring环境中的启动源码分析这篇文章我们已经知道了flowble流程的初始化过程,那么对缓存的初始化肯定也能在相关内里找到,在ProcessEngineConfigurationImpl的init方法中有如下几行代码:

            initProcessDefinitionCache();
            initProcessDefinitionInfoCache();
            initAppResourceCache();
            initKnowledgeBaseCache();
    

    以initProcessDefinitionCache为例看一下方法实现,不用多说:

        public void initProcessDefinitionCache() {
            if (processDefinitionCache == null) {
                if (processDefinitionCacheLimit <= 0) {
                    processDefinitionCache = new DefaultDeploymentCache<ProcessDefinitionCacheEntry>();
                } else {
                    processDefinitionCache = new DefaultDeploymentCache<ProcessDefinitionCacheEntry>(processDefinitionCacheLimit);
                }
            }
        }
    

    进入DefaultDeploymentCache的构造方法,当没有设置缓存大小时通过无参构造方法创建的是一个同步的Map, 重点可以看一下下面的有参构造函数实现:

    /** Cache with no limit */
    public DefaultDeploymentCache() {
        this.cache = Collections.synchronizedMap(new HashMap<String, T>());
    }
    
    /**
     * Cache which has a hard limit: no more elements will be cached than the limit.
     */
    public DefaultDeploymentCache(final int limit) {
        this.cache = Collections.synchronizedMap(new LinkedHashMap<String, T>(limit + 1, 0.75f, true) { // +1 is needed, because the entry is inserted first, before it is removed
            // 0.75 is the default (see javadocs)
            // true will keep the 'access-order', which is needed to have a real LRU cache
            private static final long serialVersionUID = 1L;
    
            protected boolean removeEldestEntry(Map.Entry<String, T> eldest) {
                boolean removeEldest = size() > limit;
                if (removeEldest && logger.isTraceEnabled()) {
                    logger.trace("Cache limit is reached, {} will be evicted", eldest.getKey());
                }
                return removeEldest;
            }
    
        });
    }
    

    有参构造方法核心是基于LinkedHashMap并且重写了removeEldestEntry方法,当超出容量时会返回true, 查看LinkedHashMap可以知道当调用put或putAll返回前会根据该方法返回的值决定是否移除最老的一个元素,从而实现了LRU缓存算法。

    3. 缓存的写入与更新

    在流程部署时肯定会涉及到相关数据的更新,通过RepositoryServiceImpl.deploy->DeployCmd.executeDeploy查看有如下代码:

            // Actually deploy
            commandContext.getProcessEngineConfiguration().getDeploymentManager().deploy(deployment, deploymentSettings);
    

    查看DeploymentManager.deploy->BpmnDeployer.deploy代码:

            cachingAndArtifactsManager.updateCachingAndArtifacts(parsedDeployment);
    

    然后想看CachingAndArtifactsManager.updateCachingAndArtifacts方法源码即具体更新缓存的实现:

        /**
         * Ensures that the process definition is cached in the appropriate places, including the deployment's collection of deployed artifacts and the deployment manager's cache, as well as caching any
         * ProcessDefinitionInfos.
         */
        public void updateCachingAndArtifacts(ParsedDeployment parsedDeployment) {
            CommandContext commandContext = Context.getCommandContext();
            final ProcessEngineConfigurationImpl processEngineConfiguration = Context.getProcessEngineConfiguration();
            DeploymentCache<ProcessDefinitionCacheEntry> processDefinitionCache = processEngineConfiguration.getDeploymentManager().getProcessDefinitionCache();
            DeploymentEntity deployment = parsedDeployment.getDeployment();
    
            for (ProcessDefinitionEntity processDefinition : parsedDeployment.getAllProcessDefinitions()) {
                BpmnModel bpmnModel = parsedDeployment.getBpmnModelForProcessDefinition(processDefinition);
                Process process = parsedDeployment.getProcessModelForProcessDefinition(processDefinition);
                ProcessDefinitionCacheEntry cacheEntry = new ProcessDefinitionCacheEntry(processDefinition, bpmnModel, process);
                processDefinitionCache.add(processDefinition.getId(), cacheEntry);
                addDefinitionInfoToCache(processDefinition, processEngineConfiguration, commandContext);
    
                // Add to deployment for further usage
                deployment.addDeployedArtifact(processDefinition);
            }
        }
    

    4. 缓存的读取

    一个典型的读取场景就是在启动流程的时候,所以查看RuntimeServiceImpl.startProcessInstanceByKey->StartProcessInstanceCmd.execute方法源码:

    // Find the process definition
    ProcessDefinition processDefinition = null;
    if (processDefinitionId != null) {
    
        processDefinition = deploymentCache.findDeployedProcessDefinitionById(processDefinitionId);
        if (processDefinition == null) {
            throw new FlowableObjectNotFoundException("No process definition found for id = '" + processDefinitionId + "'", ProcessDefinition.class);
        }
    
    }
    

    接下来进入DeploymentManager.findDeployedProcessDefinitionById可以看到, 首先会从缓存中查找,如果没有则从数据库中加载:

    public ProcessDefinition findDeployedProcessDefinitionById(String processDefinitionId) {
        if (processDefinitionId == null) {
            throw new FlowableIllegalArgumentException("Invalid process definition id : null");
        }
    
        // first try the cache
        ProcessDefinitionCacheEntry cacheEntry = processDefinitionCache.get(processDefinitionId);
        ProcessDefinition processDefinition = cacheEntry != null ? cacheEntry.getProcessDefinition() : null;
    
        if (processDefinition == null) {
            processDefinition = processDefinitionEntityManager.findById(processDefinitionId);
            if (processDefinition == null) {
                throw new FlowableObjectNotFoundException("no deployed process definition found with id '" + processDefinitionId + "'", ProcessDefinition.class);
            }
            processDefinition = resolveProcessDefinition(processDefinition).getProcessDefinition();
        }
        return processDefinition;
    }
    

    然后看一下resolveProcessDefinition方法, 当缓存中没有数据时会调用deploy方法来重新加载缓存。

    /**
     * Resolving the process definition will fetch the BPMN 2.0, parse it and store the {@link BpmnModel} in memory.
     */
    public ProcessDefinitionCacheEntry resolveProcessDefinition(ProcessDefinition processDefinition) {
        String processDefinitionId = processDefinition.getId();
        String deploymentId = processDefinition.getDeploymentId();
    
        ProcessDefinitionCacheEntry cachedProcessDefinition = processDefinitionCache.get(processDefinitionId);
    
        if (cachedProcessDefinition == null) {
            if (Flowable5Util.isFlowable5ProcessDefinition(processDefinition, processEngineConfiguration)) {
                return Flowable5Util.getFlowable5CompatibilityHandler().resolveProcessDefinition(processDefinition);
            }
    
            DeploymentEntity deployment = deploymentEntityManager.findById(deploymentId);
            deployment.setNew(false);
            deploy(deployment, null);
            cachedProcessDefinition = processDefinitionCache.get(processDefinitionId);
    
            if (cachedProcessDefinition == null) {
                throw new FlowableException("deployment '" + deploymentId + "' didn't put process definition '" + processDefinitionId + "' in the cache");
            }
        }
        return cachedProcessDefinition;
    }
    

    总结一下:flowable缓存的实现核心即基于LinkedHashMap并通过重写其removeEldestEntry方法实现LRU缓存移除算法。以流程定义缓存为例可以知道,每次部署时会将流程定义的数据加入缓存,每次流程启动时都会尝试去缓存中获取数据,如果缓存中有就直接返回,如果没有就从数据库中加载并放入缓存以供下次使用。

    相关文章

      网友评论

        本文标题:flowable源码解读之LRU缓存设计

        本文链接:https://www.haomeiwen.com/subject/wvwpyqtx.html