0 - Preface
The Elasticsearch Java API can be used through four kinds of client:
- TransportClient;
- RestClient;
- Jest;
- Spring Data Elasticsearch;
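TransportClient and RestClient are Elasticsearch's native APIs. TransportClient supports the 2.x, 5.x and 6.x releases, but it is deprecated as of Elasticsearch 7.0 and removed in 8.0. Its replacement is the Java High Level REST Client, which sends HTTP requests instead of serialized Java requests.
Jest is a Java HTTP REST client for Elasticsearch developed by the Java community; Spring Data Elasticsearch is Spring's integration package for Elasticsearch.
Recommendation: TransportClient is being phased out, so it should not be used for new work. Jest is community-maintained and lags behind Elasticsearch releases: at the time of writing its latest version targets ES 6.3.1 and only four issues were opened in the past month, which suggests low activity, so it is not recommended either. Spring Data Elasticsearch mainly exists to plug into the Spring ecosystem and fits web systems that are already built on Spring. The officially maintained high-level and low-level REST clients are the recommended choice. This article focuses on the REST client and gives only a brief overview of the others.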
1 - TransportClient
Because TransportClient will be removed in later versions, this section only covers how to create one and a few basic create, read, update and delete operations.
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>transport</artifactId>
<version>6.8.0</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>6.8.0</version>
</dependency>
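import java.net.InetSocketAddress;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;

/**
 * TransportClient is deprecated in 7.0 and removed in 8.0; prefer the Java High Level REST Client.
 */
public class TransportClientUtils {
    // static and volatile, so the double-checked locking below really yields one shared client
    private static volatile TransportClient client = null;

    /**
     * Creates the shared TransportClient once (singleton via double-checked locking).
     */
    public TransportClientUtils() {
        if (client == null) {
            synchronized (TransportClientUtils.class) {
                if (client == null) {
                    client = getClient();
                }
            }
        }
    }

    public TransportClient getClient() {
        TransportClient client = null;
        try {
            Settings settings = Settings.builder()
                    .put("client.transport.sniff", true)
                    .put("cluster.name", "bigdata").build();
            client = new PreBuiltTransportClient(settings)
                    .addTransportAddress(new TransportAddress(new InetSocketAddress("192.168.187.201", 9300)));
        } catch (Exception e) {
            e.printStackTrace();
        }
        return client;
    }

    public void test() throws Exception {
        // Sample document used for the index call below
        String json = "{\"user\":\"kimchy\",\"message\":\"trying out Elasticsearch\"}";
        // Create: index a document
        IndexResponse indexResponse = client.prepareIndex("twitter", "_doc")
                .setSource(json, XContentType.JSON)
                .get();
        // Read: get a document by id (ids are strings)
        GetResponse getResponse = client.prepareGet("twitter", "_doc", "1").get();
        // Delete a document by id
        DeleteResponse deleteResponse = client.prepareDelete("twitter", "_doc", "1").get();
        // Update one field of the document with the given id
        UpdateRequest updateRequest1 = new UpdateRequest("twitter", "_doc", "NpEWCGcBi36MQkKOSdf3")
                .doc(jsonBuilder()
                        .startObject()
                        .field("user", "tom")
                        .endObject()
                );
        client.update(updateRequest1).get();
        // There are also the bulk API, the search API, the aggregation API, ...
    }
}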
For the other TransportClient APIs, see the TransportClient API doc in the Elasticsearch documentation.
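The class above describes its client as a singleton. One common way to get a lazily created, thread-safe single instance in Java is the initialization-on-demand holder idiom. The sketch below is an illustrative alternative, not the article's code: `ClientHolder` and `FakeClient` are hypothetical names, and `FakeClient` stands in for the real client so the block runs without Elasticsearch on the classpath.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class ClientHolder {
    /** Hypothetical stand-in for TransportClient or RestHighLevelClient. */
    static final class FakeClient {
        static final AtomicInteger CREATED = new AtomicInteger();

        FakeClient() {
            CREATED.incrementAndGet(); // count how many clients were ever built
        }
    }

    private ClientHolder() {
    }

    /** The JVM initializes this nested class lazily and exactly once, on first access. */
    private static final class Lazy {
        static final FakeClient INSTANCE = new FakeClient();
    }

    static FakeClient get() {
        return Lazy.INSTANCE;
    }

    public static void main(String[] args) {
        FakeClient first = ClientHolder.get();
        FakeClient second = ClientHolder.get();
        System.out.println("same instance: " + (first == second));
        System.out.println("clients created: " + FakeClient.CREATED.get());
    }
}
```

Compared with double-checked locking, this idiom needs no `volatile` field and no explicit `synchronized` block, because class initialization is already thread-safe.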
2 - Rest Client
Later Elasticsearch versions rely on the REST client for data operations. It comes in two flavours: the Java Low Level REST Client and the Java High Level REST Client.
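Both REST clients ultimately exchange JSON with a node over HTTP. As a rough illustration of what such a request looks like, the sketch below builds (but does not send) an index request with the JDK 11 `java.net.http` API. The host `localhost`, the default HTTP port 9200 and the 6.x-style `/{index}/{type}/{id}` path are assumptions for the example, `RestRequestSketch` is a hypothetical name, and this is not how the official clients are implemented.

```java
import java.net.URI;
import java.net.http.HttpRequest;

final class RestRequestSketch {
    /** Builds (but does not send) the HTTP request that indexes one JSON document. */
    static HttpRequest indexRequest(String host, int port, String index, String type,
                                    String id, String json) {
        URI uri = URI.create("http://" + host + ":" + port + "/" + index + "/" + type + "/" + id);
        return HttpRequest.newBuilder(uri)
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    public static void main(String[] args) {
        HttpRequest request = indexRequest("localhost", 9200, "posts", "doc", "1",
                "{\"user\":\"kimchy\"}");
        System.out.println(request.method() + " " + request.uri());
    }
}
```

The high-level client adds typed request and response objects on top of this kind of exchange, which is why it needs the `elasticsearch` core artifact in addition to the low-level client.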
Maven dependencies:
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>6.8.0</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
<version>6.8.0</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>6.8.0</version>
</dependency>
Creating the RestHighLevelClient and the RestClient:
public class RestClientUtils {
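/**
* High-level REST client (static and volatile, so the double-checked locking in the constructor yields one shared instance)
*/
private static volatile RestHighLevelClient client = null;
/**
* Low-level REST client
*/
private RestClient restClient = null;
/**
* Create the RestHighLevelClient once, singleton style
*/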
public RestClientUtils() {
if (client == null) {
synchronized (RestHighLevelClient.class) {
if (client == null) {
client = getClient();
}
}
}
}
private RestHighLevelClient getClient() {
RestHighLevelClient client = null;
try {
client = new RestHighLevelClient(
RestClient.builder(
new HttpHost("192.168.187.201", 9200, "http") // REST clients use the HTTP port (9200 by default), not the transport port 9300
)
);
} catch (Exception e) {
e.printStackTrace();
}
return client;
}
private RestClient getRestClient() {
RestClient client = null;
try {
client = RestClient.builder(
new HttpHost("192.168.187.201", 9200, "http") // REST clients use the HTTP port (9200 by default), not the transport port 9300
).build();
} catch (Exception e) {
e.printStackTrace();
}
return client;
}
public void closeClient() {
try {
if (client != null) {
client.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* Document APIs: simple create, read, update and delete operations
*/
public void documentAPI() {
//...
}
/**
* Search APIs: more complex query operations
*/
public void searchAPI() {
//...
}
}
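The High Level REST Client API is split into Document APIs, Search APIs, Miscellaneous APIs, Indices APIs, Cluster APIs and so on. This article covers the commonly used Document APIs and Search APIs; for the rest, refer to the client's reference documentation.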
2.1 - Document APIs
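The Document APIs cover create, read, update and delete operations, and include the single-document APIs and the multi-document (batch) APIs. Every Document API accepts optional parameters, can be executed synchronously or asynchronously, and can throw exceptions. The Index API serves as the example here; the later sections do not repeat these points.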
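The difference between the two execution styles can be sketched without the Elasticsearch library. In the block below, `Listener<T>` is a hypothetical stand-in that mirrors the two callbacks of `ActionListener<T>` (`onResponse` and `onFailure`), and `AsyncSketch` is an illustrative name: the synchronous call blocks and returns the result, while the asynchronous call returns at once and reports the outcome through the listener.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

final class AsyncSketch {
    /** Hypothetical stand-in for ActionListener<T>: one callback per outcome. */
    interface Listener<T> {
        void onResponse(T response);

        void onFailure(Exception e);
    }

    /** Synchronous style: block until the call finishes, then return or throw. */
    static <T> T execute(Supplier<T> call) {
        return call.get();
    }

    /** Asynchronous style: return immediately and report through the listener. */
    static <T> CompletableFuture<Void> executeAsync(Supplier<T> call, Listener<T> listener) {
        return CompletableFuture.supplyAsync(call).handle((result, error) -> {
            if (error == null) {
                listener.onResponse(result);
            } else {
                // supplyAsync wraps the original exception; unwrap it for the listener
                Throwable cause = error.getCause() != null ? error.getCause() : error;
                listener.onFailure(cause instanceof Exception
                        ? (Exception) cause : new RuntimeException(cause));
            }
            return null;
        });
    }

    public static void main(String[] args) {
        System.out.println(execute(() -> "indexed synchronously"));
        executeAsync(() -> "indexed asynchronously", new Listener<String>() {
            @Override
            public void onResponse(String response) {
                System.out.println(response);
            }

            @Override
            public void onFailure(Exception e) {
                System.out.println("failed: " + e.getMessage());
            }
        }).join(); // join only so the demo does not exit before the callback runs
    }
}
```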

Index API:
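/**
* Create: index a document.
* Shows four ways to supply the source, the optional parameters, synchronous and asynchronous execution, the IndexResponse, and exception handling.
* @throws Exception
*/
public void index() throws Exception{
//Option 1: String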
IndexRequest request = new IndexRequest("posts", "doc", "1");
String jsonString = "{" +
"\"user\":\"kimchy\"," +
"\"postDate\":\"2020-03-27\"," +
"\"message\":\"trying out Elasticsearch\"" +
"}";
request.source(jsonString, XContentType.JSON);
//Option 2: Map
Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("user", "kimchy");
jsonMap.put("postDate", new Date());
jsonMap.put("message", "trying out Elasticsearch");
IndexRequest indexRequest = new IndexRequest("posts", "doc", "1").source(jsonMap);
//Option 3: XContentBuilder, automatically converted to JSON
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
{
builder.field("user", "kimchy");
builder.timeField("postDate" , new Date());
builder.field("message", "trying out Elasticsearch");
}
builder.endObject();
IndexRequest indexRequest1 = new IndexRequest("posts", "doc", "1")
.source(builder);
//Option 4: source given as key-value pairs
IndexRequest indexRequest2 = new IndexRequest("posts", "doc", "1")
.source("user", "kimchy",
"postDate", new Date(),
"message", "trying out Elasticsearch"
);
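//Optional parameters (each line shows one available setting; pick the ones you need rather than combining them all)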
request.routing("routing");
request.parent("parent");
request.timeout(TimeValue.timeValueSeconds(1));
request.timeout("1s");
request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
request.setRefreshPolicy("wait_for");
request.version(2);
request.versionType(VersionType.EXTERNAL);
request.opType(DocWriteRequest.OpType.CREATE);
request.opType("create");
request.setPipeline("pipeline");
//Synchronous execution
IndexResponse indexResponse = client.index(request, RequestOptions.DEFAULT);
//Asynchronous execution
ActionListener<IndexResponse> listener = new ActionListener<IndexResponse>() {
@Override
public void onResponse(IndexResponse indexResponse) {
}
@Override
public void onFailure(Exception e) {
}
};
client.indexAsync(request, RequestOptions.DEFAULT, listener);
//Index Response
String index = indexResponse.getIndex();
String type = indexResponse.getType();
String id = indexResponse.getId();
long version = indexResponse.getVersion();
//Exception handling: a version conflict surfaces as an ElasticsearchException
IndexRequest request1 = new IndexRequest("posts", "doc", "1")
.source("field", "value")
.version(1);
try {
IndexResponse response = client.index(request1, RequestOptions.DEFAULT);
} catch (ElasticsearchException e) {
if (e.status() == RestStatus.CONFLICT){
}
}
}
Get API:
/**
* Get a document by id
* @throws Exception
*/
public void get() throws Exception{
GetRequest request = new GetRequest("posts", "doc", "1");
//Optional parameters
request.fetchSourceContext(FetchSourceContext.DO_NOT_FETCH_SOURCE);
String[] includes = new String[]{"message", "*Date"};
String[] excludes = Strings.EMPTY_ARRAY;
FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);
request.fetchSourceContext(fetchSourceContext);
//Synchronous execution
GetResponse getResponse = client.get(request, RequestOptions.DEFAULT);
//Asynchronous execution; write the listener as in the Index example
client.getAsync(request, RequestOptions.DEFAULT, listener);
//Get Response: read the result
//Exception handling
}
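The `includes` and `excludes` arrays passed to `FetchSourceContext` are wildcard patterns over field names. The block below is a simplified model of that selection, assuming flat field names only; `SourceFilterSketch` and its methods are hypothetical names, and the real server-side filtering also handles nested paths.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

final class SourceFilterSketch {
    /** Turns a pattern such as "*Date" into a regex where '*' matches any run of characters. */
    static Pattern toRegex(String wildcard) {
        String[] parts = wildcard.split("\\*", -1);
        StringBuilder regex = new StringBuilder();
        for (int i = 0; i < parts.length; i++) {
            if (i > 0) {
                regex.append(".*");
            }
            regex.append(Pattern.quote(parts[i]));
        }
        return Pattern.compile(regex.toString());
    }

    static boolean matchesAny(String field, String[] patterns) {
        for (String pattern : patterns) {
            if (toRegex(pattern).matcher(field).matches()) {
                return true;
            }
        }
        return false;
    }

    /** Keeps a field if it matches an include (or no includes are given) and matches no exclude. */
    static List<String> filter(List<String> fields, String[] includes, String[] excludes) {
        List<String> kept = new ArrayList<>();
        for (String field : fields) {
            boolean included = includes.length == 0 || matchesAny(field, includes);
            if (included && !matchesAny(field, excludes)) {
                kept.add(field);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<String> fields = List.of("user", "postDate", "message");
        System.out.println(filter(fields, new String[]{"message", "*Date"}, new String[0]));
    }
}
```

With the patterns from the Get example, `user` is dropped while `postDate` and `message` are kept.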
Exists API:
/**
* Check whether a document exists
* @throws Exception
*/
public void exists() throws Exception{
GetRequest getRequest = new GetRequest("posts", "doc", "1");
getRequest.fetchSourceContext(new FetchSourceContext(false));
getRequest.storedFields("_none_");
//Synchronous execution
boolean exists = client.exists(getRequest, RequestOptions.DEFAULT);
//Asynchronous execution; write the listener as in the Index example
client.existsAsync(getRequest, RequestOptions.DEFAULT, listener);
if (exists){
System.out.println("exists");
}else {
System.out.println("does not exist");
}
}
Delete API:
/**
* Delete a document by id
* @throws Exception
*/
public void delete() throws Exception{
DeleteRequest request = new DeleteRequest("posts", "doc", "1");
//Synchronous execution
DeleteResponse deleteResponse = client.delete(request, RequestOptions.DEFAULT);
//Asynchronous execution; write the listener as in the Index example
client.deleteAsync(request, RequestOptions.DEFAULT, listener);
//Delete Response
String index = deleteResponse.getIndex();
// document was not found
if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) {
}
//Exception handling
}
Update API:
/**
* Update a document by id
* @throws Exception
*/
public void update() throws Exception{
UpdateRequest request = new UpdateRequest("posts", "doc", "1");
Map<String, Object> parameters = Collections.singletonMap("count", 4);
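//Option 1: inline script (the options below are alternatives; a single request carries either a script or a partial document)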
Script inline = new Script(ScriptType.INLINE, "painless", "ctx._source.field += params.count", parameters);
request.script(inline);
//Option 2: stored script
Script stored = new Script(ScriptType.STORED, null, "increment-field", parameters);
request.script(stored);
//Option 3: partial document as a String
String jsonString = "{" +
"\"updated\":\"2020-03-27\"," +
"\"reason\":\"daily update\"" +
"}";
request.doc(jsonString, XContentType.JSON);
//Option 4: partial document as a Map
Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("updated", new Date());
jsonMap.put("reason", "daily update");
request.doc(jsonMap);
//Option 5: partial document as an XContentBuilder
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
{
builder.timeField("updated", new Date());
builder.field("reason", "daily update");
}
builder.endObject();
request.doc(builder);
//Option 6: partial document as key-value pairs
request.doc("updated", new Date(),
"reason", "daily update");
//upserts
String jsonString1 = "{\"created\":\"2020-03-27\"}";
request.upsert(jsonString1, XContentType.JSON);
//Synchronous execution
UpdateResponse updateResponse = client.update(request, RequestOptions.DEFAULT);
//Asynchronous execution; create the listener as in the Index example
client.updateAsync(request, RequestOptions.DEFAULT, listener);
//Update Response: getGetResult() is null unless source retrieval was enabled with request.fetchSource(true)
GetResult result = updateResponse.getGetResult();
if (result != null && result.isExists()) {
String sourceAsString = result.sourceAsString();
Map<String, Object> sourceAsMap = result.sourceAsMap();
byte[] sourceAsBytes = result.source();
} else {
}
//Exception handling
}
Bulk API:
/**
* Bulk processing
* @throws Exception
*/
public void bulk() throws Exception{
BulkRequest request = new BulkRequest();
//Other
request.add(new DeleteRequest("posts", "doc", "3"));
request.add(new UpdateRequest("posts", "doc", "2")
.doc(XContentType.JSON, "other", "test"));
request.add(new IndexRequest("posts", "doc", "4")
.source(XContentType.JSON, "field", "baz"));
//Synchronous execution
BulkResponse bulkResponses = client.bulk(request, RequestOptions.DEFAULT);
//Asynchronous execution
ActionListener<BulkResponse> listener = new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse bulkResponse) {
}
@Override
public void onFailure(Exception e) {
}
};
client.bulkAsync(request, RequestOptions.DEFAULT, listener);
//Bulk Response: per-item results
for (BulkItemResponse bulkItemResponse: bulkResponses){
DocWriteResponse itemResponse = bulkItemResponse.getResponse();
if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX
|| bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) {
IndexResponse indexResponse = (IndexResponse) itemResponse;
} else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE) {
UpdateResponse updateResponse = (UpdateResponse) itemResponse;
} else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) {
DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
}
}
for (BulkItemResponse bulkItemResponse : bulkResponses) {
if (bulkItemResponse.isFailed()) {
BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
}
}
//Bulk Processor: a configurable batch processor
BulkProcessor.Listener listener1 = new BulkProcessor.Listener() {
@Override
public void beforeBulk(long l, BulkRequest bulkRequest) {
}
@Override
public void afterBulk(long l, BulkRequest bulkRequest, BulkResponse bulkResponse) {
}
@Override
public void afterBulk(long l, BulkRequest bulkRequest, Throwable throwable) {
}
};
BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer =
(request1, bulkListener) -> client.bulkAsync(request1, RequestOptions.DEFAULT, bulkListener);
BulkProcessor bulkProcessor = BulkProcessor.builder(bulkConsumer, listener1).build();
BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer1 =
(request2, bulkListener) -> client.bulkAsync(request2, RequestOptions.DEFAULT, bulkListener);
BulkProcessor.Builder builder = BulkProcessor.builder(bulkConsumer1, listener1);
builder.setBulkActions(500);
builder.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB));
builder.setConcurrentRequests(0);
builder.setFlushInterval(TimeValue.timeValueSeconds(10L));
builder.setBackoffPolicy(BackoffPolicy
.constantBackoff(TimeValue.timeValueSeconds(1L), 3));
//Once the BulkProcessor is created requests can be added to it:
IndexRequest one = new IndexRequest("posts", "doc", "1").
source(XContentType.JSON, "title",
"In which order are my Elasticsearch queries executed?");
IndexRequest two = new IndexRequest("posts", "doc", "2")
.source(XContentType.JSON, "title",
"Current status and upcoming changes in Elasticsearch");
IndexRequest three = new IndexRequest("posts", "doc", "3")
.source(XContentType.JSON, "title",
"The Future of Federated Search in Elasticsearch");
bulkProcessor.add(one);
bulkProcessor.add(two);
bulkProcessor.add(three);
boolean terminated = bulkProcessor.awaitClose(30L, TimeUnit.SECONDS);
}
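On the wire, a bulk request is newline-delimited JSON: each action has a metadata line, index and update actions are followed by a source line, and every line ends with a newline. The sketch below assembles the body for the same three actions as the `BulkRequest` above, using the 6.x metadata fields (`_index`, `_type`, `_id`). `BulkBodySketch` is a hypothetical helper written for illustration; the client builds this payload for you.

```java
final class BulkBodySketch {
    /** Metadata line shared by every action: {"<op>":{"_index":..,"_type":..,"_id":..}} */
    static String action(String op, String index, String type, String id) {
        return "{\"" + op + "\":{\"_index\":\"" + index + "\",\"_type\":\"" + type
                + "\",\"_id\":\"" + id + "\"}}\n";
    }

    static String body() {
        StringBuilder ndjson = new StringBuilder();
        // delete: metadata line only, no source line
        ndjson.append(action("delete", "posts", "doc", "3"));
        // update: metadata line followed by the partial document
        ndjson.append(action("update", "posts", "doc", "2"))
                .append("{\"doc\":{\"other\":\"test\"}}\n");
        // index: metadata line followed by the full source
        ndjson.append(action("index", "posts", "doc", "4"))
                .append("{\"field\":\"baz\"}\n");
        return ndjson.toString();
    }

    public static void main(String[] args) {
        System.out.print(body());
    }
}
```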
Multi-Get API:
/**
* Get several documents by id in one request
* @throws Exception
*/
public void multiGet() throws Exception{
MultiGetRequest request = new MultiGetRequest();
request.add(new MultiGetRequest.Item("index","type","example_id"));
request.add(new MultiGetRequest.Item("index", "type", "another_id"));
//optional arguments
request.add(new MultiGetRequest.Item("index", "type", "example_id") .fetchSourceContext(FetchSourceContext.DO_NOT_FETCH_SOURCE));
//Synchronous execution
MultiGetResponse responses = client.mget(request, RequestOptions.DEFAULT);
//Asynchronous execution; create the listener as in the Index example
client.mgetAsync(request, RequestOptions.DEFAULT, listener);
//Multi Get Response
MultiGetItemResponse firstItem = responses.getResponses()[0];
GetResponse firstGet = firstItem.getResponse();
if (firstGet.isExists()) {
}
}
2.2 - Search APIs
public void search() throws Exception{
//match all query: return every document
SearchRequest searchRequest = new SearchRequest();
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(QueryBuilders.matchAllQuery());
searchRequest.source(searchSourceBuilder);
//Query a specific field with SearchSourceBuilder
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.query(QueryBuilders.termQuery("user", "kimchy"));
sourceBuilder.from(0);
sourceBuilder.size(5);
sourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS));
SearchRequest searchRequest2 = new SearchRequest();
//Restrict the search to an index (roughly the counterpart of a database)
searchRequest2.indices("posts");
searchRequest2.source(sourceBuilder);
//Building queries
//First way: a QueryBuilder can be created using its constructor
MatchQueryBuilder matchQueryBuilder = new MatchQueryBuilder("user", "kimchy");
matchQueryBuilder.fuzziness(Fuzziness.AUTO);
matchQueryBuilder.prefixLength(3);
matchQueryBuilder.maxExpansions(10);
//Second way: QueryBuilder objects can also be created using the QueryBuilders utility class
QueryBuilder matchQueryBuilder1 = QueryBuilders.matchQuery("user", "kimchy")
.fuzziness(Fuzziness.AUTO)
.prefixLength(3)
.maxExpansions(10);
searchSourceBuilder.query(matchQueryBuilder1);
//Specifying sorting
sourceBuilder.sort(new ScoreSortBuilder().order(SortOrder.DESC));
sourceBuilder.sort(new FieldSortBuilder("_uid").order(SortOrder.ASC));
//Source filtering, turn off _source retrieval completely
sourceBuilder.fetchSource(false);
//an array of one or more wildcard patterns to control which fields get included or excluded in a more fine grained way
String[] includeFields = new String[] {"title", "user", "innerObject.*"};
String[] excludeFields = new String[] {"_type"};
sourceBuilder.fetchSource(includeFields, excludeFields);
//Requesting Aggregations
SearchSourceBuilder searchSourceBuilder2 = new SearchSourceBuilder();
TermsAggregationBuilder aggregation = AggregationBuilders.terms("by_company")
.field("company.keyword");
aggregation.subAggregation(AggregationBuilders.avg("average_age")
.field("age"));
searchSourceBuilder2.aggregation(aggregation);
//Requesting Suggestions
SearchSourceBuilder searchSourceBuilder3 = new SearchSourceBuilder();
SuggestionBuilder termSuggestionBuilder = SuggestBuilders.termSuggestion("user").text("kmichy");
SuggestBuilder suggestBuilder = new SuggestBuilder();
suggestBuilder.addSuggestion("suggest_user", termSuggestionBuilder);
searchSourceBuilder3.suggest(suggestBuilder);
//Synchronous execution
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
//Asynchronous execution; create the listener as in the Index example
client.searchAsync(searchRequest, RequestOptions.DEFAULT, listener);
//SearchResponse
RestStatus status = searchResponse.status();
for (ShardSearchFailure failure : searchResponse.getShardFailures()) {
// failures should be handled here
}
//Retrieving SearchHits: read the result data
SearchHits hits = searchResponse.getHits();
long totalHits = hits.getTotalHits();
float maxScore = hits.getMaxScore();
SearchHit[] searchHits = hits.getHits();
for (SearchHit hit : searchHits) {
// do something with the SearchHit
String index = hit.getIndex();
String type = hit.getType();
String id = hit.getId();
float score = hit.getScore();
String sourceAsString = hit.getSourceAsString();
Map<String, Object> sourceAsMap = hit.getSourceAsMap();
String documentTitle = (String) sourceAsMap.get("title");
List<Object> users = (List<Object>) sourceAsMap.get("user");
Map<String, Object> innerObject =
(Map<String, Object>) sourceAsMap.get("innerObject");
}
//Retrieving Aggregations
Aggregations aggregations = searchResponse.getAggregations();
Terms byCompanyAggregation = aggregations.get("by_company");
Terms.Bucket elasticBucket = byCompanyAggregation.getBucketByKey("Elastic");
Avg averageAge = elasticBucket.getAggregations().get("average_age");
double avg = averageAge.getValue();
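// Requesting an aggregation as the wrong type throws a ClassCastException ("by_company" is a terms aggregation, not a range):
// Range range = aggregations.get("by_company");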
Map<String, Aggregation> aggregationMap = aggregations.getAsMap();
Terms companyAggregation = (Terms) aggregationMap.get("by_company");
List<Aggregation> aggregationList = aggregations.asList();
for (Aggregation agg : aggregations) {
String type = agg.getType();
if (type.equals(TermsAggregationBuilder.NAME)) {
Terms.Bucket elasticBucket1 = ((Terms) agg).getBucketByKey("Elastic");
long numberOfDocs = elasticBucket1.getDocCount();
}
}
//Retrieving Suggestions
Suggest suggest = searchResponse.getSuggest();
TermSuggestion termSuggestion = suggest.getSuggestion("suggest_user");
for (TermSuggestion.Entry entry : termSuggestion.getEntries()) {
for (TermSuggestion.Entry.Option option : entry) {
String suggestText = option.getText().string();
}
}
}
Multi-Search API:
public void multiSearch() throws Exception{
MultiSearchRequest request = new MultiSearchRequest();
SearchRequest firstSearchRequest = new SearchRequest();
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(QueryBuilders.matchQuery("user", "kimchy"));
firstSearchRequest.source(searchSourceBuilder);
request.add(firstSearchRequest);
SearchRequest secondSearchRequest = new SearchRequest();
searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(QueryBuilders.matchQuery("user", "luca"));
secondSearchRequest.source(searchSourceBuilder);
request.add(secondSearchRequest);
//Synchronous execution
MultiSearchResponse response = client.msearch(request, RequestOptions.DEFAULT);
//Asynchronous execution; create the listener as in the Index example
client.msearchAsync(request, RequestOptions.DEFAULT, listener);
//MultiSearchResponse
MultiSearchResponse.Item firstResponse = response.getResponses()[0];
SearchResponse searchResponse = firstResponse.getResponse();
MultiSearchResponse.Item secondResponse = response.getResponses()[1];
searchResponse = secondResponse.getResponse();
}
SearchTemplate API:
/**
* Search template
* @throws Exception
*/
public void searchTemplate() throws Exception{
SearchTemplateRequest request = new SearchTemplateRequest();
request.setRequest(new SearchRequest("posts"));
//Inline template: the template text is passed directly to setScript(...)
request.setScriptType(ScriptType.INLINE);
//For a stored template, use ScriptType.STORED instead and pass the stored template's id to setScript(...)
request.setScript(
"{" +
" \"query\": { \"match\": { \"{{ field }}\": \"{{ value }}\" } }," +
" \"size\": \"{{ size }}\"" +
"}");
Map<String, Object> scriptParams = new HashMap<>();
scriptParams.put("field", "title");
scriptParams.put("value", "elasticsearch");
scriptParams.put("size", 5);
request.setScriptParams(scriptParams);
//Synchronous execution
SearchTemplateResponse response = client.searchTemplate(request, RequestOptions.DEFAULT);
//Asynchronous execution; create the listener as in the Index example
client.searchTemplateAsync(request, RequestOptions.DEFAULT, listener);
//SearchTemplate Response
SearchResponse searchResponse = response.getResponse();
BytesReference source = response.getSource();
}
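To see what the template and its parameters turn into, the sketch below substitutes `{{ name }}` tags with parameter values. It is a deliberately simplified model of Mustache rendering, handling plain variable tags only; `TemplateSketch` and `render` are hypothetical names, and the actual rendering happens on the Elasticsearch side.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class TemplateSketch {
    // Matches {{name}} and {{ name }} and captures the parameter name
    private static final Pattern TAG = Pattern.compile("\\{\\{\\s*(\\w+)\\s*\\}\\}");

    /** Replaces every tag with the matching parameter; unknown parameters render as empty text. */
    static String render(String template, Map<String, Object> params) {
        Matcher matcher = TAG.matcher(template);
        StringBuffer out = new StringBuffer();
        while (matcher.find()) {
            Object value = params.get(matcher.group(1));
            String text = value == null ? "" : value.toString();
            matcher.appendReplacement(out, Matcher.quoteReplacement(text));
        }
        matcher.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Map<String, Object> scriptParams = new HashMap<>();
        scriptParams.put("field", "title");
        scriptParams.put("value", "elasticsearch");
        scriptParams.put("size", 5);
        String template = "{\"query\":{\"match\":{\"{{ field }}\":\"{{ value }}\"}},"
                + "\"size\":\"{{ size }}\"}";
        System.out.println(render(template, scriptParams));
    }
}
```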
Multi-SearchTemplate API:
/**
* Execute several search templates in one request
* @throws Exception
*/
public void multiSearchTemplate() throws Exception{
String[] searchTerms = {"elasticsearch", "logstash", "kibana"};
MultiSearchTemplateRequest multiRequest = new MultiSearchTemplateRequest();
for (String searchTerm: searchTerms) {
SearchTemplateRequest request = new SearchTemplateRequest();
request.setRequest(new SearchRequest("posts"));
request.setScriptType(ScriptType.INLINE);
request.setScript(
"{" +
" \"query\": { \"match\": { \"{{field}}\": \"{{value}}\" }}," +
" \"size\": \"{{size}}\"" +
"}"
);
Map<String, Object> scriptParams = new HashMap<>();
scriptParams.put("field", "title");
scriptParams.put("value", searchTerm);
scriptParams.put("size", 5);
request.setScriptParams(scriptParams);
multiRequest.add(request);
}
//Synchronous execution
MultiSearchTemplateResponse multiResponse = client.msearchTemplate(multiRequest, RequestOptions.DEFAULT);
//Asynchronous execution
ActionListener<MultiSearchTemplateResponse> listener = new ActionListener<MultiSearchTemplateResponse>() {
@Override
public void onResponse(MultiSearchTemplateResponse response) {
}
@Override
public void onFailure(Exception e) {
}
};
client.msearchTemplateAsync(multiRequest, RequestOptions.DEFAULT, listener);
//MultiSearchTemplateResponse
for (MultiSearchTemplateResponse.Item item : multiResponse.getResponses()) {
if (item.isFailure()) {
String error = item.getFailureMessage();
} else {
SearchTemplateResponse searchTemplateResponse = item.getResponse();
SearchResponse searchResponse = searchTemplateResponse.getResponse();
searchResponse.getHits();
}
}
}
For the remaining APIs, refer to the client's reference documentation.
3 - Jest
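Jest is a third-party Java HTTP REST client for Elasticsearch. It filled the gap left when Elasticsearch had no HTTP REST client of its own.
Refer to the Jest project for its API; its usage is not covered in detail here.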
4 - Spring Data Elasticsearch
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/ma ...">
<modelVersion>4.0.0</modelVersion>
<groupId>springboot</groupId>
<artifactId>spring-data-elasticsearch-crud</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>spring-data-elasticsearch-crud :: spring-data-elasticsearch - basic example</name>
<!-- Spring Boot starter parent -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.0.RELEASE</version>
</parent>
<dependencies>
<!-- Spring Boot Elasticsearch dependency -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
<!-- Spring Boot Web dependency -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Junit -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
</dependencies>
</project>
The spring-boot-starter-data-elasticsearch dependency here is version 2.2.0.RELEASE, which brings in spring-data-elasticsearch 3.2.x. The data access layer below is built on the interfaces that spring-data-elasticsearch provides.
application.properties: configure the ES address
spring.data.elasticsearch.repositories.enabled = true
spring.data.elasticsearch.cluster-nodes = 127.0.0.1:9300
9300 is the default port for the Java transport client; 9200 is the port for the RESTful HTTP interface.
ES data access layer
/**
* ES repository
*/
public interface CityRepository extends ElasticsearchRepository<City, Long> {
}
The interface only needs to extend ElasticsearchRepository. The methods used from it are:
Iterable<T> search(QueryBuilder query);
Page<T> search(QueryBuilder query, Pageable pageable);
Page<T> search(SearchQuery searchQuery);
Page<T> searchSimilar(T entity, String[] fields, Pageable pageable);
Entity class
/**
* City entity
*/
@Document(indexName = "province", type = "city")
public class City implements Serializable {
private static final long serialVersionUID = -1L;
/**
* City id
*/
private Long id;
/**
* City name
*/
private String name;
/**
* Description
*/
private String description;
/**
* City score
*/
private Integer score;
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getDescription() {
return description;
}
public void setDescription(String description) {
this.description = description;
}
public Integer getScore() {
return score;
}
public void setScore(Integer score) {
this.score = score;
}
}
Notes:
- City property names do not support camelCase.
- indexName must be all lowercase, otherwise an exception is thrown.
City ES service implementation
/**
* City search service backed by ES
*/
@Service
public class CityESServiceImpl implements CityService {
private static final Logger LOGGER = LoggerFactory.getLogger(CityESServiceImpl.class);
/* Paging parameters */
Integer PAGE_SIZE = 12; // page size
Integer DEFAULT_PAGE_NUMBER = 0; // default page number
/* Search mode */
String SCORE_MODE_SUM = "sum"; // sum the weights
Float MIN_SCORE = 10.0F; // unrelated documents score 1 by default, so the minimum score is set to 10
@Autowired
CityRepository cityRepository; // ES repository
public Long saveCity(City city) {
City cityResult = cityRepository.save(city);
return cityResult.getId();
}
@Override
public List<City> searchCity(Integer pageNumber, Integer pageSize, String searchContent) {
// Validate the paging parameters
if (pageSize == null || pageSize <= 0) {
pageSize = PAGE_SIZE;
}
if (pageNumber == null || pageNumber < DEFAULT_PAGE_NUMBER) {
pageNumber = DEFAULT_PAGE_NUMBER;
}
LOGGER.info("\n searchCity: searchContent [" + searchContent + "] \n ");
// Build the search query
SearchQuery searchQuery = getCitySearchQuery(pageNumber,pageSize,searchContent);
LOGGER.info("\n searchCity: searchContent [" + searchContent + "] \n DSL = \n " + searchQuery.getQuery().toString());
Page<City> cityPage = cityRepository.search(searchQuery);
return cityPage.getContent();
}
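/**
* Builds the search query for the given search text.
*
* Steps:
* - weight-based (function score) query
* - phrase matching
* - minimum score
* - paging parameters
*
* @param pageNumber current page number
* @param pageSize page size
* @param searchContent search text
* @return the assembled search query
*/
private SearchQuery getCitySearchQuery(Integer pageNumber, Integer pageSize,String searchContent) {
// Each field whose phrase query matches the search text contributes its weight; the weights are summed
// Function score query https://www.elastic.co/guide/c ... .html
// - Phrase matching https://www.elastic.co/guide/c ... .html
// - The per-field weights could be turned into an enum
// - Unrelated documents score 1 by default, so the minimum score is set to 10
FunctionScoreQueryBuilder.FilterFunctionBuilder[] functions = {
new FunctionScoreQueryBuilder.FilterFunctionBuilder(
QueryBuilders.matchPhraseQuery("name", searchContent),
ScoreFunctionBuilders.weightFactorFunction(1000)),
new FunctionScoreQueryBuilder.FilterFunctionBuilder(
QueryBuilders.matchPhraseQuery("description", searchContent),
ScoreFunctionBuilders.weightFactorFunction(500))
};
FunctionScoreQueryBuilder functionScoreQueryBuilder = QueryBuilders.functionScoreQuery(functions)
.scoreMode(FunctionScoreQuery.ScoreMode.SUM).setMinScore(MIN_SCORE);
// Paging parameters
Pageable pageable = PageRequest.of(pageNumber, pageSize);
return new NativeSearchQueryBuilder()
.withPageable(pageable)
.withQuery(functionScoreQueryBuilder).build();
}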
}
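This implementation matches phrases exactly and sums the weights of the fields that matched, which yields a weight-based search. The flow is:
- weight-based (function score) query
- phrase matching
- minimum score
- paging parameters
Notes:
- The per-field weights could be turned into an enum.
- Unrelated documents score 1 by default, so the minimum score is set to 10.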
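The scoring arithmetic described above can be checked with a small model. The sketch below assumes the base query contributes a score of 1 and that the function weights are combined by summing, as in the service code; `WeightScoreSketch` and its methods are hypothetical names, and this is an illustration of the arithmetic rather than of Elasticsearch's actual scoring code.

```java
final class WeightScoreSketch {
    static final float NAME_WEIGHT = 1000f;        // weight when the name phrase matches
    static final float DESCRIPTION_WEIGHT = 500f;  // weight when the description phrase matches
    static final float MIN_SCORE = 10f;            // documents scoring below this are dropped

    /** Sum of the weights whose phrase filter matched; 1 when none matched. */
    static float score(boolean nameMatches, boolean descriptionMatches) {
        float sum = 0f;
        boolean anyMatch = false;
        if (nameMatches) {
            sum += NAME_WEIGHT;
            anyMatch = true;
        }
        if (descriptionMatches) {
            sum += DESCRIPTION_WEIGHT;
            anyMatch = true;
        }
        return anyMatch ? sum : 1f;
    }

    /** A document is returned only if its score reaches the minimum score. */
    static boolean returned(float score) {
        return score >= MIN_SCORE;
    }

    public static void main(String[] args) {
        boolean[][] cases = {{true, true}, {true, false}, {false, true}, {false, false}};
        for (boolean[] c : cases) {
            float s = score(c[0], c[1]);
            System.out.println("name=" + c[0] + " description=" + c[1]
                    + " score=" + s + " returned=" + returned(s));
        }
    }
}
```

A city that matches neither field keeps the default score of 1, falls below the minimum of 10 and is filtered out, which is why the minimum score acts as a "must match at least one field" condition here.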