前言
之前研究很久elk的原理和Kibana的dsl语句,但是实际运用时候还是需要使用java的api,所以开始学习java API
官方API文档地址:https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/java-api.html
5.4中文官方文档:http://cwiki.apachecn.org/pages/viewpage.action?pageId=4260364
文章项目源码地址:https://gitee.com/lwydyby/java-elk
一.准备工作:
1.安装elk三件套(这个自行百度,不是重点)
2.搭建maven项目(这里使用spring boot2.0搭建项目):
pom文件:
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.3.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>6.3.0</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>6.3.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
注:这里使用elasticsearch-rest-high-level-client,因为官方文档中说之前的版本会在7.0放 弃使用,所以直接选择这个版本学习。虽然spring boot自身存在version,但是并不是最新的 6.3.0故需要单独配置
3.配置RestHighLevelClient:
注:这里只配置最基础的,如果配置集群请自己查看官方文档
@Bean
public RestHighLevelClient restHighLevelClient(){
return new RestHighLevelClient(
RestClient.builder(
new HttpHost(elkUrl, elkPort, "http")));
}
二.开始编写调用方法
1.Index API
1).初始化IndexRequest(id不填写则自动生成),拼装要保存文档的json串,官方给了4种方式,我这里使用第二种:
方法1.png
方法23.png
方法4.png
2).进行其他额外配置,如路由,父子关系,超时时间等,具体使用见文档
3).发送请求,这里提供了同步和异步两种方式,我这里使用的同步请求
具体方法代码:
/**
* 存入Object至elasticsearch
* @param o 存入对象
* @param index 索引名称
* @param type 类型名称
* @param id 对象id,null则自动生成
* @return
*/
public IndexResponse insertObject(Object o, String index, String type, String id) {
IndexRequest request = null;
/* id为空则自动生成*/
if (id == null) {
request = new IndexRequest(index, type);
} else {
request = new IndexRequest(index, type, id);
}
Field[] f = o.getClass().getDeclaredFields();
Map<String, Object> jsonMap = new HashMap<>();
try {
Field.setAccessible(f,true);
for (Field field : f) {
String name = field.getName();
if (name.equals("id")) {
if (field.get(o) != null) {
String ids= (String) field.get(o);
request = new IndexRequest(index, type, ids);
}
}else {
jsonMap.put(name,field.get(o));
}
}
} catch (IllegalAccessException e) {
logger.error("存入对象解析失败:", e);
}
request.timeout(TimeValue.timeValueSeconds(5));
request.timeout("5s");
request.source(jsonMap);
try {
return client.index(request);
} catch (IOException e) {
logger.error("存入elk失败:", e);
}
return null;
}
2.Get API
1)初始化GetRequest ,这里只能通过id进行具体查询,搜索功能在后边的search API中
2).配置其他额外要求,如配置需要的字段:
字段筛选.png
3).同样两种请求方式
方法:
/**
* 获取index值 必须已知id
* @param index 索引值
* @param type 类型
* @param id id
* @param c 封装的对象class
*/
public Object getObject(String index, String type, String id, Class c) {
GetRequest request = null;
if (type == null || id == null) {
request = new GetRequest(index);
} else {
request = new GetRequest(index, type, id);
}
String[] includes = new String[c.getDeclaredFields().length];
String[] excludes = Strings.EMPTY_ARRAY;
int i=0;
Field [] f=c.getDeclaredFields();
Field.setAccessible(f,false);
for(Field field:f){
String name = field.getName();
if (!name.equals("id")) {
includes[i++]=name;
}
}
FetchSourceContext fetchSourceContext =
new FetchSourceContext(true, includes, excludes);
request.fetchSourceContext(fetchSourceContext);
try {
GetResponse getResponse = client.get(request);
String ids=getResponse.getId();
String sourceAsString = getResponse.getSourceAsString();
logger.info("获取到的信息:"+sourceAsString);
Map<String, Object> sourceAsMap = getResponse.getSourceAsMap();
Object obj = null;
try {
obj = c.newInstance();
SimpleDateFormat sdf=new SimpleDateFormat("yyyy-mm-dd");
Field idfield=c.getDeclaredField("id");
idfield.setAccessible(true);
idfield.set(obj,ids);
for (String in : sourceAsMap.keySet()) {
Object str = sourceAsMap.get(in);
for(Field field:f){
field.setAccessible(true);
String name = field.getName();
if (name.equals(in)) {
if(name.contains("dt")){
field.set(obj,sdf.parse((String) str));
}else{
field.set(obj,str);
}
}
}
}
} catch (InstantiationException | IllegalAccessException e) {
logger.error("解析失败:",e);
} catch (ParseException e) {
logger.error("日期解析失败",e);
} catch (NoSuchFieldException e) {
logger.error("解析id失败:",e);
}
return obj;
} catch (IOException e) {
logger.error("Get查询失败", e);
}
return null;
}
3.Exists API(太简单了,直接上代码)
/**
* 判断索引下是否存在该id的信息
* @param index 索引名称
* @param type 类型
* @param id id
* @return
*/
public boolean existsObject(String index,String type,String id){
GetRequest getRequest = new GetRequest(index, type, id);
getRequest.fetchSourceContext(new FetchSourceContext(false));
getRequest.storedFields("_none_");
try {
return client.exists(getRequest);
} catch (IOException e) {
logger.error("Get查询失败", e);
}
return false;
}
4.Delete Api(这里可以单一id删除,也可以删除所有)
/**
* 删除某索引
* @param index
* @param type
* @param id id为null则为删除整个索引
*/
public boolean deleteObject(String index,String type,String id){
DeleteRequest request =null;
if(id==null){
request=new DeleteRequest(index);
}else {
request = new DeleteRequest(index, type, id);
}
try {
DeleteResponse deleteResponse = client.delete(request);
ReplicationResponse.ShardInfo shardInfo = deleteResponse.getShardInfo();
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
logger.info("存在部分未删除成功");
}
if (shardInfo.getFailed() > 0) {
for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
String reason = failure.reason();
logger.info("删除失败的原因:"+reason);
return false;
}
}
} catch (IOException e) {
logger.error("删除失败:", e);
return false;
}
return true;
}
5.update Api(其实和insert差不多)
其余操作和insert差不多,但是update支持存在则更新不存在则插入的功能
1.png
具体代码:
/**
* 更新对象
* @param o
* @param index
* @param type
*/
public void updateObject(Object o, String index, String type) {
UpdateRequest request = null;
Field[] f = o.getClass().getDeclaredFields();
Map<String, Object> jsonMap = new HashMap<>();
try {
Field.setAccessible(f,true);
for (Field field : f) {
String name = field.getName();
if (name.equals("id")) {
String ids= (String) field.get(o);
request = new UpdateRequest(index, type, ids);
}else {
jsonMap.put(name,field.get(o));
}
}
jsonMap.put("updated",new Date());
} catch (IllegalAccessException e) {
logger.error("存入对象解析失败:", e);
}
if(request==null){
throw new RuntimeException("object中未找到id");
}
request.docAsUpsert(true);
request.doc(jsonMap);
try {
client.update(request);
} catch (IOException e) {
logger.error("更新失败:", e);
}
}
- bulk Api-批处理
没什么特殊的就是把之前的单一操作改为统一提交
/**
* 批处理,这里只做删除批处理
* @param index
* @param type
* @param list
*/
public void bulkDelete(String index, String type, List<String>list){
BulkRequest request = new BulkRequest();
for(String id:list){
request.add(new DeleteRequest(index,type,id));
}
try {
client.bulk(request);
} catch (IOException e) {
logger.error("批量删除失败:", e);
}
}
7.查询索引下所有文档(不属于Documents-Api,但为了包含增删改查所有功能写在这)
/**
* 获取Index下所有的内容(为search api方法,但为了提供全额查询功能写在这里)
* @param index
* @param type
* @param c
* @throws Exception
*/
public List<Object> getAll(String index,String type,Class c) throws Exception {
try {
List<Object> objects=new ArrayList<>();
SearchRequest searchRequest = new SearchRequest(index);
searchRequest.types(type);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(QueryBuilders.matchAllQuery());
searchRequest.source(searchSourceBuilder);
SearchResponse searchResponse = client.search(searchRequest);
SearchHits hits = searchResponse.getHits();
SearchHit[] searchHits = hits.getHits();
for (SearchHit hit : searchHits) {
Map<String, Object> sourceAsMap = hit.getSourceAsMap();
Object obj= MaptoObject.map2Object(sourceAsMap,Book.class,hit.getId());
objects.add(obj);
}
return objects;
} catch (IOException e) {
logger.error("查询所有失败",e);
}
return null;
}
今天就看到这,有时间继续更新....................
网友评论