spring data elasticsearch 使用及源码分

作者: freelands | 来源:发表于2017-12-13 19:38 被阅读161次
    spring integrate es.png
    最近一直在用elasticsearch做一些数据的存储和查询 ,开始为了方便就直接使用了spring data elasticsearch ,期间也经历了很多坑,比如findByName(String name)这种接口默认只返回10条数据,自己也对源码做个跟踪,也研究了一下为什么elasticsearch要这样做。接下来我们通过分析spring data es的两个接口,来看一下是如果实现对es进行操作的。
    1.findByName接口分析
    • spring data es repository 接口
    public interface UserRepository extends ElasticsearchRepository<User,String> {
        /**
         * 默认只返回10条
         */
        List<User> findByName(String name);
    
    
        List<User> deleteByName(String name);
    }
    
    
    • spring 启动分析构造UserRepository代理接口
      启动的时候ElasticsearchRepositoryFactoryBean的方法会进行对应代理类的生成,并注入到spring容器
        @Override
        public void afterPropertiesSet() {
            super.afterPropertiesSet();
            Assert.notNull(operations, "ElasticsearchOperations must be configured!");
        }
    

    这里的super指是RepositoryFactoryBeanSupport,spring data jpa启动整体代理接口注入到spring容器中也是通过这个类做的,其中spring data jpa 对应的类是JpaRepositoryFactoryBean

    #其中RepositoryFactoryBeanSupport类中的下面这段代码比较核心
    public void afterPropertiesSet() {
            #getRepositoryMetadata是获取这个接口的元数据,就是这个接口的一些基本的属
            this.repositoryMetadata = this.factory.getRepositoryMetadata(repositoryInterface);
            #这个方法是生成这个接口的实现类
            this.repository = Lazy.of(() -> this.factory.getRepository(repositoryInterface, repositoryFragmentsToUse));
            #如果不是懒加载立即初始化
            if (!lazyInit) {
                this.repository.get();
            }
        }
    

    接着我们分析上面的核心代码getRepository这个方法

    public <T> T getRepository(Class<T> repositoryInterface, RepositoryFragments fragments) {
    
            Assert.notNull(repositoryInterface, "Repository interface must not be null!");
            Assert.notNull(fragments, "RepositoryFragments must not be null!");
    
            RepositoryMetadata metadata = getRepositoryMetadata(repositoryInterface);
            RepositoryComposition composition = getRepositoryComposition(metadata, fragments);
            RepositoryInformation information = getRepositoryInformation(metadata, composition);
    
            validate(information, composition);
    
            //这里是构造一个对象,然后填充一些这个接口对应的entity信息,比如索引名,类型名等
            Object target = getTargetRepository(information);
    
            // Create proxy 接下来通过代理工厂创建代理
            ProxyFactory result = new ProxyFactory();
            result.setTarget(target);
            result.setInterfaces(repositoryInterface, Repository.class, TransactionalProxy.class);
    
            result.addAdvice(SurroundingTransactionDetectorMethodInterceptor.INSTANCE);
            result.addAdvisor(ExposeInvocationInterceptor.ADVISOR);
    
            postProcessors.forEach(processor -> processor.postProcess(result, information));
        
            result.addAdvice(new DefaultMethodInvokingMethodInterceptor());
            //这个DefaultMethodInvokingMethodInterceptor
            //会为所有我们在repository接口中自定义的方法加上切面
            result.addAdvice(new QueryExecutorMethodInterceptor(information));
    
            composition = composition.append(RepositoryFragment.implemented(target));
            result.addAdvice(new ImplementationMethodExecutionInterceptor(composition));
            //生产代理类返回注入到spring容器
            return (T) result.getProxy(classLoader);
        }
    

    接下来我们看看QueryExecutorMethodInterceptor类,主要是resolveQuery这个方法

    this.queries = lookupStrategy.map(it -> {
    
                    SpelAwareProxyProjectionFactory factory = new SpelAwareProxyProjectionFactory();
                    factory.setBeanClassLoader(classLoader);
                    factory.setBeanFactory(beanFactory);
    
                    return repositoryInformation.getQueryMethods().stream()//
                            .map(method -> Pair.of(method, it.resolveQuery(method, repositoryInformation, factory, namedQueries)))//
                            .peek(pair -> invokeListeners(pair.getSecond()))//
                            .collect(Pair.toMap());
    
                }).orElse(Collections.emptyMap());
    

    resolveQuery 中会为每一个方法创建一个ElasticsearchPartQuery,其中ElasticsearchPartQuery是自定义操作实现的核心类,我们看一下代码

        //构建类的时候会创建一个PartTree主要用来描述这个方法是什么方法,有没有分页等到
        private final PartTree tree;
        private final MappingContext<?, ElasticsearchPersistentProperty> mappingContext;
    
        public ElasticsearchPartQuery(ElasticsearchQueryMethod method, ElasticsearchOperations elasticsearchOperations) {
            super(method, elasticsearchOperations);
            this.tree = new PartTree(method.getName(), method.getEntityInformation().getJavaType());
            this.mappingContext = elasticsearchOperations.getElasticsearchConverter().getMappingContext();
        }
    
        //这个是在方法运行时调用时通过切面到这里然后转换成elasticsearch的查询接口
        @Override
        public Object execute(Object[] parameters) {
            ParametersParameterAccessor accessor = new ParametersParameterAccessor(queryMethod.getParameters(), parameters);
            CriteriaQuery query = createQuery(accessor);
            if(tree.isDelete()) {
                Object result = countOrGetDocumentsForDelete(query, accessor);
                elasticsearchOperations.delete(query, queryMethod.getEntityInformation().getJavaType());
                return result;
            } else if (queryMethod.isPageQuery()) {
                query.setPageable(accessor.getPageable());
                return elasticsearchOperations.queryForPage(query, queryMethod.getEntityInformation().getJavaType());
            } else if (queryMethod.isStreamQuery()) {
                Class<?> entityType = queryMethod.getEntityInformation().getJavaType();
                if (query.getPageable().isUnpaged()) {
                    int itemCount = (int) elasticsearchOperations.count(query, queryMethod.getEntityInformation().getJavaType());
                    query.setPageable(PageRequest.of(0, Math.max(1, itemCount)));
                }
    
                return StreamUtils.createStreamFromIterator((CloseableIterator<Object>) elasticsearchOperations.stream(query, entityType));
    
            } else if (queryMethod.isCollectionQuery()) {
                if (accessor.getPageable() == null) {
                    int itemCount = (int) elasticsearchOperations.count(query, queryMethod.getEntityInformation().getJavaType());
                    query.setPageable(PageRequest.of(0, Math.max(1, itemCount)));
                } else {
                    query.setPageable(accessor.getPageable());
                }
                return elasticsearchOperations.queryForList(query, queryMethod.getEntityInformation().getJavaType());
            } else if (tree.isCountProjection()) {
                return elasticsearchOperations.count(query, queryMethod.getEntityInformation().getJavaType());
            }
            return elasticsearchOperations.queryForObject(query, queryMethod.getEntityInformation().getJavaType());
        }
    
        
    

    可以看到上面那个execute方法中主要有个ElasticsearchOperations类,负责调用es的方法,因为我们的List<User> findByName(String name)接口返回的是一个List,如果我们没有设置分页接口,这里会填充Unpaged.INSTANCE也就是表示客户端没有设置分页,es服务端会有默认填充。

    image.png
    然后逻辑走到了elasticsearchOperations.queryForList这个方法,这个方法调用的是queryForPage方法。这样整个流程我们就走通了,我们也看到了为什么我们写的es repository接口默认返回的是10条数据,所以如果想用这个接口去做大批量数据查询的话是会出现问题的,需要自己实现原生的接口,用scroll方式去拉取数据。其中deleteByName方法用按同样方式分析。
    2.userRepository.save(User user)接口分析
    public <S extends T> S save(S entity) {
            Assert.notNull(entity, "Cannot save 'null' entity.");
            elasticsearchOperations.index(createIndexQuery(entity));
            elasticsearchOperations.refresh(entityInformation.getIndexName());
            return entity;
        }
    
        /**
         * Index an object. Will do save or update
         *
         * @param query
         * @return returns the document id
         */
        String index(IndexQuery query);
    
        @Override
        public String index(IndexQuery query) {
            String documentId = prepareIndex(query).execute().actionGet().getId();
            // We should call this because we are not going through a mapper.
            if (query.getObject() != null) {
                setPersistentEntityId(query.getObject(), documentId);
            }
            return documentId;
        }
    
        /**
         * refresh the index
         *
         * @param indexName
         *
         */
        void refresh(String indexName);
    
        @Override
        public void refresh(String indexName) {
            Assert.notNull(indexName, "No index defined for refresh()");
            client.admin().indices().refresh(refreshRequest(indexName)).actionGet();
        }
    

    可以看到在save entity的时候做了两件事情,首先是index,也就是这条数据会存到es,但是此时我们查询是不能查到的,因为查询到es数据需要建立对应的倒排索引,index只是把数据放到es中的buffer中,还没有建立倒排索引所以index后是不能立即搜索到es中对应的数据的,然后又调用了refresh方法,这个方法会触发es去清空buffer 并写入文件系统缓存,也就是会为我们的索引文件建立倒排索引,此时就可以搜索到了。所以save这个接口性能是很差的,比起mysql单挑插入,我这边测试了,性能下降一半,大概是300ms,服务器上面可能性能好一点,速度快一点;es插入最好调用批量的bulk接口

    public <S extends T> List<S> save(List<S> entities) {
            Assert.notNull(entities, "Cannot insert 'null' as a List.");
            Assert.notEmpty(entities, "Cannot insert empty List.");
            List<IndexQuery> queries = new ArrayList<>();
            for (S s : entities) {
                queries.add(createIndexQuery(s));
            }
            elasticsearchOperations.bulkIndex(queries);
            elasticsearchOperations.refresh(entityInformation.getIndexName());
            return entities;
        }`
    

    可以看到这里批量插入最后只调1次refresh,refresh操作会清空es的buffer,并且写入到文件系统缓存,这个操作开销还是比较大的。

    相关文章

      网友评论

        本文标题:spring data elasticsearch 使用及源码分

        本文链接:https://www.haomeiwen.com/subject/gskzixtx.html