Making a Java Client Compatible with Elasticsearch 6.8 and 7.8 Servers

Author: 简单是美美 | Published: 2020-11-02 08:46

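    1. Background

      The project currently runs Elasticsearch (hereafter ES) server version 6.8.1, with an upgrade to 7.8.1 planned. As a result, the same release of the application software may be deployed against 6.8.1 at some sites and against 7.8.1 at others.
      The application software, acting as the Java client of the ES server, therefore needs a single code base that works with both the 6.8.1 and the 7.8.1 ES server.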

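    2. Design approach

      The Java High Level REST Client (hereafter High Level) is the Java client officially recommended for ES. It tracks ES server releases closely and covers the full feature set, so the project uses High Level as the ES client for the 视信通 application services.
      The official High Level documentation describes compatibility as follows: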


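    Figure 1: compatibility statement from the official High Level documentation

      According to the official documentation, High Level is forward compatible: an older High Level client can talk to a newer server version.
      Based on this, the application software uses the 6.8.1 High Level client to work with both the 6.8.1 and the 7.8.1 server.
      The biggest difference between ES 6 and ES 7 is that in 6 every index requires a Type when storing data, whereas in 7 no Type is needed, i.e. one index corresponds to one mapping.
      The application software currently creates indices with the ES 6 client under the rule "one index, one Type, with the Type named after the index". This scheme has to change, because it will no longer work once the server is upgraded to 7.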

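    3. Design

    3.1. Index model compatibility

      To work with both the 6.8.1 and the 7.8.1 ES server, the application software does not specify a Type when creating an index. The High Level 6.8.1 API allows index creation without a Type parameter, in which case the index gets the default Type "_doc", and the 7.8.1 server accepts DSL queries that use the Type "_doc".
      The Java client of the application software therefore uses High Level 6.8.1 to create indices without a Type, and passes the default Type "_doc" wherever a query or aggregation API requires one.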
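
      The "always use _doc" rule can be illustrated at the REST path level. The following is only a minimal sketch under stated assumptions: the class DocPaths and its methods are hypothetical helpers that are not part of the article's code, and document IDs are assumed to be URL-safe so no encoding is done. On a 6.8 server the "_doc" segment is read as the Type name, while on a 7.8 server it is the fixed segment of the typeless document endpoint, so the same path should be accepted by both.

```java
/**
 * Hypothetical helper (not part of the article's code) that builds REST paths
 * relying on the default "_doc" Type, so that one path works on 6.8 and 7.8.
 */
final class DocPaths {

    static final String DEFAULT_TYPE = "_doc";

    /** Path for indexing or fetching one document: /{index}/_doc/{id}. Assumes a URL-safe id. */
    static String docPath(String indexName, String docId) {
        return "/" + indexName + "/" + DEFAULT_TYPE + "/" + docId;
    }

    /** Path for searching: the Type is left out entirely, so it does not depend on the server version. */
    static String searchPath(String indexName) {
        return "/" + indexName + "/_search";
    }

    public static void main(String[] args) {
        System.out.println(docPath("vline_event_recorder_exception_test", "1"));
        System.out.println(searchPath("vline_event_recorder_exception_test"));
    }
}
```

      Keeping the Type in a single constant mirrors the DEFAULT_TYPE field used by the wrapper class in the appendix.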

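    3.2. Wrapping ES operations

      To keep the different services of the application software from using High Level in inconsistent ways, 视信通 wraps all ES operations in a shared utility class. Individual services no longer need to care about ES implementation details and all behave the same way, which makes the code easier to extend and maintain.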

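    3.3. Combining High Level and Low Level for compatible ES operations

      Although High Level is officially described as forward compatible, there are still subtle differences in response format. One example is the total-hits field (total) in search results.
      On a 6.8.1 server, a search returns hits like this: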

    {
        "took": 0,
        "timed_out": false,
        "_shards": {
            "total": 5,
            "successful": 5,
            "skipped": 0,
            "failed": 0
        },
        "hits": {
            "total": 0,
            "max_score": null,
            "hits": []
        }
    }
    

      Note that the total field here is an integer.
      On a 7.8.1 server, however, a search returns hits like this:

    {
        "took": 6,
        "timed_out": false,
        "_shards": {
            "total": 1,
            "successful": 1,
            "skipped": 0,
            "failed": 0
        },
        "hits": {
            "total": {
                "value": 0,
                "relation": "eq"
            },
            "max_score": null,
            "hits": []
        }
    }
    

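      When the 6.8.1 High Level client queries a 7.8.1 server, the API parses the total field incorrectly (it always returns 0).
      To handle this, the 视信通 ES wrapper first runs the query with the Java Low Level REST Client, then inspects the returned JSON to see whether total is an integer or a JSON object and reads the correct value accordingly.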
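
      The branching on the shape of total can be sketched without any ES dependency. This is a rough illustration, not the wrapper's actual code: the class TotalHitsSketch is hypothetical, and it uses a regular expression only to stay dependency-free, whereas the wrapper in the appendix parses the body with fastjson. The pattern also assumes that "total" is the first key inside "hits", as in the two responses shown above.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical sketch: reads hits.total from a 6.x (integer) or 7.x (object) search response body. */
final class TotalHitsSketch {

    // Group 1 captures the 6.x form:  "hits": { "total": 12
    // Group 2 captures the 7.x form:  "hits": { "total": { "value": 12, ...
    private static final Pattern TOTAL = Pattern.compile(
            "\"hits\"\\s*:\\s*\\{\\s*\"total\"\\s*:\\s*(?:(\\d+)|\\{[^}]*?\"value\"\\s*:\\s*(\\d+))");

    static long parseTotal(String responseBody) {
        Matcher m = TOTAL.matcher(responseBody);
        if (!m.find()) {
            throw new IllegalArgumentException("hits.total not found in response");
        }
        // Exactly one of the two groups is set, depending on the server version.
        String digits = (m.group(1) != null) ? m.group(1) : m.group(2);
        return Long.parseLong(digits);
    }

    public static void main(String[] args) {
        String v6 = "{\"_shards\":{\"total\":5},\"hits\":{\"total\":3,\"max_score\":null,\"hits\":[]}}";
        String v7 = "{\"_shards\":{\"total\":1},\"hits\":{\"total\":{\"value\":42,\"relation\":\"eq\"},"
                + "\"max_score\":null,\"hits\":[]}}";
        System.out.println(parseTotal(v6)); // integer form
        System.out.println(parseTotal(v7)); // object form
    }
}
```

      In production code a real JSON parser is the safer choice, since a regular expression will not cope with reordered keys or nested objects inside total.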

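    3.4. Data migration

      Older versions of the application software use the index scheme in which the Type name equals the index name, which does not fit the compatibility approach.
      After adopting the compatibility approach, indices therefore have to be recreated (as compatible indices) when the application software is upgraded. The ES reindex command can be used for this, for example: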

    POST http://172.16.249.177:9200/_reindex
    {
      "source": {
        "index": "vline_event_recorder_exception",
        "type":"vline_event_recorder_exception"
      },
      "dest": {
        "index": "vline_event_recorder_exception_test",
        "type": "_doc"
      }
    }
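
      If the migration has to cover many indices, the same request could be prepared from Java. The following is only a sketch under assumptions: the class ReindexRequestSketch and its methods are hypothetical, it uses the JDK's java.net.http API (Java 11+) rather than the ES clients, and it builds the request without sending it. The host and index names are the ones from the example above.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Hypothetical sketch: prepares the _reindex call that copies an old-style index into a "_doc" index. */
final class ReindexRequestSketch {

    /** Builds the body: the old index used its own name as the Type, the new index uses "_doc". */
    static String reindexBody(String oldIndex, String newIndex) {
        return "{\n"
                + "  \"source\": { \"index\": \"" + oldIndex + "\", \"type\": \"" + oldIndex + "\" },\n"
                + "  \"dest\": { \"index\": \"" + newIndex + "\", \"type\": \"_doc\" }\n"
                + "}";
    }

    /** Prepares (but does not send) the POST request against the given ES base URL. */
    static HttpRequest reindexRequest(String esBaseUrl, String oldIndex, String newIndex) {
        return HttpRequest.newBuilder(URI.create(esBaseUrl + "/_reindex"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(reindexBody(oldIndex, newIndex)))
                .build();
    }

    public static void main(String[] args) {
        String oldIndex = "vline_event_recorder_exception";
        HttpRequest request = reindexRequest("http://172.16.249.177:9200", oldIndex, oldIndex + "_test");
        System.out.println(request.method() + " " + request.uri());
        System.out.println(reindexBody(oldIndex, oldIndex + "_test"));
    }
}
```

      Sending the prepared request would then be a matter of passing it to java.net.http.HttpClient.send together with a body handler.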
    

    Appendix: ES client wrapper functions

    import lombok.extern.slf4j.Slf4j;
    
    import org.apache.http.HttpHost;
    import org.apache.http.util.EntityUtils;
    import org.elasticsearch.action.DocWriteResponse;
    import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
    import org.elasticsearch.action.bulk.BulkRequest;
    import org.elasticsearch.action.bulk.BulkResponse;
    import org.elasticsearch.action.index.IndexRequest;
    import org.elasticsearch.action.index.IndexResponse;
    import org.elasticsearch.action.search.SearchRequest;
    import org.elasticsearch.action.search.SearchResponse;
    import org.elasticsearch.action.search.SearchScrollRequest;
    import org.elasticsearch.action.support.master.AcknowledgedResponse;
    import org.elasticsearch.action.update.UpdateRequest;
    import org.elasticsearch.action.update.UpdateResponse;
    import org.elasticsearch.client.Request;
    import org.elasticsearch.client.RequestOptions;
    import org.elasticsearch.client.Response;
    import org.elasticsearch.client.RestClient;
    import org.elasticsearch.client.RestClientBuilder;
    import org.elasticsearch.client.RestHighLevelClient;
    import org.elasticsearch.client.indices.CreateIndexRequest;
    import org.elasticsearch.client.indices.CreateIndexResponse;
    import org.elasticsearch.client.indices.GetIndexRequest;
    import org.elasticsearch.client.indices.PutMappingRequest;
    import org.elasticsearch.common.unit.TimeValue;
    import org.elasticsearch.common.xcontent.XContentBuilder;
    import org.elasticsearch.common.xcontent.XContentType;
    import org.elasticsearch.index.query.QueryBuilder;
    import org.elasticsearch.index.reindex.BulkByScrollResponse;
    import org.elasticsearch.index.reindex.DeleteByQueryRequest;
    import org.elasticsearch.rest.RestStatus;
    import org.elasticsearch.search.SearchHit;
    import org.elasticsearch.search.SearchHits;
    import org.elasticsearch.search.builder.SearchSourceBuilder;
    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONObject;
    import com.alibaba.fastjson.serializer.SerializerFeature;
    import com.kedacom.microservice.pojo.AbstractCommonIndex;
    
    import java.io.IOException;
    import java.text.SimpleDateFormat;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    
    /**
     * @author zhang.kai
     *
     */
    @Slf4j
    public class ESDataUtils {
        
        private static final int SCROLL_PAGE_SIZE = 1000; // page size of each scroll request
        
        private static final String DEFAULT_TYPE = "_doc";
    
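        // Low-level REST client
        public static RestClient restClient;
    
        // High-level REST client
        public static RestHighLevelClient highLevelClient;
    
        // Number of shards used when creating an index; -1 means use the server default
        public static int numberOfShards = -1;
    
        // Number of replicas used when creating an index; -1 means use the server default
        public static int numberOfReplicas = -1;
    
        // Note: SimpleDateFormat is not thread-safe; synchronize access if these instances are shared across threads
        public static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");
        public static SimpleDateFormat sdfForLog = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        public static SimpleDateFormat sdfForName = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    
        // Nested class holding the result of a paged search
        public static class EsSearchResult {
            public List<String> recorders; // records returned by the search
            public SearchResponse sr; // raw search response, which may contain aggregation results
            public long total; // total number of matching documents
        }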
    
        /**
         * Initializes the RestClient and RestHighLevelClient from the ES server address and port obtained from the configuration center.
         * 
         * @param esHost ES server address
         * @param esPort ES server port
         */
        public static void initClient(String esHost, int esPort) {
            RestClientBuilder restClientBuilder = RestClient.builder(new HttpHost(esHost, esPort, "http"));
            ESDataUtils.restClient = restClientBuilder.build();
            ESDataUtils.highLevelClient = new RestHighLevelClient(restClientBuilder);
        }
    
        /**
         * Checks whether an index exists.
         * 
         * @param indexName
         * @return
         * @throws IOException
         */
        public static boolean ifIndexExists(String indexName) throws IOException {
            GetIndexRequest request = new GetIndexRequest(indexName);
            return highLevelClient.indices().exists(request, RequestOptions.DEFAULT);
        }
    
        /**
         * Creates an ES index from an index configuration wrapper object; use this when an index configuration class already exists.
         * 
         * @param index
         * @return
         * @throws IOException
         */
        public static boolean createIndex(AbstractCommonIndex index) throws IOException {
            String indexName = index.getIndexName();
            CreateIndexRequest request = new CreateIndexRequest(indexName);
    
            if (!ifIndexExists(indexName)) {
                if (null != index.getSettings()) {
                    String settings = index.getSettings();
                    JSONObject settingsJo = JSON.parseObject(settings);
                    if (numberOfShards > 0) {
                        settingsJo.put("index.number_of_shards", numberOfShards);
                    }
                    if (numberOfReplicas > 0) {
                        settingsJo.put("index.number_of_replicas", numberOfReplicas);
                    }
                    request.settings(JSON.toJSONString(settingsJo, SerializerFeature.WriteMapNullValue), XContentType.JSON);
                }
                request.mapping(index.getMapping());
                log.info("createIndex {}:{}", indexName, request.toString());
                CreateIndexResponse createIndexResponse = highLevelClient.indices().create(request, RequestOptions.DEFAULT);
                if (!createIndexResponse.isAcknowledged()) {
                    log.error("createIndex fail:{}", createIndexResponse);
                }
                return createIndexResponse.isAcknowledged();
            }
            log.info("{} already exists!", indexName);
            return false;
        }
    
        /**
         * Creates an ES index from string parameters; use this when no index configuration class exists and the JSON strings are passed in directly.
         * 
         * @param indexName
         * @param mappingStr
         * @param settingsStr
         * @return
         * @throws IOException
         */
        public static boolean createIndex(String indexName, String mappingStr, String settingsStr) throws IOException {
            CreateIndexRequest request = new CreateIndexRequest(indexName);
    
            if (!ifIndexExists(indexName)) {
                if (null != settingsStr) {
                    JSONObject settingsJo = JSON.parseObject(settingsStr);
                    if (numberOfShards > 0) {
                        settingsJo.put("index.number_of_shards", numberOfShards);
                    }
                    if (numberOfReplicas > 0) {
                        settingsJo.put("index.number_of_replicas", numberOfReplicas);
                    }
                    request.settings(JSON.toJSONString(settingsJo, SerializerFeature.WriteMapNullValue), XContentType.JSON);
                }
                if (null != mappingStr) {
                    request.mapping(mappingStr, XContentType.JSON);
                }
                log.info("createIndex {}:{}", indexName, request.toString());
                CreateIndexResponse createIndexResponse = highLevelClient.indices().create(request, RequestOptions.DEFAULT);
                if (!createIndexResponse.isAcknowledged()) {
                    log.error("createIndex fail:{}", createIndexResponse);
                }
                return createIndexResponse.isAcknowledged();
            }
            log.info("{} already exists!", indexName);
            return false;
        }
    
        /**
         * Updates the index mapping from an XContentBuilder.
         * 
         * @param indexName
         * @param mapBuilder
         * @return
         * @throws IOException
         */
        public static boolean updateMapping(String indexName, XContentBuilder mapBuilder) throws IOException {
            PutMappingRequest request = new PutMappingRequest(indexName);
            request.source(mapBuilder);
    
            log.info("putMapping:{},{}", indexName, request.source());
            AcknowledgedResponse putMappingResponse = highLevelClient.indices().putMapping(request, RequestOptions.DEFAULT);
            if (!putMappingResponse.isAcknowledged()) {
                log.error("updateMapping fail:{}", putMappingResponse.toString());
            }
            return putMappingResponse.isAcknowledged();
        }
    
        /**
         * Updates the index mapping from a JSON string.
         * 
         * @param indexName
         * @param mappingStr
         * @return
         * @throws IOException
         */
        public static boolean updateMapping(String indexName, String mappingStr) throws IOException {
            PutMappingRequest request = new PutMappingRequest(indexName);
            request.source(mappingStr, XContentType.JSON);
    
            log.info("putMapping:{},{}", indexName, request.source());
            AcknowledgedResponse putMappingResponse = highLevelClient.indices().putMapping(request, RequestOptions.DEFAULT);
            if (!putMappingResponse.isAcknowledged()) {
                log.error("updateMapping fail:{}", putMappingResponse.toString());
            }
            return putMappingResponse.isAcknowledged();
        }
    
        /**
         * Updates the index settings from a Map.
         * 
         * @param indexName
         * @param settingsMap
         * @return
         * @throws IOException
         */
        public static boolean updateSettings(String indexName, Map<String, Object> settingsMap) throws IOException {
            UpdateSettingsRequest request = new UpdateSettingsRequest(indexName);
            request.settings(settingsMap);
    
            log.info("updateSettings {}:{}", indexName, settingsMap);
            AcknowledgedResponse updateSettingsResponse = highLevelClient.indices().putSettings(request,
                    RequestOptions.DEFAULT);
            if (!updateSettingsResponse.isAcknowledged()) {
                log.error("updateSettings fail:{}", updateSettingsResponse);
            }
            return updateSettingsResponse.isAcknowledged();
        }
    
        /**
         * Updates the index settings from a JSON string.
         * 
         * @param indexName
         * @param settingsStr
         * @return
         * @throws IOException
         */
        public static boolean updateSettings(String indexName, String settingsStr) throws IOException {
            UpdateSettingsRequest request = new UpdateSettingsRequest(indexName);
            request.settings(settingsStr, XContentType.JSON);
    
            log.info("updateSettings {}:{}", indexName, settingsStr);
            AcknowledgedResponse updateSettingsResponse = highLevelClient.indices().putSettings(request,
                    RequestOptions.DEFAULT);
            if (!updateSettingsResponse.isAcknowledged()) {
                log.error("updateSettings fail:{}", updateSettingsResponse);
            }
            return updateSettingsResponse.isAcknowledged();
        }
    
        /**
         * Inserts a single document into the index.
         * 
         * @param indexName
         * @param docId
         * @param docSource
         * @return
         * @throws IOException
         */
        public static boolean insertDoc(String indexName, String docId, String docSource) throws IOException {
            IndexRequest request = new IndexRequest(indexName, DEFAULT_TYPE);
            if (null != docId) {
                request.id(docId);
            }
            request.source(docSource, XContentType.JSON);
            IndexResponse indexResponse = highLevelClient.index(request, RequestOptions.DEFAULT);
            if ((indexResponse.status() == RestStatus.CREATED) || (indexResponse.status() == RestStatus.OK)) {
                return true;
            } else {
                log.error("insertDoc fail status:{} => {}", indexResponse.status(), indexResponse.toString());
                return false;
            }
        }
    
        /**
         * Inserts a batch of documents into the index.
         * 
         * @param indexName
         * @param docIds
         * @param docSources
         * @return
         * @throws IOException
         */
        public static boolean insertDocBulk(String indexName, List<String> docIds, List<String> docSources)
                throws IOException {
            if (null == docSources) {
                log.error("insertDocBulk docSources are null!");
                return false;
            }
            BulkRequest bulkRequest = new BulkRequest();
            for (int i = 0; i < docSources.size(); i++) {
                IndexRequest request = new IndexRequest(indexName, DEFAULT_TYPE);
                // Guard against docIds being shorter than docSources
                if ((null != docIds) && (i < docIds.size()) && (null != docIds.get(i))) {
                    request.id(docIds.get(i));
                }
                request.source(docSources.get(i), XContentType.JSON);
                bulkRequest.add(request);
            }
            if (bulkRequest.numberOfActions() > 0) {
                BulkResponse bulkResponse = highLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
                if (!bulkResponse.hasFailures()) {
                    return true;
                } else {
                    log.error("insertDocBulk fail status:{} => {}", bulkResponse.status(),
                            bulkResponse.buildFailureMessage());
                    return false;
                }
            }
            return true;
        }
    
        /**
         * Updates some fields of an ES document; the document identified by docId must exist.
         * 
         * @param indexName
         * @param docId
         * @param updateFields
         * @return
         * @throws IOException
         */
        public static boolean updateDoc(String indexName, String docId, String updateFields) throws IOException {
            if (null == docId) {
                log.info("updateDoc {} fail cause docId is null:{}", indexName, updateFields);
                return false;
            }
            UpdateRequest request = new UpdateRequest(indexName, DEFAULT_TYPE, docId);
            request.doc(updateFields, XContentType.JSON);
            UpdateResponse updateResponse = highLevelClient.update(request, RequestOptions.DEFAULT);
            if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED) {
                return true;
            } else {
                log.error("updateDoc {} fail by {} :{}", indexName, docId, updateResponse.toString());
                return false;
            }
        }
    
        /**
         * Runs an ES search with paging, including pages beyond the 10000-result limit.
         * 
         * @param ssb
         * @param from
         * @param size
         * @param indexName
         * @return
         * @throws IOException
         */
        public static EsSearchResult executeEsSearch(SearchSourceBuilder ssb, int from, int size, String indexName)
                throws IOException {
            if (null == ssb) {
                log.error("executeEsSearch: SearchSourceBuilder is null!");
                return null;
            }
            EsSearchResult esr = new EsSearchResult();
            SearchResponse searchResponse = null;
            SearchRequest searchRequest = new SearchRequest(indexName);
    
            if (from + size <= 10000) { // within 10000: run a normal from/size search
                esr.total = getEsSearchTotal(ssb, indexName);
    
                ssb.from(from).size(size);
                log.info("executable es search dsl for {} is:{}", indexName, ssb.toString());
                searchRequest.source(ssb);
                searchResponse = highLevelClient.search(searchRequest, RequestOptions.DEFAULT);
                SearchHits hits = searchResponse.getHits();
                List<String> docList = new ArrayList<String>();
                for (SearchHit hit : hits.getHits()) {
                    docList.add(hit.getSourceAsString());
                }
                esr.recorders = docList;
                esr.sr = searchResponse;
    
            } else {// beyond 10000: use the scroll API
                esr.total = getEsSearchTotal(ssb, indexName);
    
                ssb.size(SCROLL_PAGE_SIZE);
                log.info("executable es search dsl for {} is:{}", indexName, ssb.toString());
                searchRequest.source(ssb);
                searchRequest.scroll(TimeValue.timeValueMinutes(3L));
                searchResponse = highLevelClient.search(searchRequest, RequestOptions.DEFAULT);
                SearchHits hits = searchResponse.getHits();
                esr.sr = searchResponse;
                // If the total is not greater than the requested offset, return without any records
                if ((null != searchResponse) && (esr.total <= from)) {
                    log.info("total:{} is less than from:{}", esr.total, from);
                    return esr;
                }
                int unFetchedIndex = 0; // offset of the first record not yet fetched
    
                while ((null != searchResponse) && (searchResponse.status() == RestStatus.OK)) {
                    List<String> curPageList = new ArrayList<String>();
                    for (SearchHit hit : hits.getHits()) {
                        curPageList.add(hit.getSourceAsString());
                    }
                    if (curPageList.isEmpty()) { // no more data: stop instead of calling get(0) on an empty page
                        break;
                    }
    
                    unFetchedIndex += SCROLL_PAGE_SIZE;
                    log.info("current unFetchedIndex is :{}->{}", unFetchedIndex, curPageList.get(0));
                    if (unFetchedIndex > from) {
                        int startIndex = Math.min(from % SCROLL_PAGE_SIZE, curPageList.size());
                        // Records are taken from the current page only. Callers are expected to request page
                        // sizes such as 10, 20 or 50 against the scroll size of 1000; the end is clipped to the page.
                        int endIndex = Math.min(startIndex + size, curPageList.size());
                        List<String> docList = curPageList.subList(startIndex, endIndex);
                        esr.recorders = docList;
                        break;
                    } else {// keep scrolling
                        String scrollId = searchResponse.getScrollId();
                        SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId);
                        scrollRequest.scroll(TimeValue.timeValueSeconds(180));
                        searchResponse = highLevelClient.scroll(scrollRequest, RequestOptions.DEFAULT);
                        hits = searchResponse.getHits(); // read the hits of the newly fetched page
                    }
                }
            }
            return esr;
    
        }
    
        /**
         * Runs an ES search and returns all matching results.
         * 
         * @param ssb
         * @param indexName
         * @return
         * @throws IOException
         */
        public static EsSearchResult searchAllData(SearchSourceBuilder ssb, String indexName) throws IOException {
            if (null == ssb) {
                log.error("searchAllData: SearchSourceBuilder is null!");
                return null;
            }
            log.info("executable es search all data dsl for {} is:{}", indexName, ssb.toString());
    
            EsSearchResult esr = new EsSearchResult();
            esr.total = getEsSearchTotal(ssb, indexName);
    
            SearchRequest searchRequest = new SearchRequest(indexName);
            searchRequest.source(ssb);
            searchRequest.scroll(TimeValue.timeValueMinutes(3L));
    
            // Use the scroll API
            ssb.size(SCROLL_PAGE_SIZE);
            SearchResponse searchResponse = highLevelClient.search(searchRequest, RequestOptions.DEFAULT);
            SearchHits hits = searchResponse.getHits();
            esr.sr = searchResponse;
    
            List<String> totalData = new ArrayList<>();
            int unFetchedIndex = 0; // offset of the first record not yet fetched
            while ((null != hits) && (hits.getHits().length > 0)) {
                List<String> curPageList = new ArrayList<String>();
                for (SearchHit hit : hits.getHits()) {
                    curPageList.add(hit.getSourceAsString());
                }
                totalData.addAll(curPageList);
                unFetchedIndex += SCROLL_PAGE_SIZE;
                log.info("current unFetchedIndex is :{}->{}", unFetchedIndex, curPageList.get(0));
                String scrollId = searchResponse.getScrollId();
                SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId);
                scrollRequest.scroll(TimeValue.timeValueSeconds(180));
                searchResponse = highLevelClient.scroll(scrollRequest, RequestOptions.DEFAULT);
                hits = searchResponse.getHits();
            }
            esr.recorders = totalData;
            return esr;
        }
    
        /**
         * Deletes the documents in the index that match the given query (for example, a requested time range).
         * 
         * @param indexName
         * @param qb
         * @return
         * @throws IOException
         */
        public static boolean deleteByQuery(String indexName, QueryBuilder qb) throws IOException {
    
            DeleteByQueryRequest request = new DeleteByQueryRequest(indexName);
            request.setQuery(qb);
    
            request.setConflicts("proceed");
            request.setBatchSize(5000);
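            // Number of parallel slices
            request.setSlices(5);
            // Scroll parameter that controls how long the search context stays alive
            request.setScroll(TimeValue.timeValueMinutes(10));
            // Timeout
            request.setTimeout(TimeValue.timeValueMinutes(2));
            // Refresh the index after the delete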
            request.setRefresh(true);
            log.info("***deleteByQuery request uri:{}", request.toString());
            log.info("***deleteByQuery request body:{}", qb.toString());
            BulkByScrollResponse bulkResponse = highLevelClient.deleteByQuery(request, RequestOptions.DEFAULT);
            log.info("***handleEsIndexClean response:{}", bulkResponse.toString());
            return true;
        }
    
        /**
         * Gets the total of an ES search through the Low Level API; must work with both 6.8.1 and 7.8.1.
         * 
         * @param ssb
         * @param indexName
         * @return
         * @throws IOException
         */
        private static long getEsSearchTotal(SearchSourceBuilder ssb, String indexName) throws IOException {
            long total = 0;
            ssb.from(0).size(0);
    
            Request request = new Request("GET", "/" + indexName + "/_search");
            request.setJsonEntity(ssb.toString());
            Response response = restClient.performRequest(request);
            int statusCode = response.getStatusLine().getStatusCode();
            String responseBody = EntityUtils.toString(response.getEntity());
    
            if (statusCode == 200) {
                JSONObject responseJo = (JSONObject) JSON.parse(responseBody);
                JSONObject hitsJo = responseJo.getJSONObject("hits");
                Object totalJo = hitsJo.get("total");
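                if (totalJo instanceof Number) { // 6.8.1: total is a plain number (fastjson may return Integer or Long)
                    total = ((Number) totalJo).longValue();
                } else { // 7.8.1: total is an object with "value" and "relation"
                    total = ((JSONObject) totalJo).getLongValue("value");
                }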
            }
            return total;
        }
    
    }
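
      The offset arithmetic in the scroll branch of executeEsSearch is easy to get wrong, so here is a small standalone sketch of it. It is an illustration under assumptions, not the wrapper's code: the class ScrollWindowSketch and its methods are hypothetical, and plain integers stand in for the documents of one scroll page.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the "which scroll page, which slice" arithmetic used for deep paging. */
final class ScrollWindowSketch {

    /** Zero-based index of the scroll page that contains the absolute offset `from`. */
    static int pageIndexOf(int from, int scrollPageSize) {
        return from / scrollPageSize;
    }

    /** Copies [from, from + size) out of the page holding `from`, clipped to the records actually present. */
    static <T> List<T> sliceWithinPage(List<T> page, int from, int size, int scrollPageSize) {
        int start = Math.min(from % scrollPageSize, page.size());
        int end = Math.min(start + size, page.size());
        return new ArrayList<>(page.subList(start, end));
    }

    public static void main(String[] args) {
        int scrollPageSize = 1000;
        // Pretend scroll page 12 is the last one and holds only 500 records: 12000..12499.
        List<Integer> lastPage = new ArrayList<>();
        for (int i = 12000; i < 12500; i++) {
            lastPage.add(i);
        }
        int from = 12490;
        int size = 20;
        System.out.println("page index: " + pageIndexOf(from, scrollPageSize));
        System.out.println("records: " + sliceWithinPage(lastPage, from, size, scrollPageSize));
    }
}
```

      With from = 12490 and size = 20 the requested window runs past the end of the data, so the slice is clipped to the ten records 12490 to 12499 instead of throwing an IndexOutOfBoundsException.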
    


        Article link: https://www.haomeiwen.com/subject/nrupvktx.html