前言
ES 7 版本推出了 lambda 形式的全新 API(Java API Client)。
ES 在后续的新版本中将不再更新 HighLevelClient。
当然,最主要的原因还是网上搜不到样例代码,让人比较生气;官网代码里的单元测试又写得很乱,所以干脆自己封装了一套工具类。
依赖
<!-- Build settings and dependency versions for this example -->
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>8</maven.compiler.source>
<target.java.version>1.8</target.java.version>
<elasticsearch.java.version>7.17.9</elasticsearch.java.version>
<jackson.version>2.12.3</jackson.version>
<lombok.version>1.18.20</lombok.version>
<hutool.version>5.8.12</hutool.version>
<slf4j.version>1.7.36</slf4j.version>
</properties>
<dependencies>
<!-- Elasticsearch Java API Client: the new lambda/builder-style API -->
<dependency>
<groupId>co.elastic.clients</groupId>
<artifactId>elasticsearch-java</artifactId>
<version>${elasticsearch.java.version}</version>
</dependency>
<!-- Dependency required by the official installation guide: https://www.elastic.co/guide/en/elasticsearch/client/java-api-client/7.17/installation.html -->
<!-- TODO: this is extremely prone to dependency conflicts; consider isolating it in a dedicated module and packaging that module as a shaded dependency -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>
<!-- Lombok: the Lombok IDE plugin must be installed -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
</dependency>
<!-- Logging: SLF4J API plus its log4j 1.2 binding -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
</dependency>
<!-- Utility library (Hutool) -->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>${hutool.version}</version>
</dependency>
</dependencies>
Code
Client
package elasticsearch.util;
import cn.hutool.core.util.StrUtil;
import cn.hutool.setting.dialect.Props;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.endpoints.BooleanResponse;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
/**
 * Static holder for one shared {@link ElasticsearchClient}, plus a few index-level helpers.
 *
 * <p>The low-level {@link RestClient}, the transport and the API client are created once, when
 * this class is initialized, from the hosts returned by {@link #getDefaultEsHosts()}.
 *
 * @author zhangxuecheng4441
 * @date 2022/4/7/007 10:48
 */
@Slf4j
public class EsClientUtils {
    /**
     * Low-level REST client, built from the default host list.
     */
    private static final RestClient REST_CLIENT =
            RestClient.builder(getDefaultEsHosts().toArray(new HttpHost[0])).build();

    /**
     * Transport on top of the REST client, using a Jackson JSON mapper.
     */
    private static final ElasticsearchTransport TRANSPORT = new RestClientTransport(
            REST_CLIENT, new JacksonJsonpMapper());

    /**
     * The API client handed out by {@link #getClient()}.
     */
    private static final ElasticsearchClient CLIENT = new ElasticsearchClient(TRANSPORT);

    /**
     * Returns the shared client instance.
     *
     * @return the shared {@link ElasticsearchClient}
     */
    public static ElasticsearchClient getClient() {
        return CLIENT;
    }

    /**
     * Checks whether an index exists.
     *
     * @param index index name
     * @return the value reported by the cluster; {@code false} when the request fails with an
     * {@link IOException} (the failure is logged, not rethrown)
     */
    public static Boolean indexIsExist(String index) {
        try {
            BooleanResponse exists = getClient().indices().exists(iq -> iq.index(index));
            return exists.value();
        } catch (IOException e) {
            log.error(e.getMessage(), e);
        }
        return false;
    }

    /**
     * Creates an index with default settings. An {@link IOException} is logged and swallowed.
     *
     * @param index index name
     */
    public static void createIndex(String index) {
        try {
            getClient().indices().create(cir -> cir.index(index));
        } catch (IOException e) {
            log.error(e.getMessage(), e);
        }
    }

    /**
     * Parses a comma-separated host list into {@link HttpHost} instances.
     *
     * <p>Each entry is trimmed and empty entries are skipped, so values such as
     * {@code "es1:9200, es2:9200"} or a trailing comma no longer make
     * {@link HttpHost#create(String)} fail.
     *
     * @param hosts comma-separated hosts, e.g. {@code es1:9200,es2:9200}; must not be {@code null}
     * @return one {@link HttpHost} per non-blank entry, in input order
     */
    public static List<HttpHost> getEsHosts(String hosts) {
        List<HttpHost> httpHosts = new ArrayList<>();
        for (String host : hosts.split(StrUtil.COMMA)) {
            String trimmed = host.trim();
            if (trimmed.isEmpty()) {
                // tolerate "a,,b" and trailing commas
                continue;
            }
            httpHosts.add(HttpHost.create(trimmed));
        }
        return httpHosts;
    }

    /**
     * Reads the {@code es.hosts} key from {@code application.properties} and parses it with
     * {@link #getEsHosts(String)}; falls back to {@code es1:9200} when the key is absent.
     *
     * @return the configured hosts
     */
    public static List<HttpHost> getDefaultEsHosts() {
        String esHostsConfKey = "es.hosts";
        Props props = new Props("application.properties", Charset.defaultCharset());
        return getEsHosts(props.getStr(esHostsConfKey, "es1:9200"));
    }
}
Search
package elasticsearch.util;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.CharUtil;
import cn.hutool.core.util.StrUtil;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.Time;
import co.elastic.clients.elasticsearch._types.query_dsl.Query;
import co.elastic.clients.elasticsearch.core.*;
import co.elastic.clients.elasticsearch.core.search.Hit;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import static cn.hutool.core.text.StrPool.COMMA;
/**
* @author zxc
* @date 2022/6/22/022 11:08
*/
@Slf4j
public class EsSearchUtil {
/**
 * Splits a comma-separated index expression into its parts.
 *
 * @param indexStr comma-separated index names, e.g. {@code aaa,bbb}
 * @return the parts as an array, e.g. {@code [aaa, bbb]}
 */
public static String[] getSearchIndex(String indexStr) {
    final char separator = CharUtil.COMMA;
    return StrUtil.splitToArray(indexStr, separator);
}
/**
 * Builds a terms query for an arbitrarily large value collection.
 *
 * <p>The values are cut into chunks of 1000; each chunk becomes one {@code terms} query and
 * the chunks are combined as {@code should} clauses of a {@code bool} query.
 *
 * <p>When there are no values at all, a single {@code terms} query with an empty value list is
 * returned, the same as the varargs overload produces. A {@code bool} query without any clause
 * would match every document, which is the opposite of what "no values" should mean.
 *
 * @param field  field name
 * @param values values to match; may be empty
 * @return the combined query
 */
public static Query termsQuery(String field, Collection<String> values) {
    final int batch = 1000;
    List<Query> chunkQueries = new ArrayList<>();
    // one terms query per chunk of 1000 values
    for (List<String> chunk : CollUtil.split(values, batch)) {
        chunkQueries.add(Query.of(qb -> qb
                .terms(termsBuilder -> termsBuilder
                        .field(field)
                        .terms(tf -> tf.value(EsCommonUtils.strList2Field(chunk)))
                )
        ));
    }
    if (chunkQueries.isEmpty()) {
        // nothing to match: an empty terms query matches no document
        return termsQuery(field, new String[0]);
    }
    // combine the chunks into one query
    return Query.of(qb -> qb
            .bool(bool -> bool
                    .should(chunkQueries)
            )
    );
}
/**
 * Builds a single {@code terms} query from a handful of values.
 *
 * @param field  field name
 * @param values values to match
 * @return the terms query
 */
public static Query termsQuery(String field, String... values) {
    return Query.of(builder -> builder.terms(terms -> terms
            .field(field)
            .terms(termsField -> termsField.value(EsCommonUtils.str2Field(values)))));
}
/**
 * Builds a simple {@code match} query.
 *
 * @param field field name
 * @param value text to match (original author's note: ES7 accepts a value such as
 *              {@code "aaa,bbb,ccc"})
 * @return the match query
 */
public static Query matchQuery(String field, String value) {
    return Query.of(builder -> builder.match(m -> m.field(field).query(value)));
}
/**
 * Builds a {@code match_all} query with default options.
 *
 * @return the match-all query
 */
public static Query matchAll() {
    return Query.of(builder -> builder.matchAll(all -> all));
}
/**
 * Builds an {@code exists} query for one field.
 *
 * @param field field name
 * @return the exists query
 */
public static Query existsQuery(String field) {
    return Query.of(builder -> builder.exists(exists -> exists.field(field)));
}
/**
 * Parses a JSON query DSL string into a {@link Query}.
 *
 * @param dsl query DSL as JSON text
 * @return the parsed query
 */
public static Query dslParseQuery(String dsl) {
    final StringReader json = new StringReader(dsl);
    return Query.of(builder -> builder.withJson(json));
}
/**
 * Renders a {@link Query} as text by taking its {@code toString()} output and removing the
 * leading {@code "Query:"} marker.
 *
 * <p>NOTE(review): this relies on the client's {@code toString()} format; whether that output
 * is always complete, valid JSON (e.g. for very large queries) should be confirmed.
 *
 * @param query query to render
 * @return the query text without the {@code "Query:"} prefix
 */
public static String query2Dsl(Query query) {
    final String rendered = query.toString();
    return StrUtil.removePrefix(rendered, "Query:");
}
/**
 * Runs a scroll search and hands every page of document sources to a consumer.
 *
 * <p>The first page comes from a search with a 2 minute scroll keep-alive and a page size of
 * 5000; further pages are fetched with the scroll id until a page comes back empty or no
 * scroll id is returned. The scroll context is cleared on the server when the method ends,
 * whether it finishes normally, finds nothing, or fails part-way.
 *
 * @param client       ElasticsearchClient
 * @param query        query to run, e.g. built with {@code Query.of()}
 * @param index        indices to search
 * @param dataConsumer callback invoked once per page with that page's sources
 * @throws IOException if the search or a scroll request fails
 */
public static void scrollHandle(ElasticsearchClient client, Query query, List<String> index, Consumer<List<Object>> dataConsumer) throws IOException {
    long total = 0;
    SearchResponse<Object> response = client.search(
            SearchRequest.of(sear -> sear
                    .query(query)
                    .scroll(Time.of(ts -> ts.time("2m")))
                    .index(index)
                    .size(5000)
            )
            // sources are deserialized with the client's default mapping for Object, which is fragile
            , Object.class
    );
    log.info("search from index:{},dsl:\r\n{}", index, query.toString());

    String scrollId = response.scrollId();
    // most recent non-null scroll id seen; this is the one released in the finally block
    String lastScrollId = scrollId;
    try {
        if (CollUtil.isEmpty(response.hits().hits())) {
            log.warn("can not find result from this search");
            return;
        }
        List<Object> objects = response.hits().hits().stream().map(Hit::source).collect(Collectors.toList());
        dataConsumer.accept(objects);
        total += objects.size();
        log.info("search size:{}", total);

        while (scrollId != null) {
            final String searchScroll = scrollId;
            ScrollResponse<Object> scrollResponse = client.scroll(
                    ScrollRequest.of(scroll -> scroll
                            .scrollId(searchScroll)
                            .scroll(time -> time.time("2m"))
                    ), Object.class
            );
            if (scrollResponse.scrollId() != null) {
                lastScrollId = scrollResponse.scrollId();
            }
            List<Hit<Object>> hitList = scrollResponse.hits().hits();
            if (hitList.isEmpty()) {
                break;
            }
            scrollId = scrollResponse.scrollId();
            List<Object> scrollObjects = hitList.stream().map(Hit::source).collect(Collectors.toList());
            dataConsumer.accept(scrollObjects);
            total += scrollObjects.size();
            log.info("scroll search size:{}", total);
        }
        log.info("finish scroll search handle total :{}", total);
    } finally {
        clearScrollQuietly(client, lastScrollId);
    }
}

/**
 * Releases a scroll context; a failure here is logged and never propagated.
 *
 * @param client   ElasticsearchClient
 * @param scrollId scroll id to release; ignored when {@code null}
 */
private static void clearScrollQuietly(ElasticsearchClient client, String scrollId) {
    if (scrollId == null) {
        return;
    }
    try {
        client.clearScroll(clear -> clear.scrollId(scrollId));
    } catch (IOException | RuntimeException e) {
        log.warn("failed to clear scroll context: {}", e.getMessage(), e);
    }
}
/**
 * Convenience overload of the scroll handler that accepts the indices as one string.
 *
 * @param client       ElasticsearchClient
 * @param query        query to run, e.g. built with {@code Query.of()}
 * @param index        a single index name or a comma-separated list of names
 * @param dataConsumer callback invoked with each page of sources
 * @throws IOException if a request fails
 */
public static void scrollHandle(ElasticsearchClient client, Query query, String index, Consumer<List<Object>> dataConsumer) throws IOException {
    List<String> targets = getSearchIndexList(index);
    scrollHandle(client, query, targets, dataConsumer);
}
/**
 * Turns an index expression into a list of index names.
 *
 * @param index a single index name or a comma-separated list of names
 * @return a new mutable list: the split parts when the input contains a comma, otherwise a
 * one-element list holding the input
 */
public static List<String> getSearchIndexList(String index) {
    List<String> names = new ArrayList<>();
    if (!index.contains(COMMA)) {
        names.add(index);
        return names;
    }
    names.addAll(StrUtil.split(index, COMMA));
    return names;
}
/**
 * Counts the documents matching a query.
 *
 * @param client client
 * @param query  query
 * @param index  indices to count in
 * @return the count, or {@code null} when the request fails (the exception is logged)
 */
public static Long getCount(ElasticsearchClient client, Query query, List<String> index) {
    Long matched = null;
    try {
        CountRequest request = CountRequest.of(count -> count
                .index(index)
                .query(query));
        matched = client.count(request).count();
    } catch (Exception e) {
        log.error(e.getMessage(), e);
    }
    return matched;
}
}
Write
package elasticsearch.util;
import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.lang.Pair;
import cn.hutool.core.map.MapUtil;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.core.BulkResponse;
import co.elastic.clients.elasticsearch.core.bulk.BulkOperation;
import elasticsearch.annotation.ESField;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* @author zhangxuecheng4441
* @date 2022/10/26/026 10:14
*/
@Slf4j
public class EsWriteUtils {
/**
 * Contract for objects that can be written as Elasticsearch documents.
 */
public interface EsDoc {

    /**
     * Returns the document id.
     *
     * @return id
     */
    String getDocId();

    /**
     * Returns the routing value.
     *
     * @return routing
     */
    String getRouting();
}
/**
 * Upserts a single document through the bulk API.
 *
 * <p>The document body is produced by {@code ESField.Method.getMap(bean)}, so what gets
 * written is filtered by {@code @ESField}. Any exception raised while building or sending the
 * request is rethrown wrapped in a {@link RuntimeException}; item-level failures reported in
 * the response are only logged.
 *
 * @param client    client
 * @param indexName index name
 * @param bean      document to write
 */
public static void write(ElasticsearchClient client, String indexName, EsDoc bean) {
    BulkResponse response;
    try {
        response = client.bulk(request -> request
                .operations(operation -> operation
                        .update(update -> update
                                .index(indexName)
                                .id(bean.getDocId())
                                .routing(bean.getRouting())
                                .action(action -> action
                                        .docAsUpsert(true)
                                        .doc(ESField.Method.getMap(bean))))));
    } catch (Exception e) {
        throw new RuntimeException(e);
    }

    boolean succeeded = response != null && !response.errors();
    if (succeeded) {
        log.info("write to {} doc success id:{}", indexName, bean.getDocId());
    } else {
        log.error("error bulk es doc({}) and error:{}", bean.getDocId(), response);
    }
}
/**
 * Upserts a list of documents through the bulk API, 1000 documents per request.
 *
 * <p>Each document body is produced by {@code ESField.Method.getMap(bean)}, so what gets
 * written is filtered by {@code @ESField}. Any exception is rethrown wrapped in a
 * {@link RuntimeException}; item-level failures reported in a response are only logged.
 *
 * @param client    client
 * @param indexName index name
 * @param esDocs    documents to write
 */
public static void batchWrite(ElasticsearchClient client, String indexName, List<? extends EsDoc> esDocs) {
    final int batch = 1000;
    try {
        for (List<? extends EsDoc> page : ListUtil.partition(esDocs, batch)) {
            // one upsert operation per document in this page
            List<BulkOperation> operations = page.stream()
                    .map(doc -> BulkOperation.of(op -> op
                            .update(update -> update
                                    .index(indexName)
                                    .id(doc.getDocId())
                                    .routing(doc.getRouting())
                                    .action(action -> action
                                            .docAsUpsert(true)
                                            .doc(ESField.Method.getMap(doc))))))
                    .collect(Collectors.toList());

            BulkResponse response = client.bulk(request -> request
                    .index(indexName)
                    .operations(operations));

            boolean succeeded = response != null && !response.errors();
            if (succeeded) {
                log.info("success branch update index :{} total:{}", indexName, page.size());
            } else {
                log.error("error bulk es doc size:({}) and error:{}", page.size(), response);
            }
        }
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
/**
 * Upserts map-shaped documents through the bulk API, 1000 documents per request.
 *
 * <p>Each pair's key is used as both the document id and the routing value; the pair's value
 * is the document body. Entries with a {@code null} value are left out of the body. The
 * filtering is done on a copy, so the caller's maps are never modified and unmodifiable maps
 * are accepted.
 *
 * <p>Any exception is rethrown wrapped in a {@link RuntimeException}; item-level failures
 * reported in a response are only logged.
 *
 * @param client    client
 * @param indexName index name
 * @param esDocs    pairs of document id and document body
 */
public static void batchMapWrite(ElasticsearchClient client, String indexName, List<Pair<String, Map<String, Object>>> esDocs) {
    BulkResponse bulkResponse;
    try {
        final int batch = 1000;
        for (List<Pair<String, Map<String, Object>>> beans : ListUtil.partition(esDocs, batch)) {
            List<BulkOperation> operations = beans.stream()
                    .map(bean -> {
                        Map<String, Object> docMap = bean.getValue();
                        // MapUtil.removeNullValue edits the map it is given, so work on a copy
                        Map<String, Object> doc = docMap == null
                                ? null
                                : MapUtil.removeNullValue(new java.util.LinkedHashMap<>(docMap));
                        return BulkOperation.of(bulk -> bulk
                                .update(up -> up
                                        .index(indexName)
                                        .id(bean.getKey())
                                        .routing(bean.getKey())
                                        .action(upAction -> upAction
                                                .docAsUpsert(true)
                                                .doc(doc)
                                        )
                                )
                        );
                    })
                    .collect(Collectors.toList());
            bulkResponse = client.bulk(bulkBd -> bulkBd
                    .index(indexName)
                    .operations(operations)
            );
            if (bulkResponse == null || bulkResponse.errors()) {
                log.error("error bulk es doc size:({}) and error:{}", beans.size(), bulkResponse);
            } else {
                log.info("success branch update index :{} total:{}", indexName, beans.size());
            }
        }
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
}
Agg
package elasticsearch.util;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.ObjectUtil;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.aggregations.*;
import co.elastic.clients.elasticsearch._types.query_dsl.Query;
import co.elastic.clients.elasticsearch.core.SearchRequest;
import co.elastic.clients.elasticsearch.core.SearchResponse;
import lombok.extern.slf4j.Slf4j;
import lombok.var;
import java.io.IOException;
import java.util.*;
/**
* @author zxc
* @date 2022/9/2/002 15:14
*/
@Slf4j
public class EsAggUtils {
/**
 * Builds a {@code terms} aggregation definition.
 *
 * @param field field to bucket on
 * @param topN  maximum number of buckets
 * @return Aggregation
 */
public static Aggregation termsAgg(String field, Integer topN) {
    return Aggregation.of(builder -> builder.terms(terms -> terms.field(field).size(topN)));
}
/**
 * Builds a {@code sum} aggregation definition.
 *
 * @param field field to sum
 * @return Aggregation
 */
public static Aggregation sumAgg(String field) {
    return Aggregation.of(builder -> builder.sum(s -> s.field(field)));
}
/**
 * Builds a two-level {@code terms} aggregation: buckets on {@code field1}, each containing a
 * sub-aggregation named {@code field2} that buckets on {@code field2}.
 *
 * @param field1 first-level field
 * @param topN1  first-level bucket limit
 * @param field2 second-level field, also used as the sub-aggregation name
 * @param topN2  second-level bucket limit
 * @return Aggregation
 */
public static Aggregation termsTermsAgg(String field1, Integer topN1, String field2, Integer topN2) {
    final Aggregation inner = termsAgg(field2, topN2);
    return Aggregation.of(outer -> outer
            .terms(terms -> terms.field(field1).size(topN1))
            .aggregations(field2, inner));
}
/**
 * Builds a {@code terms} aggregation whose buckets each contain a {@code sum}
 * sub-aggregation named after {@code sumField}.
 *
 * @param field    field to bucket on
 * @param sumField field to sum inside each bucket, also used as the sub-aggregation name
 * @param topN     bucket limit
 * @return Aggregation
 */
public static Aggregation termsSumAgg(String field, String sumField, Integer topN) {
    final Aggregation perBucketSum = sumAgg(sumField);
    return Aggregation.of(outer -> outer
            .terms(terms -> terms.field(field).size(topN))
            .aggregations(sumField, perBucketSum));
}
/**
 * Builds a three-level {@code terms} aggregation. Each nested level is registered as a
 * sub-aggregation named after its own field.
 *
 * @param field1 first-level field
 * @param topN1  first-level bucket limit
 * @param field2 second-level field and sub-aggregation name
 * @param topN2  second-level bucket limit
 * @param field3 third-level field and sub-aggregation name
 * @param topN3  third-level bucket limit
 * @return Aggregation
 */
public static Aggregation terms3Agg(String field1, Integer topN1, String field2, Integer topN2, String field3, Integer topN3) {
    // levels two and three have exactly the shape the two-level builder produces
    final Aggregation lowerLevels = termsTermsAgg(field2, topN2, field3, topN3);
    return Aggregation.of(top -> top
            .terms(terms -> terms.field(field1).size(topN1))
            .aggregations(field2, lowerLevels));
}
/**
 * Runs a terms aggregation and returns the bucket counts.
 *
 * <p>Both string-keyed and long-keyed terms results are read, so the method also works on a
 * numeric (long) field. Any other result variant fails with the exception raised by
 * {@code Aggregate.sterms()}.
 *
 * @param client     client
 * @param query      query restricting the documents that are aggregated
 * @param termsField field to bucket on; also used as the aggregation name
 * @param topN       bucket limit; {@code null} means 10000
 * @param indexes    indices to search
 * @return bucket key to document count, in the order the buckets were returned; {@code null}
 * when the search fails with an {@link IOException} (the exception is logged)
 */
public static HashMap<String, Long> termsAgg(ElasticsearchClient client, Query query, String termsField, Integer topN, List<String> indexes) {
    try {
        int size = ObjectUtil.defaultIfNull(topN, 10000);
        HashMap<String, Long> aggMap = new LinkedHashMap<>((int) (size / 0.75) + 1);
        // aggregation definition and request
        Aggregation aggregation = termsAgg(termsField, size);
        SearchRequest searchRequest = new SearchRequest.Builder()
                .index(indexes)
                .query(query)
                .aggregations(termsField, aggregation)
                .build();
        // search
        SearchResponse<Object> response = client.search(searchRequest, Object.class);
        Aggregate aggregate = response.aggregations().get(termsField);
        // collect the buckets
        if (aggregate.isLterms()) {
            for (LongTermsBucket bucket : aggregate.lterms().buckets().array()) {
                aggMap.put(bucket.key(), bucket.docCount());
            }
        } else {
            for (StringTermsBucket bucket : aggregate.sterms().buckets().array()) {
                aggMap.put(bucket.key(), bucket.docCount());
            }
        }
        return aggMap;
    } catch (IOException e) {
        log.error(e.getMessage(), e);
    }
    return null;
}
/**
 * Varargs convenience overload of the terms-aggregation search.
 *
 * @param client     client
 * @param query      query
 * @param termsField field to bucket on
 * @param topN       bucket limit
 * @param indexes    indices to search
 * @return bucket key to document count, as returned by the list-based overload
 */
public static HashMap<String, Long> termsAgg(ElasticsearchClient client, Query query, String termsField, Integer topN, String... indexes) {
    List<String> indexList = Arrays.asList(indexes);
    return termsAgg(client, query, termsField, topN, indexList);
}
/**
 * Runs a two-level terms aggregation and returns the nested bucket counts.
 *
 * <p>Both levels are stored in insertion-ordered maps, so the order in which buckets were
 * returned is preserved at both levels.
 *
 * @param client      client
 * @param query       query restricting the documents that are aggregated
 * @param termsField1 first-level field; also used as the aggregation name
 * @param topN1       first-level bucket limit
 * @param termsField2 second-level field; also used as the sub-aggregation name
 * @param topN2       second-level bucket limit
 * @param indexes     indices to search
 * @return first-level key to (second-level key to document count); {@code null} when the
 * search fails with an {@link IOException} (the exception is logged)
 */
public static HashMap<String, HashMap<String, Long>> termsTermsAgg(ElasticsearchClient client, Query query, String termsField1, Integer topN1, String termsField2, Integer topN2, String... indexes) {
    try {
        HashMap<String, HashMap<String, Long>> aggMap = new LinkedHashMap<>((int) (topN1 / 0.75) + 1);
        // two-level aggregation definition
        Aggregation aggregation = termsTermsAgg(termsField1, topN1, termsField2, topN2);
        SearchRequest searchRequest = new SearchRequest.Builder()
                .index(Arrays.asList(indexes))
                .query(query)
                .aggregations(termsField1, aggregation)
                .build();
        // search
        SearchResponse<Object> response = client.search(searchRequest, Object.class);
        // first-level buckets
        Optional.ofNullable(response.aggregations())
                .map(aggregations -> aggregations.get(termsField1))
                .map(Aggregate::sterms)
                .map(StringTermsAggregate::buckets)
                .map(Buckets::array)
                .ifPresent(termsBuckets -> {
                    for (StringTermsBucket termsBucket : termsBuckets) {
                        // LinkedHashMap keeps the second-level bucket order as well
                        HashMap<String, Long> subMap = new LinkedHashMap<>((int) (topN2 / 0.75) + 1);
                        // second-level buckets
                        Optional.ofNullable(termsBucket.aggregations())
                                .map(aggregations -> aggregations.get(termsField2))
                                .map(Aggregate::sterms)
                                .map(StringTermsAggregate::buckets)
                                .map(Buckets::array)
                                .ifPresent(terms2Buckets -> {
                                    for (StringTermsBucket bucket2 : terms2Buckets) {
                                        subMap.put(bucket2.key(), bucket2.docCount());
                                    }
                                });
                        aggMap.put(termsBucket.key(), subMap);
                    }
                });
        return aggMap;
    } catch (IOException e) {
        log.error(e.getMessage(), e);
    }
    return null;
}
/**
 * Converts an aggregation result tree into nested, insertion-ordered maps.
 *
 * <p>Per aggregation name the value is:
 * <ul>
 *   <li>string or long terms: bucket key to document count, or, for a bucket that has
 *       sub-aggregations, bucket key to the recursively wrapped sub-aggregations;</li>
 *   <li>sum: a map holding the sum under the aggregation's own name;</li>
 *   <li>date histogram: bucket key string to document count;</li>
 *   <li>anything else: an empty map.</li>
 * </ul>
 *
 * @param aggregateMap aggregation name to result
 * @return aggregation name to wrapped value
 */
public static LinkedHashMap<String, Object> wrapNAgg(Map<String, Aggregate> aggregateMap) {
    LinkedHashMap<String, Object> wrapped = new LinkedHashMap<>();
    for (Map.Entry<String, Aggregate> entry : aggregateMap.entrySet()) {
        final String name = entry.getKey();
        final Aggregate agg = entry.getValue();
        final Object value;

        if (agg.isDateHistogram()) {
            List<DateHistogramBucket> buckets = agg.dateHistogram().buckets().array();
            LinkedHashMap<String, Long> perDate = new LinkedHashMap<>((int) (buckets.size() / 0.75) + 1);
            for (DateHistogramBucket bucket : buckets) {
                perDate.put(bucket.keyAsString(), bucket.docCount());
            }
            value = perDate;
        } else {
            LinkedHashMap<String, Object> single = new LinkedHashMap<>();
            if (agg.isSterms()) {
                for (StringTermsBucket bucket : agg.sterms().buckets().array()) {
                    if (CollUtil.isEmpty(bucket.aggregations())) {
                        single.put(bucket.key(), bucket.docCount());
                    } else {
                        single.put(bucket.key(), wrapNAgg(bucket.aggregations()));
                    }
                }
            } else if (agg.isLterms()) {
                for (LongTermsBucket bucket : agg.lterms().buckets().array()) {
                    if (CollUtil.isEmpty(bucket.aggregations())) {
                        single.put(bucket.key(), bucket.docCount());
                    } else {
                        single.put(bucket.key(), wrapNAgg(bucket.aggregations()));
                    }
                }
            } else if (agg.isSum()) {
                single.put(name, agg.sum().value());
            }
            value = single;
        }
        wrapped.put(name, value);
    }
    return wrapped;
}
/**
 * Runs a terms aggregation with a per-bucket sum and returns the sums.
 *
 * @param client      client
 * @param query       query restricting the documents that are aggregated
 * @param aggField    first-level terms field; also used as the aggregation name
 * @param aggSumField field summed inside each bucket; also the sub-aggregation name
 * @param topN        bucket limit
 * @param indexList   indices to search
 * @return bucket key to sum, in bucket order; whatever was collected so far (possibly empty)
 * when the search fails with an {@link IOException}, which is logged
 */
public static Map<String, Double> termsSumAgg(ElasticsearchClient client, Query query, String aggField, String aggSumField, Integer topN, List<String> indexList) {
    Map<String, Double> sums = new LinkedHashMap<>((int) (topN / 0.75) + 1);
    try {
        // aggregation definition and request
        SearchRequest request = new SearchRequest.Builder()
                .index(indexList)
                .query(query)
                .aggregations(aggField, termsSumAgg(aggField, aggSumField, topN))
                .build();
        SearchResponse<Object> response = client.search(request, Object.class);

        // first-level buckets; none when the aggregation is missing from the response
        List<StringTermsBucket> buckets = Optional.ofNullable(response.aggregations())
                .map(aggregations -> aggregations.get(aggField))
                .map(Aggregate::sterms)
                .map(StringTermsAggregate::buckets)
                .map(Buckets::array)
                .orElse(Collections.<StringTermsBucket>emptyList());

        for (StringTermsBucket bucket : buckets) {
            // per-bucket sum
            Optional.ofNullable(bucket.aggregations())
                    .map(aggregations -> aggregations.get(aggSumField))
                    .map(Aggregate::sum)
                    .map(SingleMetricAggregateBase::value)
                    .ifPresent(sum -> sums.put(bucket.key(), sum));
        }
    } catch (IOException e) {
        log.error(e.getMessage(), e);
    }
    return sums;
}
}
Other
ESField
package elasticsearch.annotation;
import elasticsearch.util.ClazzUtil;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.HashMap;
import java.util.Map;
/**
* es字段,实体字段只有标明了该字段,才能够写入到es中
*
* @author zhangxuecheng4441
* @date 2022/8/12/012 13:42
*/
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD, ElementType.TYPE})
public @interface ESField {
@Slf4j
class Method {
/**
* bean 转 map 受ESField字段过滤
*
* @param obj obj
* @param <T> obj T
* @return obj map
*/
public static <T> Map<String, Object> getMap(T obj) {
ESField esAnno = obj.getClass().getAnnotation(ESField.class);
//类上包含注解 所有字段写入es
final boolean matchAllField = esAnno != null;
val fields = ClazzUtil.getClazzAllField(obj.getClass());
String errorFieldName = "serialVersionUID";
return fields.stream()
.peek(field -> field.setAccessible(true))
//类或者字段上包含注解 则写入es
.filter(field -> matchAllField || field.getAnnotation(ESField.class) != null)
.filter(field -> !errorFieldName.equals(field.getName()))
.collect(HashMap::new,
(map, field) -> {
try {
if (field.get(obj) != null) {
map.put(field.getName(), field.get(obj));
}
} catch (IllegalAccessException e) {
e.printStackTrace();
}
},
HashMap::putAll);
}
}
}
EsDoc
package elasticsearch.pojo;
/**
 * Contract for objects that can be written as Elasticsearch documents.
 *
 * @author zhangxuecheng4441
 * @date 2023/2/27/027 10:06
 */
public interface EsDoc {

    /**
     * Returns the document id.
     *
     * @return id
     */
    String getDocId();

    /**
     * Returns the routing value.
     *
     * @return routing
     */
    String getRouting();
}
EsCommonUtils
package elasticsearch.util;
import cn.hutool.core.collection.ListUtil;
import co.elastic.clients.elasticsearch._types.FieldValue;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
/**
 * Conversions from plain strings to Elasticsearch {@link FieldValue}s.
 *
 * @author zxc
 * @date 2022/5/26/026 16:03
 */
public class EsCommonUtils {

    /**
     * Converts strings to field values.
     *
     * @param fields strings to convert
     * @return one {@link FieldValue} per input string, in input order
     */
    public static List<FieldValue> str2Field(String... fields) {
        List<String> asList = ListUtil.toList(fields);
        return strList2Field(asList);
    }

    /**
     * Converts a collection of strings to field values.
     *
     * @param fields strings to convert
     * @return one {@link FieldValue} per input string, in iteration order
     */
    public static List<FieldValue> strList2Field(Collection<String> fields) {
        return fields.stream()
                .map(text -> FieldValue.of(text))
                .collect(Collectors.toList());
    }
}
ClazzUtil
package elasticsearch.util;
import cn.hutool.core.util.ClassUtil;
import lombok.val;
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
/**
 * Reflection helpers that look at a class together with its superclasses.
 *
 * @author zhangxuecheng4441
 * @date 2023/2/16/016 11:25
 */
public class ClazzUtil extends ClassUtil {

    /**
     * Collects a class and every class above it in the superclass chain.
     *
     * @param clazz starting class
     * @return the starting class plus all of its superclasses
     */
    public static Set<Class<?>> getSuperClazz(Class<?> clazz) {
        Set<Class<?>> hierarchy = new HashSet<>();
        Class<?> current = clazz;
        // walk upwards until there is no superclass left
        do {
            hierarchy.add(current);
            current = current.getSuperclass();
        } while (current != null);
        return hierarchy;
    }

    /**
     * Collects the fields declared by a class and by all of its superclasses.
     *
     * @param clazz starting class
     * @return the declared fields of every class in the hierarchy
     */
    public static Set<Field> getClazzAllField(Class<?> clazz) {
        Set<Field> allFields = new HashSet<>();
        for (Class<?> type : getSuperClazz(clazz)) {
            Field[] declared = ClassUtil.getDeclaredFields(type);
            allFields.addAll(Arrays.asList(declared));
        }
        return allFields;
    }
}
github
https://github.com/opop32165455/elasticsearch-java-7.x
后言
欢迎提意见。
网友评论