美文网首页
Elasticseach-java 封装API

Elasticseach-java 封装API

作者: 圆企鹅i | 来源:发表于2023-03-12 15:46 被阅读0次

前言

ES7版本推出了lambda形式的全新API
ES在后面的新版本将会不再更新HighLevelClient
当然最主要的还是网上搜不到样例代码 比较生气 官网代码里的单元测试写得乱的一笔 干脆自己封装

依赖

   
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>8</maven.compiler.source>
        <target.java.version>1.8</target.java.version>
        <elasticsearch.java.version>7.17.9</elasticsearch.java.version>
        <jackson.version>2.12.3</jackson.version>
        <lombok.version>1.18.20</lombok.version>
        <hutool.version>5.8.12</hutool.version>
        <slf4j.version>1.7.36</slf4j.version>
    </properties>

    <dependencies>
        <!--   es lambda 全新api-->
        <dependency>
            <groupId>co.elastic.clients</groupId>
            <artifactId>elasticsearch-java</artifactId>
            <version>${elasticsearch.java.version}</version>
        </dependency>

        <!--   es 官网必备的依赖 https://www.elastic.co/guide/en/elasticsearch/client/java-api-client/7.17/installation.html -->
        <!--   todo 非常以及特别的容易出现依赖冲突 建议单独封装一个模块 打成shaded依赖-->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>${jackson.version}</version>
        </dependency>

        <!-- 需要安装lombok插件-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>${lombok.version}</version>
        </dependency>

        <!-- 日志 -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>

        <!--     工具包-->
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>${hutool.version}</version>
        </dependency>
    </dependencies>

Code

Client

package elasticsearch.util;

import cn.hutool.core.util.StrUtil;
import cn.hutool.setting.dialect.Props;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.endpoints.BooleanResponse;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;

/**
 * @author zhangxuecheng4441
 * @date 2022/4/7/007 10:48
 */
@Slf4j
public class EsClientUtils {
    /**
     * restClient
     */
    private static final RestClient REST_CLIENT = RestClient.builder(getDefaultEsHosts().toArray(new HttpHost[]{})).build();

    /**
     * Create the transport with a Jackson mapper
     */
    private static final ElasticsearchTransport TRANSPORT = new RestClientTransport(
            REST_CLIENT, new JacksonJsonpMapper());

    /**
     * And create the API client
     */
    private static final ElasticsearchClient CLIENT = new ElasticsearchClient(TRANSPORT);

    public static ElasticsearchClient getClient() {
        return CLIENT;
    }


    /**
     * 判断索引是否存在
     *
     * @param index index name
     * @return is exists
     */
    public static Boolean indexIsExist(String index) {
        try {
            BooleanResponse exists = EsClientUtils.getClient().indices().exists(iq -> iq.index(index));
            return exists.value();
        } catch (IOException e) {
            log.error(e.getMessage(), e);
        }
        return false;
    }

    /**
     * 创建索引
     *
     * @param index index name
     */
    public static void createIndex(String index) {
        try {
            EsClientUtils.getClient().indices().create(cir -> cir.index(index));
        } catch (IOException e) {
            log.error(e.getMessage(), e);
        }

    }

    /**
     * http 地址
     *
     * @param hosts hosts
     * @return List<HttpHost>
     */
    public static List<HttpHost> getEsHosts(String hosts) {
        List<HttpHost> httpHosts = new ArrayList<>();
        String[] split = hosts.split(StrUtil.COMMA);
        for (String s : split) {
            httpHosts.add(HttpHost.create(s));
        }
        return httpHosts;
    }


    public static List<HttpHost> getDefaultEsHosts() {
        String esHostsConfKey = "es.hosts";
        return getEsHosts(new Props("application.properties", Charset.defaultCharset()).getStr(esHostsConfKey, "es1:9200"));
    }
}

Search

package elasticsearch.util;

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.CharUtil;
import cn.hutool.core.util.StrUtil;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.Time;
import co.elastic.clients.elasticsearch._types.query_dsl.Query;
import co.elastic.clients.elasticsearch.core.*;
import co.elastic.clients.elasticsearch.core.search.Hit;
import lombok.extern.slf4j.Slf4j;
import lombok.val;

import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import static cn.hutool.core.text.StrPool.COMMA;

/**
 * @author zxc
 * @date 2022/6/22/022 11:08
 */
@Slf4j
public class EsSearchUtil {

    /**
     * index 切割
     *
     * @param indexStr aaa,bbb
     * @return [aaa, bbb]
     */
    public static String[] getSearchIndex(String indexStr) {
        return StrUtil.splitToArray(indexStr, CharUtil.COMMA);
    }

    /**
     * 构造一个简单terms函数
     *
     * @param field  字段
     * @param values values
     * @return Query
     */
    public static Query termsQuery(String field, Collection<String> values) {
        int batch = 1000;
        ArrayList<Query> list = new ArrayList<>();

        //1000个数据切割一次
        for (List<String> value1000 : CollUtil.split(values, batch)) {
            Query query = Query.of(qb -> qb
                    .terms(termsBuilder -> termsBuilder
                            .field(field)
                            .terms(tf -> tf.value(EsCommonUtils.strList2Field(value1000)))
                    )
            );
            list.add(query);
        }

        //拼成一个查询语句
        return Query.of(qb -> qb
                .bool(bool -> bool
                        .should(list)
                )
        );
    }

    /**
     * 构造一个简单terms函数
     *
     * @param field  字段
     * @param values values
     * @return Query
     */
    public static Query termsQuery(String field, String... values) {
        return Query.of(qb -> qb
                .terms(termsBuilder -> termsBuilder
                        .field(field)
                        .terms(tf -> tf.value(EsCommonUtils.str2Field(values)))
                )
        );
    }

    /**
     * 构造一个简单match函数
     *
     * @param field 字段
     * @param value value es7支持match:"aaa,bbb,ccc"
     * @return Query
     */
    public static Query matchQuery(String field, String value) {
        return Query.of(qb -> qb
                .match(match -> match
                        .field(field)
                        .query(value)
                )
        );
    }

    /**
     * match all query
     *
     * @return match all query
     */
    public static Query matchAll() {
        return Query.of(qb -> qb
                .matchAll(match -> match
                )
        );
    }

    /**
     * exists query
     *
     * @param field field
     * @return query
     */
    public static Query existsQuery(String field) {
        return Query.of(q -> q.exists(
                e -> e.field(field)
        ));
    }

    /**
     * es dsl查询语句 生成query
     *
     * @param dsl es dsl
     * @return query
     */
    public static Query dslParseQuery(String dsl) {
        return Query.of(q -> q.withJson(new StringReader(dsl)));
    }

    /**
     * es dsl查询语句 生成query
     *
     * @param query query
     * @return dsl str
     */
    public static String query2Dsl(Query query) {
        return StrUtil.removePrefix(query.toString(), "Query:");
    }

    /**
     * scroll查询出数据,然后根据需要处理
     *
     * @param client       ElasticsearchClient
     * @param query        Query.of()
     * @param index        index
     * @param dataConsumer 数据消费处理函数
     * @throws IOException IOException
     */
    public static void scrollHandle(ElasticsearchClient client, Query query, List<String> index, Consumer<List<Object>> dataConsumer) throws IOException {
        long total = 0;

        SearchResponse<Object> response = client.search(
                SearchRequest.of(sear -> sear
                        .query(query)
                        .scroll(Time.of(ts -> ts.time("2m")))
                        .index(index)
                        .size(5000)
                )
                //此处使用es默认的反序列化 容易出现问题
                , Object.class
        );
        log.info("search from index:{},dsl:\r\n{}", index, query.toString());

        if (CollUtil.isEmpty(response.hits().hits())) {
            log.warn("can not find result from this search");
            return;
        }

        List<Object> objects = response.hits().hits().stream().map(Hit::source).collect(Collectors.toList());
        dataConsumer.accept(objects);

        log.info("search size:{}", total = total + objects.size());

        String scrollId = response.scrollId();

        while (scrollId != null) {
            final String searchScroll = scrollId;
            ScrollResponse<Object> scrollResponse = client.scroll(
                    ScrollRequest.of(scroll -> scroll
                            .scrollId(searchScroll)
                            .scroll(time -> time.time("2m"))
                    ), Object.class
            );

            List<Hit<Object>> hitList = scrollResponse.hits().hits();
            if (hitList.isEmpty()) {
                break;
            } else {
                scrollId = scrollResponse.scrollId();
            }

            List<Object> scrollObjects = hitList.stream().map(Hit::source).collect(Collectors.toList());
            dataConsumer.accept(scrollObjects);
            log.info("scroll search size:{}", total = total + scrollObjects.size());
        }
        log.info("finish scroll search handle total :{}", total);
    }

    /**
     * scroll查询出数据,然后根据需要处理
     *
     * @param client       ElasticsearchClient
     * @param query        Query.of()
     * @param index        index
     * @param dataConsumer 数据消费处理函数
     * @throws IOException IOException
     */
    public static void scrollHandle(ElasticsearchClient client, Query query, String index, Consumer<List<Object>> dataConsumer) throws IOException {
        val indexList = getSearchIndexList(index);
        scrollHandle(client, query, indexList, dataConsumer);
    }

    /**
     * index -> list<index>
     *
     * @param index index
     * @return list index
     */
    public static List<String> getSearchIndexList(String index) {
        val indexList = new ArrayList<String>();
        if (index.contains(COMMA)) {
            indexList.addAll(StrUtil.split(index, COMMA));
        } else {
            indexList.add(index);
        }
        return indexList;
    }

    /**
     * 查询count
     *
     * @param client client
     * @param query  query
     * @param index  index
     * @return count
     */
    public static Long getCount(ElasticsearchClient client, Query query, List<String> index) {
        try {
            CountResponse countResponse = client.count(CountRequest.of(count -> count
                            .index(index)
                            .query(query)
                    )
            );
            return countResponse.count();

        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
        return null;
    }
}

Write

package elasticsearch.util;

import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.lang.Pair;
import cn.hutool.core.map.MapUtil;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.core.BulkResponse;
import co.elastic.clients.elasticsearch.core.bulk.BulkOperation;
import elasticsearch.annotation.ESField;
import lombok.extern.slf4j.Slf4j;
import lombok.val;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * @author zhangxuecheng4441
 * @date 2022/10/26/026 10:14
 */
@Slf4j
public class EsWriteUtils {

    /**
     * 写入对象规范
     */
    public interface EsDoc {
        /**
         * 获取id
         *
         * @return id
         */
        String getDocId();

        /**
         * 获取routing
         *
         * @return routing
         */
        String getRouting();
    }

    /**
     * 数据写入
     *
     * @param client    client
     * @param indexName index name
     * @param bean      doc 注意写入数据受@ESField过滤
     */
    public static void write(ElasticsearchClient client, String indexName, EsDoc bean) {
        BulkResponse bulkResponse;
        try {
            bulkResponse = client.bulk(bulkBd -> bulkBd
                    .operations(bulkOpBd -> bulkOpBd
                            .update(bulkOp -> bulkOp
                                    .index(indexName)
                                    .id(bean.getDocId())
                                    .routing(bean.getRouting())
                                    .action(upAction -> upAction
                                            .docAsUpsert(true)
                                            .doc(ESField.Method.getMap(bean))
                                    )
                            )
                    )
            );
        } catch (Exception e) {
            throw new RuntimeException(e);
        }

        if (bulkResponse == null || bulkResponse.errors()) {
            log.error("error bulk es doc({}) and error:{}", bean.getDocId(), bulkResponse);
        } else {
            log.info("write to {} doc success id:{}", indexName, bean.getDocId());
        }
    }


    /**
     * 数据批量写入
     *
     * @param client    client
     * @param indexName indexName
     * @param esDocs    List<? extends EsDoc> 注意写入数据受@ESField过滤
     */
    public static void batchWrite(ElasticsearchClient client, String indexName, List<? extends EsDoc> esDocs) {
        BulkResponse bulkResponse;
        try {
            final int batch = 1000;
            for (List<? extends EsDoc> beans : ListUtil.partition(esDocs, batch)) {
                List<BulkOperation> operations = beans.stream()
                        .map(bean -> BulkOperation.of(bulk -> bulk
                                        .update(up -> up
                                                .index(indexName)
                                                .id(bean.getDocId())
                                                .routing(bean.getRouting())
                                                .action(upAction -> upAction
                                                        .docAsUpsert(true)
                                                        .doc(ESField.Method.getMap(bean))
                                                )
                                        )
                                )
                        )
                        .collect(Collectors.toList());

                bulkResponse = client.bulk(bulkBd -> bulkBd
                        .index(indexName)
                        .operations(operations)
                );

                //client.indices().flush()

                if (bulkResponse == null || bulkResponse.errors()) {
                    log.error("error bulk es doc size:({}) and error:{}", beans.size(), bulkResponse);
                } else {
                    log.info("success branch update index :{} total:{}", indexName, beans.size());
                }
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 数据批量写入
     *
     * @param client    client
     * @param indexName indexName
     * @param esDocs    List<Pair<String, Map<String,Object>>>
     */
    public static void batchMapWrite(ElasticsearchClient client, String indexName, List<Pair<String, Map<String, Object>>> esDocs) {
        BulkResponse bulkResponse;
        try {
            final int batch = 1000;
            for (List<Pair<String, Map<String, Object>>> beans : ListUtil.partition(esDocs, batch)) {
                List<BulkOperation> operations = beans.stream()
                        .map(bean -> {
                                    val docMap = bean.getValue();
                                    val doc = MapUtil.removeNullValue(docMap);
                                    return BulkOperation.of(bulk -> bulk
                                            .update(up -> up
                                                    .index(indexName)
                                                    .id(bean.getKey())
                                                    .routing(bean.getKey())
                                                    .action(upAction -> upAction
                                                            .docAsUpsert(true)
                                                            .doc(doc)
                                                    )
                                            )
                                    );
                                }
                        )
                        .collect(Collectors.toList());

                bulkResponse = client.bulk(bulkBd -> bulkBd
                        .index(indexName)
                        .operations(operations)
                );

                if (bulkResponse == null || bulkResponse.errors()) {
                    log.error("error bulk es doc size:({}) and error:{}", beans.size(), bulkResponse);
                } else {
                    log.info("success branch update index :{} total:{}", indexName, beans.size());
                }
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

Agg

package elasticsearch.util;

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.ObjectUtil;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.aggregations.*;
import co.elastic.clients.elasticsearch._types.query_dsl.Query;
import co.elastic.clients.elasticsearch.core.SearchRequest;
import co.elastic.clients.elasticsearch.core.SearchResponse;
import lombok.extern.slf4j.Slf4j;
import lombok.var;

import java.io.IOException;
import java.util.*;

/**
 * @author zxc
 * @date 2022/9/2/002 15:14
 */
@Slf4j
public class EsAggUtils {

    /**
     * terms agg
     *
     * @param field field
     * @param topN  topN
     * @return Aggregation
     */
    public static Aggregation termsAgg(String field, Integer topN) {
        return Aggregation.of(agg -> agg
                .terms(ts -> ts
                        .field(field)
                        .size(topN)
                )
        );
    }

    /**
     * sum agg
     *
     * @param field field
     * @return Aggregation
     */
    public static Aggregation sumAgg(String field) {
        return Aggregation.of(agg -> agg
                .sum(sum -> sum
                        .field(field)
                )

        );
    }

    /**
     * term嵌套2层
     *
     * @param field1 field1
     * @param topN1  topN1
     * @param field2 field2
     * @param topN2  topN2
     * @return Aggregation
     */
    public static Aggregation termsTermsAgg(String field1, Integer topN1, String field2, Integer topN2) {
        return Aggregation.of(
                agg -> agg.terms(
                                ts -> ts.field(field1)
                                        .size(topN1)
                        )
                        .aggregations(field2, termsAgg(field2, topN2))
        );
    }

    /**
     * terms agg
     *
     * @param field    field
     * @param sumField sumField
     * @param topN     topN
     * @return Aggregation
     */
    public static Aggregation termsSumAgg(String field, String sumField, Integer topN) {
        return Aggregation.of(agg -> agg
                .terms(ts -> ts
                        .field(field)
                        .size(topN)
                )
                .aggregations(sumField, sumAgg(sumField))
        );
    }

    /**
     * term 嵌套三层
     *
     * @param field1
     * @param topN1
     * @param field2
     * @param topN2
     * @param field3
     * @param topN3
     * @return
     */
    public static Aggregation terms3Agg(String field1, Integer topN1, String field2, Integer topN2, String field3, Integer topN3) {
        return Aggregation.of(
                agg -> agg.terms(
                        ts -> ts.field(field1)
                                .size(topN1)
                ).aggregations(field2, Aggregation.of(
                                agg1 -> agg1.terms(
                                        ts1 -> ts1.field(field2).size(topN2)
                                ).aggregations(field3, Aggregation.of(
                                                agg2 -> agg2.terms(
                                                        ts2 -> ts2.field(field3).size(topN3)
                                                )

                                        )
                                )
                        )
                )
        );
    }

    /**
     * 简单的terms agg
     *
     * @param client     client
     * @param query      query
     * @param termsField termsField
     * @param topN       topN
     * @param indexes    索引
     * @return HashMap<terms, count>
     */
    public static HashMap<String, Long> termsAgg(ElasticsearchClient client, Query query, String termsField, Integer topN, List<String> indexes) {
        try {
            topN = ObjectUtil.defaultIfNull(topN, 10000);
            HashMap<String, Long> aggMap = new LinkedHashMap<>((int) (topN / 0.75) + 1);
            //term agg
            Aggregation aggregation = termsAgg(termsField, topN);
            SearchRequest searchRequest = new SearchRequest.Builder()
                    .index(indexes)
                    .query(query)
                    .aggregations(termsField, aggregation)
                    .build();

            //search
            SearchResponse<Object> response = client.search(searchRequest, Object.class);

            var termsBuckets = response.aggregations()
                    .get(termsField)
                    .sterms()
                    .buckets()
                    .array();

            //result
            for (var termsBucket : termsBuckets) {
                aggMap.put(termsBucket.key(), termsBucket.docCount());
            }

            return aggMap;
        } catch (IOException e) {
            log.error(e.getMessage(), e);
        }
        return null;
    }


    /**
     * 简单的terms agg
     *
     * @param client     client
     * @param query      query
     * @param termsField termsField
     * @param topN       topN
     * @param indexes    索引
     * @return HashMap<terms, count>
     */
    public static HashMap<String, Long> termsAgg(ElasticsearchClient client, Query query, String termsField, Integer topN, String... indexes) {
        return termsAgg(client, query, termsField, topN, Arrays.asList(indexes));
    }


    public static HashMap<String, HashMap<String, Long>> termsTermsAgg(ElasticsearchClient client, Query query, String termsField1, Integer topN1, String termsField2, Integer topN2, String... indexes) {
        try {
            HashMap<String, HashMap<String, Long>> aggMap = new LinkedHashMap<>((int) (topN1 / 0.75) + 1);
            //term term agg
            Aggregation aggregation = termsTermsAgg(termsField1, topN1, termsField2, topN2);

            SearchRequest searchRequest = new SearchRequest.Builder()
                    .index(Arrays.asList(indexes))
                    .query(query)
                    .aggregations(termsField1, aggregation)
                    .build();

            //search
            SearchResponse<Object> response = client.search(searchRequest, Object.class);

            //第一层terms
            Optional.ofNullable(response.aggregations())
                    .map(aggregations -> aggregations.get(termsField1))
                    .map(Aggregate::sterms)
                    .map(StringTermsAggregate::buckets)
                    .map(Buckets::array)
                    .ifPresent(termsBuckets -> {
                        //result
                        for (var termsBucket : termsBuckets) {
                            HashMap<String, Long> subMap = new HashMap<>((int) (topN2 / 0.75) + 1);

                            //第二次terms
                            Optional.ofNullable(termsBucket.aggregations())
                                    .map(aggregations -> aggregations.get(termsField2))
                                    .map(Aggregate::sterms)
                                    .map(StringTermsAggregate::buckets)
                                    .map(Buckets::array)
                                    .ifPresent(terms2Buckets -> {
                                        for (StringTermsBucket bucket2 : terms2Buckets) {
                                            subMap.put(bucket2.key(), bucket2.docCount());
                                        }
                                    });
                            aggMap.put(termsBucket.key(), subMap);
                        }
                    });
            return aggMap;
        } catch (IOException e) {
            log.error(e.getMessage(), e);
        }
        return null;
    }

    public static LinkedHashMap<String, Object> wrapNAgg(Map<String, Aggregate> aggregateMap) {
        LinkedHashMap<String, Object> resultMap = new LinkedHashMap<>();
        for (Map.Entry<String, Aggregate> entry : aggregateMap.entrySet()) {
            String key = entry.getKey();
            Aggregate aggregate = entry.getValue();
            LinkedHashMap<String, Object> singleResults = new LinkedHashMap<>();
            if (aggregate.isSterms()) {
                for (StringTermsBucket bucket : aggregate.sterms().buckets().array()) {
                    if (CollUtil.isEmpty(bucket.aggregations())) {
                        singleResults.put(bucket.key(), bucket.docCount());
                    } else {
                        singleResults.put(bucket.key(), wrapNAgg(bucket.aggregations()));
                    }
                }
            } else if (aggregate.isLterms()) {
                for (LongTermsBucket bucket : aggregate.lterms().buckets().array()) {
                    if (CollUtil.isEmpty(bucket.aggregations())) {
                        singleResults.put(bucket.key(), bucket.docCount());
                    } else {
                        singleResults.put(bucket.key(), wrapNAgg(bucket.aggregations()));
                    }
                }
            } else if (aggregate.isSum()) {
                singleResults.put(key, aggregate.sum().value());
            } else if (aggregate.isDateHistogram()) {
                List<DateHistogramBucket> dateHistogramBuckets = aggregate.dateHistogram().buckets().array();
                LinkedHashMap<String, Long> dateMap = new LinkedHashMap<>((int) (dateHistogramBuckets.size() / 0.75) + 1);
                dateHistogramBuckets.forEach(bucket -> dateMap.put(bucket.keyAsString(), bucket.docCount()));
                resultMap.put(key, dateMap);
                continue;
            }
            resultMap.put(key, singleResults);
        }
        return resultMap;
    }

    /**
     * termsSumAgg
     *
     * @param client      client
     * @param query       query
     * @param aggField    agg一层字段
     * @param aggSumField agg sum 二层字段
     * @param topN        topN
     * @param indexList   indexList
     * @return Map<String, Double> k:agg field v:sum
     */
    public static Map<String, Double> termsSumAgg(ElasticsearchClient client, Query query, String aggField, String aggSumField, Integer topN, List<String> indexList) {
        Map<String, Double> aggMap = new LinkedHashMap<>((int) (topN / 0.75) + 1);
        try {
            //term agg
            Aggregation aggregation = termsSumAgg(aggField, aggSumField, topN);

            SearchRequest searchRequest = new SearchRequest.Builder()
                    .index(indexList)
                    .query(query)
                    .aggregations(aggField, aggregation)
                    .build();

            //search
            SearchResponse<Object> response = client.search(searchRequest, Object.class);

            //第一层terms
            Optional.ofNullable(response.aggregations())
                    .map(aggregations -> aggregations.get(aggField))
                    .map(Aggregate::sterms)
                    .map(StringTermsAggregate::buckets)
                    .map(Buckets::array)
                    .ifPresent(termsBuckets -> {
                        //result
                        for (var termsBucket : termsBuckets) {
                            //sum agg
                            Optional.ofNullable(termsBucket.aggregations())
                                    .map(aggregations -> aggregations.get(aggSumField))
                                    .map(Aggregate::sum)
                                    .map(SingleMetricAggregateBase::value)
                                    .ifPresent(value -> aggMap.put(termsBucket.key(), value));

                        }
                    });
        } catch (IOException e) {
            log.error(e.getMessage(), e);
        }
        return aggMap;
    }
}

Other

ESField

package elasticsearch.annotation;

import elasticsearch.util.ClazzUtil;
import lombok.extern.slf4j.Slf4j;
import lombok.val;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.HashMap;
import java.util.Map;

/**
 * es字段,实体字段只有标明了该字段,才能够写入到es中
 *
 * @author zhangxuecheng4441
 * @date 2022/8/12/012 13:42
 */
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD, ElementType.TYPE})
public @interface ESField {


    @Slf4j
    class Method {
        /**
         * bean 转 map 受ESField字段过滤
         *
         * @param obj obj
         * @param <T> obj T
         * @return obj map
         */
        public static <T> Map<String, Object> getMap(T obj) {
            ESField esAnno = obj.getClass().getAnnotation(ESField.class);
            //类上包含注解 所有字段写入es
            final boolean matchAllField = esAnno != null;
            val fields = ClazzUtil.getClazzAllField(obj.getClass());

            String errorFieldName = "serialVersionUID";
            return fields.stream()
                    .peek(field -> field.setAccessible(true))
                    //类或者字段上包含注解 则写入es
                    .filter(field -> matchAllField || field.getAnnotation(ESField.class) != null)
                    .filter(field -> !errorFieldName.equals(field.getName()))
                    .collect(HashMap::new,
                            (map, field) -> {
                                try {
                                    if (field.get(obj) != null) {
                                        map.put(field.getName(), field.get(obj));
                                    }
                                } catch (IllegalAccessException e) {
                                    e.printStackTrace();
                                }
                            },
                            HashMap::putAll);
        }
    }
}

EsDoc

package elasticsearch.pojo;

/**
 * 写入对象规范
 *
 * @author zhangxuecheng4441
 * @date 2023/2/27/027 10:06
 */
public interface EsDoc {
    /**
     * 获取id
     *
     * @return id
     */
    String getDocId();

    /**
     * 获取routing
     *
     * @return routing
     */
    String getRouting();
}

EsCommonUtils

package elasticsearch.util;

import cn.hutool.core.collection.ListUtil;
import co.elastic.clients.elasticsearch._types.FieldValue;

import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;

/**
 * @author zxc
 * @date 2022/5/26/026 16:03
 */
public class EsCommonUtils {
    /**
     * string  转 List<FieldValue>
     *
     * @param fields strings
     * @return List<FieldValue>
     */
    public static List<FieldValue> str2Field(String... fields) {
        return strList2Field(ListUtil.toList(fields));
    }

    /**
     * string list 转 List<FieldValue>
     *
     * @param fields string list
     * @return List<FieldValue>
     */
    public static List<FieldValue> strList2Field(Collection<String> fields) {
        return fields.stream()
                .map(FieldValue::of)
                .collect(Collectors.toList());
    }
}

ClazzUtil

package elasticsearch.util;

import cn.hutool.core.util.ClassUtil;
import lombok.val;

import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

/**
 * @author zhangxuecheng4441
 * @date 2023/2/16/016 11:25
 */
public class ClazzUtil extends ClassUtil {

    /**
     * 获取所有父类
     * @param clazz clazz
     * @return Set<Class<?>>
     */
    public static Set<Class<?>> getSuperClazz(Class<?> clazz) {
        val clazzList = new HashSet<Class<?>>();

        //递归找父类
        if (clazz.getSuperclass() != null) {
            clazzList.add(clazz);
            clazzList.addAll(getSuperClazz(clazz.getSuperclass()));
        }else {
            clazzList.add(clazz);
        }
        return clazzList;
    }

    /**
     * 获取所有字段
     *
     * @param clazz clazz
     * @return  Set<Field>
     */
    public static Set<Field> getClazzAllField(Class<?> clazz) {
        val superClazz = getSuperClazz(clazz);
        val fields = new HashSet<Field>();

        for (Class<?> aClass : superClazz) {
            fields.addAll(Arrays.asList(ClassUtil.getDeclaredFields(aClass)));
        }

        return fields;
    }
}

github

https://github.com/opop32165455/elasticsearch-java-7.x

后言

欢迎提意见。

相关文章

网友评论

      本文标题:Elasticseach-java 封装API

      本文链接:https://www.haomeiwen.com/subject/kiiqjltx.html