版本:Elasticsearch 7.2.0
当你把Elasticsearch升级到7之后,会发现TransportClient这个客户端已经过时,而且PreBuiltTransportClient也已被删除,那么现在该如何使用Java客户端呢?Elasticsearch官方建议使用REST高级客户端RestHighLevelClient。话不多说,直接看例子:
- 创建客户端、及关闭
- 插入一条数据(如果存在则更新) index()
- 获取一个文档 get()
- 文档是否存在 exists()
- 删除文档 delete()
- 更新 update()
- 批量更新 bulk()
- 批量查询 multiGet()
- 异步操作
- maven配置
1.创建客户端、及关闭
/**
 * Builds a REST high-level client that talks HTTP on port 9200 to the hosts
 * {@code node01}, {@code node02} and {@code node03}.
 *
 * @return a newly constructed client
 */
private static RestHighLevelClient getClient() {
    HttpHost[] nodes = {
        new HttpHost("node01", 9200, "http"),
        new HttpHost("node02", 9200, "http"),
        new HttpHost("node03", 9200, "http")
    };
    return new RestHighLevelClient(RestClient.builder(nodes));
}
/**
 * Closes the given client.
 *
 * <p>A {@code null} argument is ignored so the helper can be called unconditionally,
 * for example from a {@code finally} block. An {@link IOException} raised while closing
 * is printed and not rethrown.
 *
 * @param client the client to close; may be {@code null}
 */
private static void close(RestHighLevelClient client) {
    if (client == null) {
        return;
    }
    try {
        client.close();
    } catch (IOException e) {
        e.printStackTrace();
    }
}
2.插入一条数据(如果存在则更新) index()
/**
 * Inserts one document with id {@code "1"} into the {@code user} index (updating it if it
 * already exists) and prints the index name and document id reported by the response.
 *
 * <p>An {@link IOException} from the request is printed and not rethrown. The client is
 * always closed, whether or not the request succeeds.
 */
public static void index() {
    RestHighLevelClient client = getClient();
    try {
        Map<String, Object> jsonMap = new HashMap<>();
        jsonMap.put("user", "kimchy");
        jsonMap.put("postDate", new Date());
        jsonMap.put("message", "trying out Elasticsearch");
        IndexRequest request = new IndexRequest("user").id("1").source(jsonMap);

        // Read the response only inside the try block: when the request fails there is
        // no response object to inspect.
        IndexResponse response = client.index(request, RequestOptions.DEFAULT);
        String index = response.getIndex();
        String id = response.getId();
        System.out.println(index + "---" + id);
    } catch (IOException e) {
        e.printStackTrace();
    } finally {
        close(client);
    }
}
3.获取一个文档 get()
/**
 * Fetches the document with id {@code "1"} from the {@code user} index and prints it.
 *
 * <p>When the document exists, its index, id and version are printed, followed by the
 * source as a string, as a map and as raw bytes. Otherwise a "not found" message is
 * printed. An {@link IOException} from the request is printed and not rethrown. The
 * client is always closed.
 */
public static void get() {
    RestHighLevelClient client = getClient();
    try {
        GetRequest getRequest = new GetRequest("user", "1");
        GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT);
        String index = getResponse.getIndex();
        String id = getResponse.getId();
        if (getResponse.isExists()) {
            long version = getResponse.getVersion();
            String sourceAsString = getResponse.getSourceAsString();
            Map<String, Object> sourceAsMap = getResponse.getSourceAsMap();
            byte[] sourceAsBytes = getResponse.getSourceAsBytes();
            System.out.println("索引:" + index + ",ID:" + id + "版本:" + version);
            System.out.println(sourceAsString);
            System.out.println(sourceAsMap);
            // byte[].toString() only yields the array identity (e.g. "[B@1b6d3586"),
            // so print the byte values themselves.
            System.out.println(java.util.Arrays.toString(sourceAsBytes));
        } else {
            System.out.println("未查到结果");
        }
    } catch (IOException e) {
        e.printStackTrace();
    } finally {
        close(client);
    }
}
4.文档是否存在 exists()
/**
 * Checks whether the document with id {@code "1"} exists in the {@code user} index and
 * prints the result.
 *
 * <p>An {@link IOException} from the request is printed and not rethrown. The client is
 * always closed.
 */
public static void exists() {
    RestHighLevelClient client = getClient();
    try {
        GetRequest getRequest = new GetRequest("user", "1");
        // Do not fetch _source.
        getRequest.fetchSourceContext(new FetchSourceContext(false));
        // Do not fetch stored fields.
        getRequest.storedFields("_none_");

        boolean exists = client.exists(getRequest, RequestOptions.DEFAULT);
        System.out.println(getRequest.index() + "---" + getRequest.id() + "---文档是否存在:" + exists);
    } catch (IOException e) {
        e.printStackTrace();
    } finally {
        close(client);
    }
}
5.删除文档 delete()
/**
 * Deletes the document with id {@code "1"} from the {@code user} index, prints a
 * confirmation, and then calls {@code exists()} to print whether the document is still
 * present.
 *
 * <p>An {@link IOException} from the request is printed and not rethrown. The client is
 * always closed.
 */
public static void delete() {
    RestHighLevelClient client = getClient();
    try {
        DeleteRequest deleteRequest = new DeleteRequest("user", "1");
        client.delete(deleteRequest, RequestOptions.DEFAULT);
        System.out.println(deleteRequest.index() + "---" + deleteRequest.id() + ": 文档已删除");
        // Follow-up check; exists() opens and closes a client of its own.
        exists();
    } catch (IOException e) {
        e.printStackTrace();
    } finally {
        close(client);
    }
}
6.更新 update()
/**
 * Applies a partial update to the document with id {@code "2"} in the {@code user} index,
 * setting the {@code updated} and {@code reason} fields, and prints a confirmation.
 *
 * <p>An {@link IOException} from the request is printed and not rethrown. The client is
 * always closed.
 *
 * <p>NOTE(review): if document {@code "2"} does not exist the request presumably fails
 * with an unchecked exception that propagates to the caller; confirm against the client
 * documentation.
 */
public static void update() {
    RestHighLevelClient client = getClient();
    try {
        Map<String, Object> jsonMap = new HashMap<>();
        jsonMap.put("updated", new Date());
        jsonMap.put("reason", "daily update");
        UpdateRequest request = new UpdateRequest("user", "2").doc(jsonMap);

        UpdateResponse response = client.update(request, RequestOptions.DEFAULT);
        System.out.println(response.getIndex() + "---" + response.getId() + "完成更新");
    } catch (IOException e) {
        e.printStackTrace();
    } finally {
        close(client);
    }
}
7.批量更新 bulk()
/**
 * Sends one bulk request against the {@code user} index that indexes documents
 * {@code 3}, {@code 4} and {@code 5}, deletes document {@code 3}, and partially updates
 * document {@code 2}. It then prints the overall status of the bulk response.
 *
 * <p>When the response reports per-item failures, the combined failure message is printed
 * as well. An {@link IOException} from the request is printed and not rethrown. The
 * client is always closed.
 */
public static void bulk() {
    RestHighLevelClient client = getClient();
    try {
        BulkRequest request = new BulkRequest();
        request.add(new IndexRequest("user").id("3").source(XContentType.JSON, "field", "foo", "user", "lucky"));
        request.add(new IndexRequest("user").id("4").source(XContentType.JSON, "field", "bar", "user", "Jon"));
        request.add(new IndexRequest("user").id("5").source(XContentType.JSON, "field", "baz", "user", "Lucy"));
        // Delete document 3, which was queued for indexing earlier in this same request.
        request.add(new DeleteRequest("user", "3"));
        request.add(new UpdateRequest("user", "2").doc(XContentType.JSON, "other", "test"));

        // Inspect the response only inside the try block: when the request fails there
        // is no response object to read.
        BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);
        System.out.println("批量更新结果状态:" + bulkResponse.status());
        // Individual operations can fail even though the bulk call itself returned.
        if (bulkResponse.hasFailures()) {
            System.out.println(bulkResponse.buildFailureMessage());
        }
    } catch (IOException e) {
        e.printStackTrace();
    } finally {
        close(client);
    }
}
8.批量查询 multiGet()
/**
 * Fetches documents {@code 2}, {@code 4} and {@code 5} from the {@code user} index in a
 * single multi-get request and prints the source of each one.
 *
 * <p>For an item that failed, the failure message is printed instead of the source. An
 * {@link IOException} from the request is printed and not rethrown. The client is always
 * closed.
 */
public static void multiGet() {
    RestHighLevelClient client = getClient();
    try {
        MultiGetRequest request = new MultiGetRequest();
        request.add(new MultiGetRequest.Item("user", "2"));
        request.add(new MultiGetRequest.Item("user", "4"));
        request.add(new MultiGetRequest.Item("user", "5"));

        MultiGetResponse mget = client.mget(request, RequestOptions.DEFAULT);
        // Print the query results.
        System.out.println("mget:");
        mget.forEach(item -> {
            if (item.isFailed()) {
                // A failed item carries no GetResponse, so report the failure instead.
                System.out.println(item.getFailure().getMessage());
            } else {
                System.out.println(item.getResponse().getSourceAsString());
            }
        });
    } catch (IOException e) {
        e.printStackTrace();
    } finally {
        close(client);
    }
}
9.异步操作
以上做法均为同步操作,而非异步操作,所以这些操作都是阻塞式的,直到拿到结果为止。对于异步操作,这里只列出代码,不再单独举例:
// Callback for the asynchronous index request below. Its type parameter must match the
// response type of the call it is passed to; indexAsync reports an IndexResponse.
ActionListener<IndexResponse> listener = new ActionListener<IndexResponse>() {
    @Override
    public void onResponse(IndexResponse indexResponse) {
        // The index request succeeded.
    }
    @Override
    public void onFailure(Exception e) {
        // The index request failed.
    }
};
// "client" and "request" are assumed to be a RestHighLevelClient and an IndexRequest
// built as in the synchronous index() example.
client.indexAsync(request, RequestOptions.DEFAULT, listener);
10.maven配置
<!-- x-pack-sql-jdbc artifact, pinned to 7.2.0. -->
<!-- NOTE(review): none of the Java snippets in this article appear to use JDBC; confirm this dependency is needed. -->
<dependency>
<groupId>org.elasticsearch.plugin</groupId>
<artifactId>x-pack-sql-jdbc</artifactId>
<version>7.2.0</version>
<!-- <version>6.7.1</version> -->
</dependency>
<!-- REST high-level client artifact, pinned to 7.2.0 to match the Elasticsearch version stated at the top of the article. -->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.2.0</version>
<!-- <version>6.7.1</version> -->
</dependency>
网友评论