美文网首页
Elasticsearch-java 封装API

Elasticsearch-java 封装API

作者: 圆企鹅i | 来源:发表于2023-03-12 15:46 被阅读0次

    前言

    ES7版本推出了lambda形式的全新API
    ES在后面的新版本将会不再更新HighLevelClient
    当然最主要的还是网上搜不到样例代码 比较生气 官网代码里的单元测试写得乱的一笔 干脆自己封装

    依赖

       
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <maven.compiler.source>8</maven.compiler.source>
            <target.java.version>1.8</target.java.version>
            <elasticsearch.java.version>7.17.9</elasticsearch.java.version>
            <jackson.version>2.12.3</jackson.version>
            <lombok.version>1.18.20</lombok.version>
            <hutool.version>5.8.12</hutool.version>
            <slf4j.version>1.7.36</slf4j.version>
        </properties>
    
        <dependencies>
            <!--   es lambda 全新api-->
            <dependency>
                <groupId>co.elastic.clients</groupId>
                <artifactId>elasticsearch-java</artifactId>
                <version>${elasticsearch.java.version}</version>
            </dependency>
    
            <!--   es 官网必备的依赖 https://www.elastic.co/guide/en/elasticsearch/client/java-api-client/7.17/installation.html -->
            <!--   todo 非常以及特别的容易出现依赖冲突 建议单独封装一个模块 打成shaded依赖-->
            <dependency>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-databind</artifactId>
                <version>${jackson.version}</version>
            </dependency>
    
            <!-- 需要安装lombok插件-->
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <version>${lombok.version}</version>
            </dependency>
    
            <!-- 日志 -->
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
                <version>${slf4j.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
                <version>${slf4j.version}</version>
            </dependency>
    
            <!--     工具包-->
            <dependency>
                <groupId>cn.hutool</groupId>
                <artifactId>hutool-all</artifactId>
                <version>${hutool.version}</version>
            </dependency>
        </dependencies>
    

    Code

    Client

    package elasticsearch.util;
    
    import cn.hutool.core.util.StrUtil;
    import cn.hutool.setting.dialect.Props;
    import co.elastic.clients.elasticsearch.ElasticsearchClient;
    import co.elastic.clients.json.jackson.JacksonJsonpMapper;
    import co.elastic.clients.transport.ElasticsearchTransport;
    import co.elastic.clients.transport.endpoints.BooleanResponse;
    import co.elastic.clients.transport.rest_client.RestClientTransport;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.http.HttpHost;
    import org.elasticsearch.client.RestClient;
    
    import java.io.IOException;
    import java.nio.charset.Charset;
    import java.util.ArrayList;
    import java.util.List;
    
    /**
     * @author zhangxuecheng4441
     * @date 2022/4/7/007 10:48
     */
    @Slf4j
    public class EsClientUtils {
        /**
         * restClient
         */
        private static final RestClient REST_CLIENT = RestClient.builder(getDefaultEsHosts().toArray(new HttpHost[]{})).build();
    
        /**
         * Create the transport with a Jackson mapper
         */
        private static final ElasticsearchTransport TRANSPORT = new RestClientTransport(
                REST_CLIENT, new JacksonJsonpMapper());
    
        /**
         * And create the API client
         */
        private static final ElasticsearchClient CLIENT = new ElasticsearchClient(TRANSPORT);
    
        public static ElasticsearchClient getClient() {
            return CLIENT;
        }
    
    
        /**
         * 判断索引是否存在
         *
         * @param index index name
         * @return is exists
         */
        public static Boolean indexIsExist(String index) {
            try {
                BooleanResponse exists = EsClientUtils.getClient().indices().exists(iq -> iq.index(index));
                return exists.value();
            } catch (IOException e) {
                log.error(e.getMessage(), e);
            }
            return false;
        }
    
        /**
         * 创建索引
         *
         * @param index index name
         */
        public static void createIndex(String index) {
            try {
                EsClientUtils.getClient().indices().create(cir -> cir.index(index));
            } catch (IOException e) {
                log.error(e.getMessage(), e);
            }
    
        }
    
        /**
         * http 地址
         *
         * @param hosts hosts
         * @return List<HttpHost>
         */
        public static List<HttpHost> getEsHosts(String hosts) {
            List<HttpHost> httpHosts = new ArrayList<>();
            String[] split = hosts.split(StrUtil.COMMA);
            for (String s : split) {
                httpHosts.add(HttpHost.create(s));
            }
            return httpHosts;
        }
    
    
        public static List<HttpHost> getDefaultEsHosts() {
            String esHostsConfKey = "es.hosts";
            return getEsHosts(new Props("application.properties", Charset.defaultCharset()).getStr(esHostsConfKey, "es1:9200"));
        }
    }
    
    

    Search

    package elasticsearch.util;
    
    import cn.hutool.core.collection.CollUtil;
    import cn.hutool.core.util.CharUtil;
    import cn.hutool.core.util.StrUtil;
    import co.elastic.clients.elasticsearch.ElasticsearchClient;
    import co.elastic.clients.elasticsearch._types.Time;
    import co.elastic.clients.elasticsearch._types.query_dsl.Query;
    import co.elastic.clients.elasticsearch.core.*;
    import co.elastic.clients.elasticsearch.core.search.Hit;
    import lombok.extern.slf4j.Slf4j;
    import lombok.val;
    
    import java.io.IOException;
    import java.io.StringReader;
    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.List;
    import java.util.function.Consumer;
    import java.util.stream.Collectors;
    
    import static cn.hutool.core.text.StrPool.COMMA;
    
    /**
     * @author zxc
     * @date 2022/6/22/022 11:08
     */
    @Slf4j
    public class EsSearchUtil {
    
        /**
         * index 切割
         *
         * @param indexStr aaa,bbb
         * @return [aaa, bbb]
         */
        public static String[] getSearchIndex(String indexStr) {
            return StrUtil.splitToArray(indexStr, CharUtil.COMMA);
        }
    
        /**
         * 构造一个简单terms函数
         *
         * @param field  字段
         * @param values values
         * @return Query
         */
        public static Query termsQuery(String field, Collection<String> values) {
            int batch = 1000;
            ArrayList<Query> list = new ArrayList<>();
    
            //1000个数据切割一次
            for (List<String> value1000 : CollUtil.split(values, batch)) {
                Query query = Query.of(qb -> qb
                        .terms(termsBuilder -> termsBuilder
                                .field(field)
                                .terms(tf -> tf.value(EsCommonUtils.strList2Field(value1000)))
                        )
                );
                list.add(query);
            }
    
            //拼成一个查询语句
            return Query.of(qb -> qb
                    .bool(bool -> bool
                            .should(list)
                    )
            );
        }
    
        /**
         * 构造一个简单terms函数
         *
         * @param field  字段
         * @param values values
         * @return Query
         */
        public static Query termsQuery(String field, String... values) {
            return Query.of(qb -> qb
                    .terms(termsBuilder -> termsBuilder
                            .field(field)
                            .terms(tf -> tf.value(EsCommonUtils.str2Field(values)))
                    )
            );
        }
    
        /**
         * 构造一个简单match函数
         *
         * @param field 字段
         * @param value value es7支持match:"aaa,bbb,ccc"
         * @return Query
         */
        public static Query matchQuery(String field, String value) {
            return Query.of(qb -> qb
                    .match(match -> match
                            .field(field)
                            .query(value)
                    )
            );
        }
    
        /**
         * match all query
         *
         * @return match all query
         */
        public static Query matchAll() {
            return Query.of(qb -> qb
                    .matchAll(match -> match
                    )
            );
        }
    
        /**
         * exists query
         *
         * @param field field
         * @return query
         */
        public static Query existsQuery(String field) {
            return Query.of(q -> q.exists(
                    e -> e.field(field)
            ));
        }
    
        /**
         * es dsl查询语句 生成query
         *
         * @param dsl es dsl
         * @return query
         */
        public static Query dslParseQuery(String dsl) {
            return Query.of(q -> q.withJson(new StringReader(dsl)));
        }
    
        /**
         * es dsl查询语句 生成query
         *
         * @param query query
         * @return dsl str
         */
        public static String query2Dsl(Query query) {
            return StrUtil.removePrefix(query.toString(), "Query:");
        }
    
        /**
         * scroll查询出数据,然后根据需要处理
         *
         * @param client       ElasticsearchClient
         * @param query        Query.of()
         * @param index        index
         * @param dataConsumer 数据消费处理函数
         * @throws IOException IOException
         */
        public static void scrollHandle(ElasticsearchClient client, Query query, List<String> index, Consumer<List<Object>> dataConsumer) throws IOException {
            long total = 0;
    
            SearchResponse<Object> response = client.search(
                    SearchRequest.of(sear -> sear
                            .query(query)
                            .scroll(Time.of(ts -> ts.time("2m")))
                            .index(index)
                            .size(5000)
                    )
                    //此处使用es默认的反序列化 容易出现问题
                    , Object.class
            );
            log.info("search from index:{},dsl:\r\n{}", index, query.toString());
    
            if (CollUtil.isEmpty(response.hits().hits())) {
                log.warn("can not find result from this search");
                return;
            }
    
            List<Object> objects = response.hits().hits().stream().map(Hit::source).collect(Collectors.toList());
            dataConsumer.accept(objects);
    
            log.info("search size:{}", total = total + objects.size());
    
            String scrollId = response.scrollId();
    
            while (scrollId != null) {
                final String searchScroll = scrollId;
                ScrollResponse<Object> scrollResponse = client.scroll(
                        ScrollRequest.of(scroll -> scroll
                                .scrollId(searchScroll)
                                .scroll(time -> time.time("2m"))
                        ), Object.class
                );
    
                List<Hit<Object>> hitList = scrollResponse.hits().hits();
                if (hitList.isEmpty()) {
                    break;
                } else {
                    scrollId = scrollResponse.scrollId();
                }
    
                List<Object> scrollObjects = hitList.stream().map(Hit::source).collect(Collectors.toList());
                dataConsumer.accept(scrollObjects);
                log.info("scroll search size:{}", total = total + scrollObjects.size());
            }
            log.info("finish scroll search handle total :{}", total);
        }
    
        /**
         * scroll查询出数据,然后根据需要处理
         *
         * @param client       ElasticsearchClient
         * @param query        Query.of()
         * @param index        index
         * @param dataConsumer 数据消费处理函数
         * @throws IOException IOException
         */
        public static void scrollHandle(ElasticsearchClient client, Query query, String index, Consumer<List<Object>> dataConsumer) throws IOException {
            val indexList = getSearchIndexList(index);
            scrollHandle(client, query, indexList, dataConsumer);
        }
    
        /**
         * index -> list<index>
         *
         * @param index index
         * @return list index
         */
        public static List<String> getSearchIndexList(String index) {
            val indexList = new ArrayList<String>();
            if (index.contains(COMMA)) {
                indexList.addAll(StrUtil.split(index, COMMA));
            } else {
                indexList.add(index);
            }
            return indexList;
        }
    
        /**
         * 查询count
         *
         * @param client client
         * @param query  query
         * @param index  index
         * @return count
         */
        public static Long getCount(ElasticsearchClient client, Query query, List<String> index) {
            try {
                CountResponse countResponse = client.count(CountRequest.of(count -> count
                                .index(index)
                                .query(query)
                        )
                );
                return countResponse.count();
    
            } catch (Exception e) {
                log.error(e.getMessage(), e);
            }
            return null;
        }
    }
    
    

    Write

    package elasticsearch.util;
    
    import cn.hutool.core.collection.ListUtil;
    import cn.hutool.core.lang.Pair;
    import cn.hutool.core.map.MapUtil;
    import co.elastic.clients.elasticsearch.ElasticsearchClient;
    import co.elastic.clients.elasticsearch.core.BulkResponse;
    import co.elastic.clients.elasticsearch.core.bulk.BulkOperation;
    import elasticsearch.annotation.ESField;
    import lombok.extern.slf4j.Slf4j;
    import lombok.val;
    
    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;
    
    /**
     * @author zhangxuecheng4441
     * @date 2022/10/26/026 10:14
     */
    @Slf4j
    public class EsWriteUtils {

        /**
         * Documents are partitioned into bulk requests of this size.
         */
        private static final int BATCH_SIZE = 1000;

        /**
         * Utility class; not meant to be instantiated.
         */
        private EsWriteUtils() {
        }

        /**
         * Contract for objects written to ES.
         */
        public interface EsDoc {
            /**
             * Document id.
             *
             * @return id
             */
            String getDocId();

            /**
             * Routing value.
             *
             * @return routing
             */
            String getRouting();
        }

        /**
         * Upsert a single document. Note: the written fields are filtered by @ESField.
         *
         * @param client    client
         * @param indexName index name
         * @param bean      document
         */
        public static void write(ElasticsearchClient client, String indexName, EsDoc bean) {
            BulkResponse bulkResponse;
            try {
                bulkResponse = client.bulk(bulkBd -> bulkBd
                        .operations(bulkOpBd -> bulkOpBd
                                .update(bulkOp -> bulkOp
                                        .index(indexName)
                                        .id(bean.getDocId())
                                        .routing(bean.getRouting())
                                        .action(upAction -> upAction
                                                // create the document when it does not exist yet
                                                .docAsUpsert(true)
                                                .doc(ESField.Method.getMap(bean))
                                        )
                                )
                        )
                );
            } catch (Exception e) {
                throw new RuntimeException(e);
            }

            if (bulkResponse == null || bulkResponse.errors()) {
                log.error("error bulk es doc({}) and error:{}", bean.getDocId(), bulkResponse);
            } else {
                log.info("write to {} doc success id:{}", indexName, bean.getDocId());
            }
        }


        /**
         * Batch upsert documents, partitioned into bulk requests of {@link #BATCH_SIZE}.
         * Note: the written fields are filtered by @ESField.
         *
         * @param client    client
         * @param indexName index name
         * @param esDocs    List<? extends EsDoc>
         */
        public static void batchWrite(ElasticsearchClient client, String indexName, List<? extends EsDoc> esDocs) {
            BulkResponse bulkResponse;
            try {
                for (List<? extends EsDoc> beans : ListUtil.partition(esDocs, BATCH_SIZE)) {
                    List<BulkOperation> operations = beans.stream()
                            .map(bean -> BulkOperation.of(bulk -> bulk
                                            .update(up -> up
                                                    .index(indexName)
                                                    .id(bean.getDocId())
                                                    .routing(bean.getRouting())
                                                    .action(upAction -> upAction
                                                            .docAsUpsert(true)
                                                            .doc(ESField.Method.getMap(bean))
                                                    )
                                            )
                                    )
                            )
                            .collect(Collectors.toList());

                    bulkResponse = client.bulk(bulkBd -> bulkBd
                            .index(indexName)
                            .operations(operations)
                    );

                    if (bulkResponse == null || bulkResponse.errors()) {
                        log.error("error bulk es doc size:({}) and error:{}", beans.size(), bulkResponse);
                    } else {
                        log.info("success branch update index :{} total:{}", indexName, beans.size());
                    }
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        /**
         * Batch upsert id/field-map pairs. The pair key is used both as the document
         * id and as the routing value; entries with null values are removed before writing.
         *
         * @param client    client
         * @param indexName index name
         * @param esDocs    List<Pair<String, Map<String,Object>>>
         */
        public static void batchMapWrite(ElasticsearchClient client, String indexName, List<Pair<String, Map<String, Object>>> esDocs) {
            BulkResponse bulkResponse;
            try {
                for (List<Pair<String, Map<String, Object>>> beans : ListUtil.partition(esDocs, BATCH_SIZE)) {
                    List<BulkOperation> operations = beans.stream()
                            .map(bean -> {
                                        val docMap = bean.getValue();
                                        // strip null-valued entries from the document map
                                        val doc = MapUtil.removeNullValue(docMap);
                                        return BulkOperation.of(bulk -> bulk
                                                .update(up -> up
                                                        .index(indexName)
                                                        .id(bean.getKey())
                                                        .routing(bean.getKey())
                                                        .action(upAction -> upAction
                                                                .docAsUpsert(true)
                                                                .doc(doc)
                                                        )
                                                )
                                        );
                                    }
                            )
                            .collect(Collectors.toList());

                    bulkResponse = client.bulk(bulkBd -> bulkBd
                            .index(indexName)
                            .operations(operations)
                    );

                    if (bulkResponse == null || bulkResponse.errors()) {
                        log.error("error bulk es doc size:({}) and error:{}", beans.size(), bulkResponse);
                    } else {
                        log.info("success branch update index :{} total:{}", indexName, beans.size());
                    }
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }
    
    

    Agg

    package elasticsearch.util;
    
    import cn.hutool.core.collection.CollUtil;
    import cn.hutool.core.util.ObjectUtil;
    import co.elastic.clients.elasticsearch.ElasticsearchClient;
    import co.elastic.clients.elasticsearch._types.aggregations.*;
    import co.elastic.clients.elasticsearch._types.query_dsl.Query;
    import co.elastic.clients.elasticsearch.core.SearchRequest;
    import co.elastic.clients.elasticsearch.core.SearchResponse;
    import lombok.extern.slf4j.Slf4j;
    import lombok.var;
    
    import java.io.IOException;
    import java.util.*;
    
    /**
     * @author zxc
     * @date 2022/9/2/002 15:14
     */
    @Slf4j
    public class EsAggUtils {

        /**
         * Default bucket count used when a caller passes a null topN.
         */
        private static final int DEFAULT_TOP_N = 10000;

        /**
         * Utility class; not meant to be instantiated.
         */
        private EsAggUtils() {
        }

        /**
         * terms agg
         *
         * @param field field
         * @param topN  max bucket count
         * @return Aggregation
         */
        public static Aggregation termsAgg(String field, Integer topN) {
            return Aggregation.of(agg -> agg
                    .terms(ts -> ts
                            .field(field)
                            .size(topN)
                    )
            );
        }

        /**
         * sum agg
         *
         * @param field field
         * @return Aggregation
         */
        public static Aggregation sumAgg(String field) {
            return Aggregation.of(agg -> agg
                    .sum(sum -> sum
                            .field(field)
                    )

            );
        }

        /**
         * Two-level nested terms aggregation.
         *
         * @param field1 outer field
         * @param topN1  outer bucket count
         * @param field2 inner field
         * @param topN2  inner bucket count
         * @return Aggregation
         */
        public static Aggregation termsTermsAgg(String field1, Integer topN1, String field2, Integer topN2) {
            return Aggregation.of(
                    agg -> agg.terms(
                                    ts -> ts.field(field1)
                                            .size(topN1)
                            )
                            .aggregations(field2, termsAgg(field2, topN2))
            );
        }

        /**
         * terms aggregation with a nested sum.
         *
         * @param field    terms field
         * @param sumField field summed inside each bucket
         * @param topN     bucket count
         * @return Aggregation
         */
        public static Aggregation termsSumAgg(String field, String sumField, Integer topN) {
            return Aggregation.of(agg -> agg
                    .terms(ts -> ts
                            .field(field)
                            .size(topN)
                    )
                    .aggregations(sumField, sumAgg(sumField))
            );
        }

        /**
         * Three-level nested terms aggregation.
         *
         * @param field1 level-1 field
         * @param topN1  level-1 bucket count
         * @param field2 level-2 field
         * @param topN2  level-2 bucket count
         * @param field3 level-3 field
         * @param topN3  level-3 bucket count
         * @return Aggregation
         */
        public static Aggregation terms3Agg(String field1, Integer topN1, String field2, Integer topN2, String field3, Integer topN3) {
            return Aggregation.of(
                    agg -> agg.terms(
                            ts -> ts.field(field1)
                                    .size(topN1)
                    ).aggregations(field2, Aggregation.of(
                                    agg1 -> agg1.terms(
                                            ts1 -> ts1.field(field2).size(topN2)
                                    ).aggregations(field3, Aggregation.of(
                                                    agg2 -> agg2.terms(
                                                            ts2 -> ts2.field(field3).size(topN3)
                                                    )

                                            )
                                    )
                            )
                    )
            );
        }

        /**
         * Run a simple terms aggregation.
         *
         * @param client     client
         * @param query      query
         * @param termsField termsField
         * @param topN       topN, defaults to {@link #DEFAULT_TOP_N} when null
         * @param indexes    indexes to search
         * @return insertion-ordered map of term -> doc count, or null when the request failed
         */
        public static HashMap<String, Long> termsAgg(ElasticsearchClient client, Query query, String termsField, Integer topN, List<String> indexes) {
            try {
                topN = ObjectUtil.defaultIfNull(topN, DEFAULT_TOP_N);
                HashMap<String, Long> aggMap = new LinkedHashMap<>((int) (topN / 0.75) + 1);
                //term agg
                Aggregation aggregation = termsAgg(termsField, topN);
                SearchRequest searchRequest = new SearchRequest.Builder()
                        .index(indexes)
                        .query(query)
                        .aggregations(termsField, aggregation)
                        .build();

                //search
                SearchResponse<Object> response = client.search(searchRequest, Object.class);

                var termsBuckets = response.aggregations()
                        .get(termsField)
                        .sterms()
                        .buckets()
                        .array();

                //result
                for (var termsBucket : termsBuckets) {
                    aggMap.put(termsBucket.key(), termsBucket.docCount());
                }

                return aggMap;
            } catch (IOException e) {
                log.error(e.getMessage(), e);
            }
            return null;
        }


        /**
         * Run a simple terms aggregation (varargs index overload).
         *
         * @param client     client
         * @param query      query
         * @param termsField termsField
         * @param topN       topN
         * @param indexes    indexes to search
         * @return insertion-ordered map of term -> doc count, or null when the request failed
         */
        public static HashMap<String, Long> termsAgg(ElasticsearchClient client, Query query, String termsField, Integer topN, String... indexes) {
            return termsAgg(client, query, termsField, topN, Arrays.asList(indexes));
        }


        /**
         * Run a two-level nested terms aggregation.
         *
         * @param client      client
         * @param query       query
         * @param termsField1 outer terms field
         * @param topN1       outer bucket count, defaults to {@link #DEFAULT_TOP_N} when null
         * @param termsField2 inner terms field
         * @param topN2       inner bucket count, defaults to {@link #DEFAULT_TOP_N} when null
         * @param indexes     indexes to search
         * @return map of outer term -> (inner term -> doc count), or null when the request failed
         */
        public static HashMap<String, HashMap<String, Long>> termsTermsAgg(ElasticsearchClient client, Query query, String termsField1, Integer topN1, String termsField2, Integer topN2, String... indexes) {
            // fix: default null topN values; the original unboxed them directly and would NPE.
            // Final locals so the lambdas below can capture them.
            final int top1 = ObjectUtil.defaultIfNull(topN1, DEFAULT_TOP_N);
            final int top2 = ObjectUtil.defaultIfNull(topN2, DEFAULT_TOP_N);
            try {
                HashMap<String, HashMap<String, Long>> aggMap = new LinkedHashMap<>((int) (top1 / 0.75) + 1);
                //term term agg
                Aggregation aggregation = termsTermsAgg(termsField1, top1, termsField2, top2);

                SearchRequest searchRequest = new SearchRequest.Builder()
                        .index(Arrays.asList(indexes))
                        .query(query)
                        .aggregations(termsField1, aggregation)
                        .build();

                //search
                SearchResponse<Object> response = client.search(searchRequest, Object.class);

                // first-level terms buckets
                Optional.ofNullable(response.aggregations())
                        .map(aggregations -> aggregations.get(termsField1))
                        .map(Aggregate::sterms)
                        .map(StringTermsAggregate::buckets)
                        .map(Buckets::array)
                        .ifPresent(termsBuckets -> {
                            //result
                            for (var termsBucket : termsBuckets) {
                                HashMap<String, Long> subMap = new HashMap<>((int) (top2 / 0.75) + 1);

                                // second-level terms buckets
                                Optional.ofNullable(termsBucket.aggregations())
                                        .map(aggregations -> aggregations.get(termsField2))
                                        .map(Aggregate::sterms)
                                        .map(StringTermsAggregate::buckets)
                                        .map(Buckets::array)
                                        .ifPresent(terms2Buckets -> {
                                            for (StringTermsBucket bucket2 : terms2Buckets) {
                                                subMap.put(bucket2.key(), bucket2.docCount());
                                            }
                                        });
                                aggMap.put(termsBucket.key(), subMap);
                            }
                        });
                return aggMap;
            } catch (IOException e) {
                log.error(e.getMessage(), e);
            }
            return null;
        }

        /**
         * Recursively flatten an aggregation response into nested maps.
         * Handles string/long terms (recursing into sub-aggregations), sum,
         * and date-histogram aggregates; other aggregate types yield empty maps.
         *
         * NOTE(review): for sum aggregates the value ends up nested as
         * {key: {key: value}} because singleResults is keyed by the same name -
         * confirm this shape is intended by callers.
         *
         * @param aggregateMap aggregations from a search response
         * @return nested result map
         */
        public static LinkedHashMap<String, Object> wrapNAgg(Map<String, Aggregate> aggregateMap) {
            LinkedHashMap<String, Object> resultMap = new LinkedHashMap<>();
            for (Map.Entry<String, Aggregate> entry : aggregateMap.entrySet()) {
                String key = entry.getKey();
                Aggregate aggregate = entry.getValue();
                LinkedHashMap<String, Object> singleResults = new LinkedHashMap<>();
                if (aggregate.isSterms()) {
                    for (StringTermsBucket bucket : aggregate.sterms().buckets().array()) {
                        if (CollUtil.isEmpty(bucket.aggregations())) {
                            singleResults.put(bucket.key(), bucket.docCount());
                        } else {
                            singleResults.put(bucket.key(), wrapNAgg(bucket.aggregations()));
                        }
                    }
                } else if (aggregate.isLterms()) {
                    for (LongTermsBucket bucket : aggregate.lterms().buckets().array()) {
                        if (CollUtil.isEmpty(bucket.aggregations())) {
                            singleResults.put(bucket.key(), bucket.docCount());
                        } else {
                            singleResults.put(bucket.key(), wrapNAgg(bucket.aggregations()));
                        }
                    }
                } else if (aggregate.isSum()) {
                    singleResults.put(key, aggregate.sum().value());
                } else if (aggregate.isDateHistogram()) {
                    List<DateHistogramBucket> dateHistogramBuckets = aggregate.dateHistogram().buckets().array();
                    LinkedHashMap<String, Long> dateMap = new LinkedHashMap<>((int) (dateHistogramBuckets.size() / 0.75) + 1);
                    dateHistogramBuckets.forEach(bucket -> dateMap.put(bucket.keyAsString(), bucket.docCount()));
                    resultMap.put(key, dateMap);
                    continue;
                }
                resultMap.put(key, singleResults);
            }
            return resultMap;
        }

        /**
         * Run a terms aggregation with a nested sum.
         *
         * @param client      client
         * @param query       query
         * @param aggField    outer terms field
         * @param aggSumField field summed inside each bucket
         * @param topN        bucket count, defaults to {@link #DEFAULT_TOP_N} when null
         * @param indexList   indexList
         * @return Map<String, Double> k:agg field v:sum (empty on request failure)
         */
        public static Map<String, Double> termsSumAgg(ElasticsearchClient client, Query query, String aggField, String aggSumField, Integer topN, List<String> indexList) {
            // fix: default null topN; the original unboxed it directly and would NPE
            final int top = ObjectUtil.defaultIfNull(topN, DEFAULT_TOP_N);
            Map<String, Double> aggMap = new LinkedHashMap<>((int) (top / 0.75) + 1);
            try {
                //term agg
                Aggregation aggregation = termsSumAgg(aggField, aggSumField, top);

                SearchRequest searchRequest = new SearchRequest.Builder()
                        .index(indexList)
                        .query(query)
                        .aggregations(aggField, aggregation)
                        .build();

                //search
                SearchResponse<Object> response = client.search(searchRequest, Object.class);

                // first-level terms buckets
                Optional.ofNullable(response.aggregations())
                        .map(aggregations -> aggregations.get(aggField))
                        .map(Aggregate::sterms)
                        .map(StringTermsAggregate::buckets)
                        .map(Buckets::array)
                        .ifPresent(termsBuckets -> {
                            //result
                            for (var termsBucket : termsBuckets) {
                                //sum agg
                                Optional.ofNullable(termsBucket.aggregations())
                                        .map(aggregations -> aggregations.get(aggSumField))
                                        .map(Aggregate::sum)
                                        .map(SingleMetricAggregateBase::value)
                                        .ifPresent(value -> aggMap.put(termsBucket.key(), value));

                            }
                        });
            } catch (IOException e) {
                log.error(e.getMessage(), e);
            }
            return aggMap;
        }
    }
    
    

    Other

    ESField

    package elasticsearch.annotation;
    
    import elasticsearch.util.ClazzUtil;
    import lombok.extern.slf4j.Slf4j;
    import lombok.val;
    
    import java.lang.annotation.ElementType;
    import java.lang.annotation.Retention;
    import java.lang.annotation.RetentionPolicy;
    import java.lang.annotation.Target;
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * Marks an entity field as writable to ES: only annotated fields are written.
     * When placed on the class itself, every field of the class is written.
     *
     * @author zhangxuecheng4441
     * @date 2022/8/12/012 13:42
     */
    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.FIELD, ElementType.TYPE})
    public @interface ESField {
    
    
        @Slf4j
        class Method {
            /**
             * Converts a bean to a field-name/value map, filtered by {@link ESField}.
             * Null-valued fields and {@code serialVersionUID} are skipped.
             *
             * @param obj bean instance
             * @param <T> bean type
             * @return map of field name to non-null field value
             */
            public static <T> Map<String, Object> getMap(T obj) {
                ESField esAnno = obj.getClass().getAnnotation(ESField.class);
                //annotation on the class itself means all fields are written to ES
                final boolean matchAllField = esAnno != null;
                val fields = ClazzUtil.getClazzAllField(obj.getClass());
    
                String errorFieldName = "serialVersionUID";
                return fields.stream()
                        .peek(field -> field.setAccessible(true))
                        //a field is included when either the class or the field carries the annotation
                        .filter(field -> matchAllField || field.getAnnotation(ESField.class) != null)
                        .filter(field -> !errorFieldName.equals(field.getName()))
                        .collect(HashMap::new,
                                (map, field) -> {
                                    try {
                                        //read once: avoids a second reflective access per field
                                        Object value = field.get(obj);
                                        if (value != null) {
                                            map.put(field.getName(), value);
                                        }
                                    } catch (IllegalAccessException e) {
                                        //use the class logger instead of printStackTrace
                                        log.error("failed to read field {} of {}", field.getName(), obj.getClass().getName(), e);
                                    }
                                },
                                HashMap::putAll);
            }
        }
    }
    
    

    EsDoc

    package elasticsearch.pojo;
    
    /**
     * Contract for objects written to Elasticsearch: every indexable entity
     * supplies its own document id and routing value.
     *
     * @author zhangxuecheng4441
     * @date 2023/2/27/027 10:06
     */
    public interface EsDoc {
        /**
         * Document id, used as the ES {@code _id} on write.
         *
         * @return document id
         */
        String getDocId();
    
        /**
         * Routing value, used as the ES {@code routing} on write
         * to pin the document to a shard.
         *
         * @return routing value
         */
        String getRouting();
    }
    
    

    EsCommonUtils

    package elasticsearch.util;
    
    import cn.hutool.core.collection.ListUtil;
    import co.elastic.clients.elasticsearch._types.FieldValue;
    
    import java.util.Collection;
    import java.util.List;
    import java.util.stream.Collectors;
    
    /**
     * Conversion helpers between plain strings and ES {@link FieldValue} lists
     * (used e.g. for terms queries).
     *
     * @author zxc
     * @date 2022/5/26/026 16:03
     */
    public class EsCommonUtils {
    
        /**
         * Utility class: not meant to be instantiated.
         */
        private EsCommonUtils() {
        }
    
        /**
         * Converts varargs strings to a {@code List<FieldValue>}.
         *
         * @param fields strings
         * @return List&lt;FieldValue&gt;
         */
        public static List<FieldValue> str2Field(String... fields) {
            return strList2Field(ListUtil.toList(fields));
        }
    
        /**
         * Converts a string collection to a {@code List<FieldValue>}.
         *
         * @param fields string collection
         * @return List&lt;FieldValue&gt;
         */
        public static List<FieldValue> strList2Field(Collection<String> fields) {
            return fields.stream()
                    .map(FieldValue::of)
                    .collect(Collectors.toList());
        }
    }
    

    ClazzUtil

    package elasticsearch.util;
    
    import cn.hutool.core.util.ClassUtil;
    import lombok.val;
    
    import java.lang.reflect.Field;
    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;
    
    /**
     * Reflection helpers on top of hutool's {@link ClassUtil}: walk a class
     * hierarchy and collect declared fields from every level.
     *
     * @author zhangxuecheng4441
     * @date 2023/2/16/016 11:25
     */
    public class ClazzUtil extends ClassUtil {
    
        /**
         * Collects the class itself plus all of its superclasses
         * (up to and including {@link Object}).
         *
         * @param clazz starting class
         * @return Set&lt;Class&lt;?&gt;&gt; containing {@code clazz} and every ancestor
         */
        public static Set<Class<?>> getSuperClazz(Class<?> clazz) {
            val clazzList = new HashSet<Class<?>>();
            //the class itself is always included, so add it unconditionally
            //(the original duplicated this add in both branches of an if/else)
            clazzList.add(clazz);
    
            //recurse up the hierarchy; getSuperclass() is null for Object/interfaces/primitives
            if (clazz.getSuperclass() != null) {
                clazzList.addAll(getSuperClazz(clazz.getSuperclass()));
            }
            return clazzList;
        }
    
        /**
         * Collects all declared fields of the class and of every superclass.
         *
         * @param clazz class to inspect
         * @return Set&lt;Field&gt; of all declared fields across the hierarchy
         */
        public static Set<Field> getClazzAllField(Class<?> clazz) {
            val superClazz = getSuperClazz(clazz);
            val fields = new HashSet<Field>();
    
            for (Class<?> aClass : superClazz) {
                fields.addAll(Arrays.asList(ClassUtil.getDeclaredFields(aClass)));
            }
    
            return fields;
        }
    }
    
    

    github

    https://github.com/opop32165455/elasticsearch-java-7.x

    后言

    欢迎提意见。

    相关文章

      网友评论

          本文标题:Elasticsearch-java 封装API

          本文链接:https://www.haomeiwen.com/subject/kiiqjltx.html