美文网首页spring bootJAVA
SpringBoot集成Elasticsearch 实战(1)

SpringBoot集成Elasticsearch 实战(1)

作者: 孤山之王 | 来源:发表于2021-12-27 17:49 被阅读0次

    1. 目录

    20201230114440

    2. SpringBoot集成

    开发工具,这里选择的是IDEA 2021.1.2,构建 Gradle 工程等一堆通用操作,不清楚的自行百度 或者 参看 90分钟玩转Gradle

    2.1. 依赖配置

    我这边选择 spring-boot-starter-data-elasticsearch 方式来集成 spring-boot 中集成的版本号与实际安装版本号的差异,尽量选择一致的版本,否则在集成过程中,会有莫名的问题。读者在选择的时候多加留意。

    
    api("org.springframework.boot:spring-boot-starter-data-elasticsearch")
    
    

    我在此基础上封装一层 persistence-elasticsearch,更贴近一般项目使用。

    • 中央仓库下载
    中央仓库
    • 阿里云的仓库下载
    20211227173753

    2.2. 核心操作类

    为了规范索引管理,这里将所有的操作都封装成一个基类,实现对索引的增删改查。同时还集成了对数据的单个以及批量的插入以及删除。避免针对每个索引都自己写一套实现,杜绝代码的冗余,同时这样的集成对代码的结构本身也是低侵入性。

    • AbstractElasticIndexManger
    
    public abstract class AbstractElasticIndexManger {
    
        protected ElasticsearchRestTemplate elasticsearchRestTemplate;
    
        protected RestHighLevelClient restHighLevelClient;
    
        @Autowired
        public void setRestHighLevelClient(RestHighLevelClient restHighLevelClient) {
            this.restHighLevelClient = restHighLevelClient;
        }
    
        @Autowired
        public void setElasticsearchRestTemplate(ElasticsearchRestTemplate elasticsearchRestTemplate) {
            this.elasticsearchRestTemplate = elasticsearchRestTemplate;
        }
    
        /**
         * 设置分片 和 副本
         * 副本作用主要为了保证数据安全
         *
         * @param request 请求
         * @author <a href="https://github.com/rothschil">Sam</a>
         * @date 2019/10/17 19:27
         */
        protected void buildSetting(CreateIndexRequest request, int replicas, int shards) {
            request.settings(Settings.builder().put("index.number_of_shards", shards)
                    .put("index.number_of_replicas", replicas));
        }
    
        /**
         * 查询匹配条件的数据量,支持同时对多个索引进行查询,只要将索引名称按照 字符数组形式组成即可
         *
         * @param builder    BoolQueryBuilder类型查询实例
         * @param indexNames 索引名,可以一次性查询多个
         * @return long 最终数量
         * @author <a href="https://github.com/rothschil">Sam</a>
         * @date 2021/11/1-9:26
         **/
        protected long count(BoolQueryBuilder builder, String... indexNames) {
            NativeSearchQueryBuilder nativeSearchQueryBuilder = new NativeSearchQueryBuilder();
            nativeSearchQueryBuilder.withQuery(builder);
            return elasticsearchRestTemplate.count(nativeSearchQueryBuilder.build(), IndexCoordinates.of(indexNames));
        }
    
        /**
         * 查询匹配条件,支持同时对多个索引进行查询,只要将索引名称按照 字符数组形式组成即可
         *
         * @param builder    BoolQueryBuilder类型查询实例
         * @param clazz      Class对象
         * @param indexNames 索引名,可以一次性查询多个
         * @return long 最终数量
         * @author <a href="https://github.com/rothschil">Sam</a>
         * @date 2021/11/1-9:26
         **/
        protected SearchHits search(BoolQueryBuilder builder, Class<? extends BasePo> clazz, String... indexNames) {
            NativeSearchQueryBuilder nativeSearchQueryBuilder = new NativeSearchQueryBuilder();
            nativeSearchQueryBuilder.withQuery(builder);
            Pageable pageable = PageRequest.of(1, 20);
            nativeSearchQueryBuilder.withPageable(pageable);
            return elasticsearchRestTemplate.search(nativeSearchQueryBuilder.build(), clazz, IndexCoordinates.of(indexNames));
        }
    
        /**
         * 查询匹配条件,支持同时对多个索引进行查询,只要将索引名称按照 字符数组形式组成即可
         *
         * @param page       当前页
         * @param size       每页大小
         * @param builder    BoolQueryBuilder类型查询实例
         * @param clazz      Class对象
         * @param indexNames 索引名,可以一次性查询多个
         * @return SearchHits 命中结果的数据集
         * @author <a href="https://github.com/rothschil">Sam</a>
         * @date 2021/11/1-9:26
         **/
        protected SearchHits<? extends BasePo> searchPage(int page, int size, BoolQueryBuilder builder, Class<? extends BasePo> clazz, String... indexNames) {
            NativeSearchQueryBuilder nativeSearchQueryBuilder = new NativeSearchQueryBuilder();
            nativeSearchQueryBuilder.withQuery(builder);
            Pageable pageable = PageRequest.of(page, size);
            nativeSearchQueryBuilder.withPageable(pageable);
            return elasticsearchRestTemplate.search(nativeSearchQueryBuilder.build(), clazz, IndexCoordinates.of(indexNames));
        }
    
        protected DeleteByQueryRequest builderDeleteRequest(QueryBuilder builder, String... indexNames) {
            DeleteByQueryRequest request = new DeleteByQueryRequest(indexNames);
            request.setQuery(builder);
            request.setBatchSize(0X5F5E0FF);
            request.setConflicts("proceed");
            return request;
        }
    
        /**
         * 查询匹配条件,支持同时对多个索引进行查询,只要将索引名称按照 字符数组形式组成即可
         *
         * @param params     Map形式的 字段名 和 字段内容 组成的条件
         * @param builder    BoolQueryBuilder类型查询实例
         * @param indexNames 索引名,可以一次性查询多个
         * @return long 最终数量
         * @author <a href="https://github.com/rothschil">Sam</a>
         * @date 2021/11/1-9:26
         **/
        protected BulkByScrollResponse update(Map<String, Object> params, BoolQueryBuilder builder, String... indexNames) {
            UpdateByQueryRequest request = buildUpdateByQueryReq(params, builder, indexNames);
            try {
                return restHighLevelClient.updateByQuery(request, RequestOptions.DEFAULT);
            } catch (IOException e) {
                e.printStackTrace();
            }
            return null;
        }
    
        /**
         * 构建更新 QueryRequest
         *
         * @param params     参数
         * @param builder    布尔构建
         * @param indexNames 索引
         * @return UpdateByQueryRequest
         * @author <a href="https://github.com/rothschil">Sam</a>
         * @date 2021/11/28-15:50
         **/
        protected UpdateByQueryRequest buildUpdateByQueryReq(Map<String, Object> params, BoolQueryBuilder builder, String... indexNames) {
            Script script = buildScriptType(params);
            UpdateByQueryRequest request = new UpdateByQueryRequest(indexNames);
            request.setQuery(builder);
            request.setScript(script);
            request.setConflicts("proceed");
            request.setRefresh(true);
            request.setTimeout(TimeValue.timeValueMinutes(3));
            return request;
        }
    
        /**
         * 以 K-V键值对 方式构建条件 Script
         *
         * @param params Map形式的 字段名 和 字段内容 组成的条件
         * @return Script
         * @author <a href="https://github.com/rothschil">Sam</a>
         * @date 2021/11/28-13:19
         **/
        protected Script buildScriptType(Map<String, Object> params) {
            Set<String> keys = params.keySet();
            StringBuffer idOrCodeStb = new StringBuffer();
            for (String key : keys) {
                idOrCodeStb.append("ctx._source.").append(key).append("=params.").append(key).append(";");
            }
            ScriptType type = ScriptType.INLINE;
            return new Script(type, Script.DEFAULT_SCRIPT_LANG, idOrCodeStb.toString(), params);
        }
    
        /**
         * @param builder BoolQueryBuilder
         * @param bool    布尔类条件
         * @author <a href="https://github.com/rothschil">Sam</a>
         * @date 2021/11/28-14:45
         **/
        protected void setBuilders(BoolQueryBuilder builder, BoolCondition bool) {
            mustBuilders(builder, bool);
            mustNotBuilders(builder, bool);
            shouldBuilders(builder, bool);
            filterBuilders(builder, bool);
        }
    
        /**
         * 构建满足 必须 条件 的方法
         *
         * @param builder BoolQueryBuilder
         * @param bool    布尔类条件
         * @author <a href="https://github.com/rothschil">Sam</a>
         * @date 2021/11/28-14:45
         **/
        protected void mustBuilders(BoolQueryBuilder builder, BoolCondition bool) {
            List<AtomicCondition> must = bool.getMust();
            if (must.isEmpty()) {
                return;
            }
            for (AtomicCondition cds : must) {
                builder.must(getQueryBuilder(cds));
            }
        }
    
        /**
         * 构建满足 非必须 条件 的方法
         *
         * @param builder BoolQueryBuilder
         * @param bool    布尔类条件
         * @author <a href="https://github.com/rothschil">Sam</a>
         * @date 2021/11/28-14:45
         **/
        protected void mustNotBuilders(BoolQueryBuilder builder, BoolCondition bool) {
            List<AtomicCondition> mustNot = bool.getMustNot();
            if (mustNot.isEmpty()) {
                return;
            }
            for (AtomicCondition cds : mustNot) {
                builder.mustNot(getQueryBuilder(cds));
            }
        }
    
        /**
         * 构建满足 可选 条件 的方法
         *
         * @param builder BoolQueryBuilder
         * @param bool    布尔类条件
         * @author <a href="https://github.com/rothschil">Sam</a>
         * @date 2021/11/28-14:45
         **/
        protected void shouldBuilders(BoolQueryBuilder builder, BoolCondition bool) {
            List<AtomicCondition> should = bool.getShould();
            if (should.isEmpty()) {
                return;
            }
            for (AtomicCondition cds : should) {
                builder.should(getQueryBuilder(cds));
            }
        }
    
        /**
         * 构建满足 必须 条件 的方法,推荐使用
         *
         * @param builder BoolQueryBuilder
         * @param bool    布尔类条件
         * @author <a href="https://github.com/rothschil">Sam</a>
         * @date 2021/11/28-14:45
         **/
        protected void filterBuilders(BoolQueryBuilder builder, BoolCondition bool) {
            List<AtomicCondition> filter = bool.getFilter();
            if (filter.isEmpty()) {
                return;
            }
            for (AtomicCondition cds : filter) {
                builder.filter(getQueryBuilder(cds));
            }
        }
    
        public QueryBuilder getQueryBuilder(AtomicCondition cds) {
            QueryBuilder queryBuilder;
            Tuple tuple = cds.getTuple();
            switch (cds.getStatus()) {
                case (Constants.SUFFIX_QUERY):
                    queryBuilder = QueryBuilders.wildcardQuery(cds.getField(), Constants.MULTI_CHARACTER + tuple.v1());
                    break;
                case (Constants.SUFFIX_SINGLE_QUERY):
                    queryBuilder = QueryBuilders.wildcardQuery(cds.getField(), Constants.SINGLE_CHARACTER + tuple.v1());
                    break;
                case (Constants.RANGE_QUERY):
                    queryBuilder = QueryBuilders.rangeQuery(cds.getField()).from(tuple.v1()).to(tuple.v2());
                    break;
                case (Constants.PREFIX_QUERY):
                    queryBuilder = QueryBuilders.prefixQuery(cds.getField(), tuple.v1().toString());
                    break;
                case (Constants.REG_QUERY):
                    queryBuilder = QueryBuilders.regexpQuery(cds.getField(), tuple.v1().toString());
                    break;
                default:
                    queryBuilder = QueryBuilders.termQuery(cds.getField(), tuple.v1().toString());
                    break;
            }
            return queryBuilder;
        }
    }
    
    
    
    • ElasticIndexManger
    
    public class ElasticIndexManger extends AbstractElasticIndexManger {
    
    
        /**
         * 创建索引,默认分片数量为 1,即一个主片,副本数量为 0
         *
         * @param indexName 索引名称
         * @param mapping   索引定义,JSON形式的字符串
         * @author <a href="https://github.com/rothschil">Sam</a>
         * @date 2019/10/17 17:30
         */
        public void createIndex(String indexName, String mapping) {
            createIndex(indexName, mapping, 0, 1);
        }
    
    
        /**
         * 指定索引结构创建索引
         *
         * @param indexName 索引名称
         * @param mapping   索引定义,JSON形式的字符串
         * @param replicas  副本的数量
         * @param shards    分片数量
         * @author <a href="https://github.com/rothschil">Sam</a>
         * @date 2019/10/17 17:30
         */
        public void createIndex(String indexName, String mapping, int replicas, int shards) {
            try {
                if (!this.existIndex(indexName)) {
                    log.error(" indexName={} 已经存在,mapping={}", indexName, mapping);
                    return;
                }
                CreateIndexRequest request = new CreateIndexRequest(indexName);
                buildSetting(request, replicas, shards);
                request.mapping(mapping, XContentType.JSON);
                CreateIndexResponse res = restHighLevelClient.indices().create(request, RequestOptions.DEFAULT);
                if (!res.isAcknowledged()) {
                    throw new RuntimeException("初始化失败");
                }
            } catch (Exception e) {
                e.printStackTrace();
                System.exit(0);
            }
        }
    
        /**
         * 获取所有索引,默认为所有索引
         *
         * @return List
         * @author <a href="https://github.com/rothschil">Sam</a>
         * @date 2021/10/30-11:54
         **/
        public List getAllIndex() {
            return getAllIndex(Constants.MULTI_CHARACTER);
        }
    
        /**
         * 获取所有索引,按照正则表达式方式过滤 索引名称,并返回符合条件的索引名字
         *
         * @param inPattern 正则表达式
         * @return List
         * @author <a href="https://github.com/rothschil">Sam</a>
         * @date 2021/10/30-11:54
         **/
        public List<String> getAllIndex(String inPattern) {
            GetIndexRequest getIndexRequest = new GetIndexRequest(inPattern);
            try {
                GetIndexResponse getIndexResponse = restHighLevelClient.indices().get(getIndexRequest, RequestOptions.DEFAULT);
                String[] indices = getIndexResponse.getIndices();
                return Arrays.asList(indices);
            } catch (IOException e) {
                log.error("获取索引失败 {} 已经存在", e.getMessage());
            } catch (ElasticsearchStatusException e) {
                log.error("获取索引失败 {} 索引本身不存在", e.getMessage());
            }
            return Collections.EMPTY_LIST;
        }
    
        /**
         * 制定配置项的判断索引是否存在,注意与 isExistsIndex 区别
         * <ul>
         *     <li>1、可以指定 用本地检索 还是用 主动节点方式检索</li>
         *     <li>2、是否适应被人读的方式</li>
         *     <li>3、返回默认设置</li>
         * </ul>
         *
         * @param indexName index名
         * @return boolean
         * @author <a href="https://github.com/rothschil">Sam</a>
         * @date 2019/10/17 17:27
         */
        public boolean existIndex(String indexName) throws IOException {
            GetIndexRequest request = new GetIndexRequest(indexName);
            //TRUE-返回本地信息检索状态,FALSE-还是从主节点检索状态
            request.local(false);
            //是否适应被人可读的格式返回
            request.humanReadable(true);
            //是否为每个索引返回所有默认设置
            request.includeDefaults(false);
            //控制如何解决不可用的索引以及如何扩展通配符表达式,忽略不可用索引的索引选项,仅将通配符扩展为开放索引,并且不允许从通配符表达式解析任何索引
            request.indicesOptions(IndicesOptions.lenientExpandOpen());
            return restHighLevelClient.indices().exists(request, RequestOptions.DEFAULT);
        }
    
        /**
         * 单纯断某个索引是否存在
         *
         * @param indexName index名
         * @return boolean 存在为True,不存在则 False
         * @author <a href="https://github.com/rothschil">Sam</a>
         * @date 2019/10/17 17:27
         */
        public boolean isIndexExists(String indexName) throws Exception {
            return restHighLevelClient.indices().exists(new GetIndexRequest(indexName), RequestOptions.DEFAULT);
        }
    
        /**
         * 批量插入数据,通过 {@link List} 的对象集合进行插入,此处对失败的提交进行二次提交,并覆盖原有数据,这一层面是 ElasticSearch自行控制
         *
         * @param indexName index
         * @param list      列表
         * @author <a href="https://github.com/rothschil">Sam</a>
         * @date 2019/10/17 17:26
         */
        public void batch(String indexName, List<? extends BasePo> list) throws IOException {
            int sleep = 15;
            BulkRequest request = new BulkRequest();
            list.forEach(item -> request.add(new IndexRequest(indexName)
                    .id(item.getId().toString())
                    .source(JSON.toJSONString(item), XContentType.JSON)));
            try {
                BulkResponse bulkResponse = bulk(request);
                log.error("[Verification BulkResponse bulk 操作结果] {}, 文件大小 {} ", bulkResponse.status(), list.size());
                if (bulkResponse.hasFailures()) {
                    log.error(bulkResponse.buildFailureMessage());
                    for (BulkItemResponse bulkItemResponse : bulkResponse) {
                        if (bulkItemResponse.isFailed()) {
                            log.error(bulkItemResponse.getFailureMessage());
                        }
                    }
                    log.error("批量操作失败,重新再提交一次,间隔时间{}, 文件大小 {} ", sleep, list.size());
                    TimeUnit.SECONDS.sleep(sleep);
                    bulkResponse = bulk(request);
                    if (bulkResponse.hasFailures()) {
                        log.error("再次提交失败,需要写入MQ , 文件大小 {} ", list.size());
                    }
                }
            } catch (InterruptedException | IOException e) {
                e.printStackTrace();
            }
        }
    
    
        /**
         * bulk 方式批量提交
         *
         * @param request {@link BulkRequest} 请求
         * @return BulkResponse
         * @author <a href="https://github.com/rothschil">Sam</a>
         * @date 2021/10/24-15:50
         **/
        private BulkResponse bulk(BulkRequest request) throws IOException {
            return restHighLevelClient.bulk(request, RequestOptions.DEFAULT);
        }
    
        /**
         * <p>
         * 批量插入数据,通过 {@link List} 的对象集合进行插入,提交前,判断 该索引是否存在不存在则直接创建 该索引
         * 并对失败的提交进行二次提交,并覆盖原有数据,这一层面是 ElasticSearch自行控制
         * </p>
         *
         * @param indexName index
         * @param list      列表
         * @param created   当索引不存在,则创建索引,默认为 true,即索引不存在,创建该索引,此时 mapping 应该不为空
         * @param mapping   索引定义,JSON形式的字符串
         * @author <a href="https://github.com/rothschil">Sam</a>
         * @date 2019/10/17 17:26
         */
        public void batch(List<? extends BasePo> list, String indexName, boolean created, String mapping) throws Exception {
            try {
                if (!isIndexExists(indexName)) {
                    log.error("[Index does not exist] Rebuilding index. IndexName ={}", indexName);
                    if (created && StringUtils.isNotBlank(mapping)) {
                        createIndex(indexName, mapping);
                    } else {
                        log.error("[Index does not exist , No index creation] IndexName ={}", indexName);
                        return;
                    }
                }
                batch(indexName, list);
            } catch (InterruptedException | IOException e) {
                e.printStackTrace();
            }
        }
    
    
        /**
         * 批量删除,根据索引名称,删除索引下数据
         *
         * @param indexName index
         * @param idList    待删除列表
         * @author <a href="https://github.com/rothschil">Sam</a>
         * @date 2019/10/17 17:14
         */
        public <T> void deleteBatch(String indexName, Collection<T> idList) {
            BulkRequest request = new BulkRequest();
            idList.forEach(item -> request.add(new DeleteRequest(indexName, item.toString())));
            try {
                restHighLevelClient.bulk(request, RequestOptions.DEFAULT);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    
        /**
         * 根据索引名称,和 {@link SearchSourceBuilder} 条件,以及返回对象实体类,返回列表
         *
         * @param indexName index
         * @param builder   查询参数
         * @param clazz     结果类对象
         * @return java.util.List<T>
         * @author <a href="https://github.com/rothschil">Sam</a>
         * @date 2019/10/17 17:14
         */
        public <T> List<T> search(String indexName, SearchSourceBuilder builder, Class<T> clazz) {
            List res = Collections.EMPTY_LIST;
            try {
                SearchRequest request = new SearchRequest(indexName);
                request.source(builder);
                SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
                SearchHit[] hits = response.getHits().getHits();
                res = new ArrayList<>(hits.length);
                for (SearchHit hit : hits) {
                    res.add(JSON.parseObject(hit.getSourceAsString(), clazz));
                }
            } catch (IOException e) {
                log.error("[ElasticSearch] connect err ,err-msg {}", e.getMessage());
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
            return res;
        }
    
        /**
         * 删除 index,以及索引下数据
         *
         * @param indexName 索引名字
         * @author <a href="https://github.com/rothschil">Sam</a>
         * @date 2019/10/17 17:13
         */
        public void deleteIndex(String indexName) {
            try {
                restHighLevelClient.indices().delete(new DeleteIndexRequest(indexName), RequestOptions.DEFAULT);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    
        /**
         * 删除索引下数据,但是不删除索引结构
         *
         * @param builder    条件构建模式
         * @param indexNames 索引名称列表
         * @author <a href="https://github.com/rothschil">Sam</a>
         * @date 2019/10/17 17:13
         */
        public void deleteByQuery(QueryBuilder builder, String... indexNames) {
            try {
                DeleteByQueryRequest request = builderDeleteRequest(builder, indexNames);
                BulkByScrollResponse response = restHighLevelClient.deleteByQuery(request, RequestOptions.DEFAULT);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    
        /**
         * 不推荐使用,原因为不够灵活,获取该索引下可以匹配的数量,支持 模糊查询和精确查询,
         * 用法 在 方法 <b>field</b> 的处理上。
         * <ul>
         *     <li>模糊匹配模式:字段</li>
         *     <li>精确匹配模式:字段.类型</li>
         * </ul>
         *
         * @param indexName 文档索引名
         * @param field     字段
         * @param text      内容
         * @return long 数量
         * @author <a href="https://github.com/rothschil">Sam</a>
         * @date 2018/07/20-20:47
         **/
        @Deprecated
        public long countMatchPhrasePrefixQuery(String indexName, String field, String text) {
            CountRequest countRequest = new CountRequest(indexName);
            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
            searchSourceBuilder.query(matchPhrasePrefixQuery(field, text));
            countRequest.source(searchSourceBuilder);
            CountResponse countResponse = null;
            try {
                countResponse = restHighLevelClient.count(countRequest, RequestOptions.DEFAULT);
            } catch (IOException e) {
                e.printStackTrace();
            }
            return countResponse == null ? 0L : countResponse.getCount();
        }
    
    
        /**
         * 按照字段 内容进行精确匹配,返回匹配的数量
         *
         * @param field      字段名
         * @param content    内容
         * @param indexNames 索引名
         * @return long 数量
         * @author <a href="https://github.com/rothschil">Sam</a>
         * @date 2021/11/1-10:49
         **/
        public long exactCondition(String field, String content, String... indexNames) {
            BoolQueryBuilder builder = QueryBuilders.boolQuery();
            builder.must(QueryBuilders.termQuery(field, content));
            return count(builder, indexNames);
        }
    
    
        /**
         * 按照字段的前缀内容进行匹配,返回匹配的数量
         *
         * @param field      字段名
         * @param prefix     前缀
         * @param indexNames 索引名
         * @return long 数量
         * @author <a href="https://github.com/rothschil">Sam</a>
         * @date 2021/11/1-10:49
         **/
        public long prefix(String field, String prefix, String... indexNames) {
            BoolQueryBuilder builder = QueryBuilders.boolQuery();
            builder.must(QueryBuilders.prefixQuery(field, prefix));
            return count(builder, indexNames);
        }
    
    
        /**
         * 按照字段对 内容进行后缀匹配,返回匹配的数量
         *
         * @param suffix     后缀
         * @param indexNames 索引名
         * @return long 数量
         * @author <a href="https://github.com/rothschil">Sam</a>
         * @date 2021/11/1-10:56
         **/
        public long suffix(String field, String suffix, String... indexNames) {
            BoolQueryBuilder builder = QueryBuilders.boolQuery();
            builder.must(QueryBuilders.wildcardQuery(field, Constants.MULTI_CHARACTER + suffix));
            return count(builder, indexNames);
        }
    
    
        /**
         * 字段的前缀和后缀都必须满足条件
         *
         * @param field      字段
         * @param prefix     前缀
         * @param suffix     后缀
         * @param indexNames 索引名
         * @return long 数量
         * @author <a href="https://github.com/rothschil">Sam</a>
         * @date 2021/11/1-10:59
         **/
        public long prefixAndSuffix(String field, String prefix, String suffix, String... indexNames) {
            BoolQueryBuilder builder = QueryBuilders.boolQuery();
            builder.must(QueryBuilders.prefixQuery(field, prefix));
            builder.must(QueryBuilders.wildcardQuery(field, Constants.MULTI_CHARACTER + suffix));
            return count(builder, indexNames);
        }
    
        /**
         * 字段的前缀和后缀都满足一个条件按即可
         *
         * @param field      字段
         * @param prefix     前缀
         * @param suffix     后缀
         * @param indexNames 索引名
         * @return long 数量
         * @author <a href="https://github.com/rothschil">Sam</a>
         * @date 2021/11/1-10:59
         **/
        public long prefixOrSuffix(String field, String prefix, String suffix, String... indexNames) {
            BoolQueryBuilder builder = QueryBuilders.boolQuery();
            builder.should(QueryBuilders.prefixQuery(field, prefix));
            builder.should(QueryBuilders.wildcardQuery(field, Constants.MULTI_CHARACTER + suffix));
            return count(builder, indexNames);
        }
    
        /**
         * 字段的前缀必须满足,而 后缀则不要求 不一定满足
         *
         * @param field      字段
         * @param prefix     前缀
         * @param suffix     后缀
         * @param indexNames 索引名
         * @return long 数量
         * @author <a href="https://github.com/rothschil">Sam</a>
         * @date 2021/11/1-10:59
         **/
        public long prefixMustSuffixShould(String field, String prefix, String suffix, String... indexNames) {
            BoolQueryBuilder builder = QueryBuilders.boolQuery();
            builder.must(QueryBuilders.prefixQuery(field, prefix));
            builder.should(QueryBuilders.wildcardQuery(field, Constants.MULTI_CHARACTER + suffix));
            return count(builder, indexNames);
        }
    
        /**
         * 字段的前缀选择性满足,而 后缀则一定要满足
         *
         * @param field      字段
         * @param prefix     前缀
         * @param suffix     后缀
         * @param indexNames 索引名
         * @return long 数量
         * @author <a href="https://github.com/rothschil">Sam</a>
         * @date 2021/11/1-10:59
         **/
        public long prefixShouldSuffixMust(String field, String prefix, String suffix, String... indexNames) {
            BoolQueryBuilder builder = QueryBuilders.boolQuery();
            builder.should(QueryBuilders.prefixQuery(field, prefix));
            builder.must(QueryBuilders.wildcardQuery(field, Constants.MULTI_CHARACTER + suffix));
            return count(builder, indexNames);
        }
    
    
        /**
         * 查询总数
         *
         * @param indexNames 索引文档名称,可以是多个
         * @return long 匹配的数量
         * @author <a href="https://github.com/rothschil">Sam</a>
         * @date 2021/10/29-21:11
         **/
        public long total(String... indexNames) {
            BoolQueryBuilder builder = QueryBuilders.boolQuery();
            return count(builder, indexNames);
        }
    
        /**
         * 查询匹配条件,支持同时对多个索引进行查询,只要将索引名称按照 字符数组形式组成即可
         *
         * @param params     Map形式的 字段名 和 字段内容 组成的条件
         * @param bool      复合条件封装
         * @param indexNames 索引名,可以一次性查询多个
         * @return long 最终数量
         * @author <a href="https://github.com/rothschil">Sam</a>
         * @date 2021/11/1-9:26
         **/
        public BulkByScrollResponse update(Map<String, Object> params, BoolCondition bool, String... indexNames) {
            BoolQueryBuilder builder = QueryBuilders.boolQuery();
            setBuilders(builder,bool);
            return update(params,builder,indexNames);
        }
    
        /**
         * 查询匹配条件,支持同时对多个索引进行查询,只要将索引名称按照 字符数组形式组成即可
         *
         * @param page    当前页
         * @param size    每页大小
         * @param clazz      Class对象
         * @param indexNames 索引名,可以一次性查询多个
         * @return SearchHits 命中结果的数据集
         * @author <a href="https://github.com/rothschil">Sam</a>
         * @date 2021/11/1-9:26
         **/
        protected SearchHits<? extends BasePo> searchPage(int page, int size, BoolCondition bool,Class<? extends BasePo> clazz, String... indexNames) {
            BoolQueryBuilder builder = QueryBuilders.boolQuery();
            setBuilders(builder,bool);
            return searchPage(page,size,builder, clazz, indexNames);
        }
    
    }
    
    

    3. 项目代码

    通过以上的集成,我们看到完成在项目中对 elasticsearch 的集成,同时也用基类,将所有可能的操作都封装起来。下来我们通过对基类的讲解,来逐个说明!

    3.1. 索引管理

    由于在ElasticIndexManger类定义了所有方法,直接调用即可。

    3.1.1. 创建索引

    我们在创建索引过程中需要先判断是否有这个索引,否则不允许创建,由于我案例采用的是手动指定 indexNameSettings ,大家看的过程中要特别注意下,而且还有一点 indexName 必须是小写,如果是大写在创建过程中会有错误

    官方索引创建说明 索引名大写

    。详细的代码实现见如下:

    
     /**
         * 创建索引,默认分片数量为 1,即一个主片,副本数量为 0
         *
         * @param indexName 索引名称
         * @param mapping   索引定义,JSON形式的字符串
         * @author <a href="https://github.com/rothschil">Sam</a>
         * @date 2019/10/17 17:30
         */
        public void createIndex(String indexName, String mapping) {
            createIndex(indexName, mapping, 0, 1);
        }
    
    
        /**
         * 指定索引结构创建索引
         *
         * @param indexName 索引名称
         * @param mapping   索引定义,JSON形式的字符串
         * @param replicas  副本的数量
         * @param shards    分片数量
         * @author <a href="https://github.com/rothschil">Sam</a>
         * @date 2019/10/17 17:30
         */
        public void createIndex(String indexName, String mapping, int replicas, int shards) {
            try {
                if (!this.existIndex(indexName)) {
                    log.error(" indexName={} 已经存在,mapping={}", indexName, mapping);
                    return;
                }
                CreateIndexRequest request = new CreateIndexRequest(indexName);
                buildSetting(request, replicas, shards);
                request.mapping(mapping, XContentType.JSON);
                CreateIndexResponse res = restHighLevelClient.indices().create(request, RequestOptions.DEFAULT);
                if (!res.isAcknowledged()) {
                    throw new RuntimeException("初始化失败");
                }
            } catch (Exception e) {
                e.printStackTrace();
                System.exit(0);
            }
        }
    
    

    创建索引需要设置分片,这里采用Settings.Builder方式,当然也可以 JSON 自定义方式,本文篇幅有限,不做演示。

    index.number_of_shards:分片数

    number_of_replicas:副本数

        /**
         * 设置分片 和 副本
         * 副本作用主要为了保证数据安全
         *
         * @param request 请求
         * @author <a href="https://github.com/rothschil">Sam</a>
         * @date 2019/10/17 19:27
         */
        protected void buildSetting(CreateIndexRequest request, int replicas, int shards) {
            request.settings(Settings.builder().put("index.number_of_shards", shards)
                    .put("index.number_of_replicas", replicas));
        }
    
    
    
    [elastic@localhost elastic]$ curl -H "Content-Type: application/json" -X GET "http://localhost:9200/_cat/indices?v"
    
    health status index        uuid                   pri rep docs.count docs.deleted store.size pri.store.size
    
    yellow open   twitter      scSSD1SfRCio4F77Hh8aqQ   3   2          2            0      8.3kb          8.3kb
    
    yellow open   idx_location _BJ_pOv0SkS4tv-EC3xDig   3   2          1            0        4kb            4kb
    
    yellow open   wongs        uT13XiyjSW-VOS3GCqao8w   3   2          1            0      3.4kb          3.4kb
    
    yellow open   idx_locat    Kr3wGU7JT_OUrRJkyFSGDw   3   2          3            0     13.2kb         13.2kb
    
    yellow open   idx_copy_to  HouC9s6LSjiwrJtDicgY3Q   3   2          1            0        4kb            4kb
      
    
    

    说明创建成功,这里总是通过命令行来验证,有点繁琐,既然我都有WEB服务,为什么不直接通过HTTP验证了?

    3.1.2. 查看索引

    查看索引这个操作支持模糊操作,即以通配符 * 作为一个或者多个字符匹配,这个操作在实际应用非常好用,将来有机会说到 Index 设计过程中就显得尤为重要。

    
        /**
         * 获取所有索引,默认为所有索引
         *
         * @return List
         * @author <a href="https://github.com/rothschil">Sam</a>
         * @date 2021/10/30-11:54
         **/
        public List getAllIndex() {
            return getAllIndex(Constants.MULTI_CHARACTER);
        }
    
        /**
         * 获取所有索引,按照正则表达式方式过滤 索引名称,并返回符合条件的索引名字
         *
         * @param inPattern 正则表达式
         * @return List
         * @author <a href="https://github.com/rothschil">Sam</a>
         * @date 2021/10/30-11:54
         **/
        public List<String> getAllIndex(String inPattern) {
            GetIndexRequest getIndexRequest = new GetIndexRequest(inPattern);
            try {
                GetIndexResponse getIndexResponse = restHighLevelClient.indices().get(getIndexRequest, RequestOptions.DEFAULT);
                String[] indices = getIndexResponse.getIndices();
                return Arrays.asList(indices);
            } catch (IOException e) {
                log.error("获取索引失败 {} 已经存在", e.getMessage());
            } catch (ElasticsearchStatusException e) {
                log.error("获取索引失败 {} 索引本身不存在", e.getMessage());
            }
            return Collections.EMPTY_LIST;
        }
    
    

    3.1.3. 删除索引

    删除的逻辑就比较简单,这里就不多说。

    
        /**
         * 删除 index,以及索引下数据
         *
         * @param indexName 索引名字
         * @author <a href="https://github.com/rothschil">Sam</a>
         * @date 2019/10/17 17:13
         */
        public void deleteIndex(String indexName) {
            try {
                restHighLevelClient.indices().delete(new DeleteIndexRequest(indexName), RequestOptions.DEFAULT);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    

    3.2. 引用依赖

    构建一个工程,我这里依然用 Gralde 工程作为样例说明, Maven 项目类似。

    3.2.1. 依赖管理

    
    implementation("io.github.rothschil:persistence-elasticsearch:1.2.3.RELEASE")
    
    

    3.2.2. 依赖注入

    在工程项目中直接使用 ElasticIndexManger 作为实例注入进来,后面我们可以直接使用它提供的各种方法。样例中我是定义一个精确查询作为说明,TermQueryBuilder("sysCode","crm") 中参数分别代表匹配条件的列名和列的值; 在索引列名中我这里用的是 通配符,即可以在多个索引之间查询; AccLog.class 这是我自定义的类,用以接收查询出来的结果进行实例化映射。

    
    @Component
    public class LogIndexManager{
    
        private ElasticIndexManger elasticIndexManger;
    
        @Autowired
        public void setElasticIndexManger(ElasticIndexManger elasticIndexManger) {
            this.elasticIndexManger = elasticIndexManger;
        }
    
        public List<AccLog> query(){
            QueryBuilder queryBuilder = new TermQueryBuilder("sysCode","crm");
            SearchSourceBuilder sb = new SearchSourceBuilder();
            sb.query(queryBuilder);
            return elasticIndexManger.search("hnqymh_hpg*",sb,AccLog.class);
        }
    }
    
    

    ![查询结果]](https://img.haomeiwen.com/i7232803/c3509fce823f59ea.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

    4. 源码

    Github演示源码 ,记得给Star

    Gitee演示源码,记得给Star

    相关文章

      网友评论

        本文标题:SpringBoot集成Elasticsearch 实战(1)

        本文链接:https://www.haomeiwen.com/subject/jrwyqrtx.html