最近,在尝试搭建ELK
日志管理中心,因为以前Elasticsearch
用的比较少,现在把使用流程大致总结一下。
我本地测试环境如下:
-
JDK
:1.8 -
SpringBoot
:2.3.11 -
Elasticsearch
:7.10.2
1. SpringBoot
整合Elasticsearch
方式
Spring
官网中,Spring Data Elasticsearch - Reference Documentation已经给出了几种方式:

1. 使用
Spring Data Elasticsearch
这是
Spring
官方最推荐的,自家的东西,就像JPA
一样,在DAO层继承ElasticsearchRepository
,就可以使用封装好的一些常见的操作了,用起来简单方便,可以使用。虽然
Spring
很认真的在推荐你用这个,但是貌似Spring-data-elasticsearch
的更新速度跟不上Elasticsearch
官方的版本,最新的SpringBoot
和Elasticsearch
的版本对应关系如下:
按理来说,我用的2.3.XX的
SpringBoot
,应该用7.6.2的Elasticsearch
,但是我都装了7.10.2了,先凑合吧,大家记得按版本来,不然可能会有掉坑里。2. 使用
Transport Client
这是专门的Java Api,基于基于TCP协议和ES通信。但是ES 7.0版本中将弃用TransportClient客户端,且在8.0版本中完全移除它,虽然
Spring-data-elasticsearch
会一直支持低版本使用这种方式,但是不提倡。3. 使用
High Level REST Client
这是基于HTTP协议的客户端,是ES官方推荐使用的,也是可以使用的,但是要求对ES的DSL语句熟悉,方便自己做增删改查
2. 基于Spring Data Elasticsearch
操作ES数据
2.1 导包
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.6.5</version>
</dependency>
2.2 yml文件
spring:
elasticsearch:
rest:
uris:
- http://192.168.0.1:9200
# 用户名
username:
# 密码
password:
# 连接超时时间
connection-timeout: 1000
# 读取超时时间
read-timeout: 1000
2.3 ES中使用的实体类
关系型数据库与 ES中的概念对比参照:
RDBMS | ES |
---|---|
Table | Index(Type) |
Row | Document |
Column | Field |
Schema | Mapping |
SQL | DSL |
Spring Data通过注解来声明字段的映射属性,有下面的三个注解:
-
@Document
:作用在类,标记实体类为文档对象,一般有四个属性-
indexName
:对应索引库名称 -
shards
:分片数量,默认5 -
replicas
:副本数量,默认1
-
-
@Id
:作用在成员变量,标记一个字段作为id主键 -
@Field
:作用在成员变量,标记为文档的字段,并指定字段映射属性:-
type
:字段类型,取值是枚举:FieldType -
index
:是否索引,布尔类型,默认是true -
store
:是否存储,布尔类型,默认是false -
analyzer
:分词器名称:ik_max_word
-
@Data
@Document(indexName = "eslog", shards = 5)
public class ESLog {
@Id
private Integer pkId;
@ApiModelProperty(value = "访问的用户编号")
private String userid;
@ApiModelProperty(value = "访问时间")
@Field(type = FieldType.Date, format = DateFormat.custom, pattern ="yyyy-MM-dd HH:mm:ss")
@JsonFormat(shape =JsonFormat.Shape.STRING,pattern ="yyyy-MM-dd HH:mm:ss",timezone ="GMT+8")
private Date accessTime;
@ApiModelProperty(value = "访问时的请求参数")
private String requestParams;
@ApiModelProperty(value = "请求完成后返回的结果数据")
@Field(type = FieldType.Text, analyzer = "ik_max_word")
private String responseData;
@ApiModelProperty(value = "通过哪个IP来访问的")
private String remoteIpAddress;
@ApiModelProperty(value = "访问的哪个模块")
private String accessModule;
@ApiModelProperty(value = "访问的哪个功能")
private String accessFunction;
@ApiModelProperty(value = "访问的哪个方法")
@Field(type = FieldType.Keyword)
private String accessMethod;
@ApiModelProperty(value = "访问的功能路径")
private String requestUri;
@ApiModelProperty(value = "访问功能时所消耗的时间")
private int timeConsuming;
@ApiModelProperty(value = "日志类型 1-登录 2-业务 ")
private String logType;
}
2.4 DAO层接口
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
import com.test.elasticsearch.service.entity.ESLog;
public interface ESLogDao extends ElasticsearchRepository<ESLog, String> {
}
2.5 Service层
在服务层,想查询数据有2种情况:
- 简单查询,用
ElasticsearchRepository
封装的方法就行(就是save和find,search已经作废了) - 复杂的查询,引入
ElasticsearchTemplate
,用其进行索引操作、高级查询、聚合查询等。其实ElasticsearchTemplate
底层还是用的Elasticsearch
官方的High Level REST Client
@Service
public class ESService {
@Resource
private CarRecordDao carRecordDao;
@Resource
private ESRecordDao esRecordDao;
@Resource
private ElasticsearchRestTemplate esTemplate;
/**
* 添加数据
*/
public String esAddRecord() {
List<CarRecord> list = carRecordDao.selectList(new QueryWrapper<CarRecord>().orderByDesc("pk_id").last("limit 10000"));
List<ESRecord> list1 = new ArrayList<ESRecord>();
for (CarRecord cr : list) {
ESRecord esr = new ESRecord();
BeanUtils.copyProperties(cr, esr);
list1.add(esr);
}
esRecordDao.saveAll(list1);
return "OK";
}
/**
* 查询所有
*/
public Result<String> queryListData() {
Iterable<ESRecord> ite = esRecordDao.findAll();
Iterator<ESRecord> it = ite.iterator();
return new Result<String>().success(it);
}
/**
* 分页查询
*/
public Result<String> queryPageData() {
PageRequest p = PageRequest.of(1, 5, Sort.by("pkId").ascending());
Page<ESRecord> res = esRecordDao.findAll(p);
return new Result<String>().success(res);
}
/**
* 创建索引
*/
public Boolean creatIndex(String index) {
return esTemplate.indexOps(IndexCoordinates.of(index)).create();
}
/**
* 判断索引是否存在
*/
public Boolean creatIndex(String index) {
return esTemplate.indexOps(ESLog.class).exists();
}
/**
* 条件查询
*/
public Result<String> queryEsRecordData() {
return null;
}
}
注意:使用ElasticsearchTemplate进行复杂的聚合查询还要继续研究,待后续完善!!!
3. 基于High Level REST Client
操作ES数据
3.1 导包
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.6.5</version>
</dependency>
3.2 yml文件
spring:
elasticsearch:
rest:
uris:
- http://192.168.0.1:9200
# 用户名
username:
# 密码
password:
# 连接超时时间
connection-timeout: 1000
# 读取超时时间
read-timeout: 1000
3.3 Service层
在这需要注入RestHighLevelClient
@Service
public class ESService {
@Resource
private RestHighLevelClient restClient;
/*
* 测试索引是否存在
*/
@Test
public void testExistIndex() throws IOException {
GetIndexRequest request1 = new GetIndexRequest("test1");
System.out.println(restClient.indices().exists(request1, RequestOptions.DEFAULT));
GetIndexRequest request2 = new GetIndexRequest("test2");
System.out.println(restClient.indices().exists(request2, RequestOptions.DEFAULT));
}
// 创建索引
@Test
public void testCreateIndex() throws IOException {
CreateIndexRequest createIndexRequest = new CreateIndexRequest("test2");
CreateIndexResponse response = restClient.indices().create(createIndexRequest, RequestOptions.DEFAULT);
System.out.println(response);
}
/*
* 添加document
*/
@Test
public void testAddDocument() throws IOException {
User u = new User();
u.setId(1002);
u.setName("测试222");
u.setPhone("15031525876");
u.setSex(1);
IndexRequest request = new IndexRequest("user");
request.id(u.getId().toString());
//将我们的数据放入请求,json
request.source(JSONUtil.toJsonStr(u), XContentType.JSON);
//客服端发送请求
IndexResponse res = restClient.index(request, RequestOptions.DEFAULT);
System.out.println(res.getId());
//对应我们的命令返回状态
System.out.println(res.status());
}
/*
* 修改document
*/
@Test
public void testUpdateDocument() throws IOException {
UpdateRequest request = new UpdateRequest("user", "1002");
request.timeout("1s");
User u = new User();
u.setId(1002);
u.setPhone("12345678911");
u.setSex(2);
request.doc(JSONUtil.toJsonStr(u), XContentType.JSON);
UpdateResponse update = restClient.update(request, RequestOptions.DEFAULT);
System.out.println(update);
System.out.println(update.status());
}
/*
* 判断document是否存在
*/
@Test
public void testIsExist() throws IOException {
GetRequest getRequest = new GetRequest("user", "1001");
// 不获取返回的source的上下文
getRequest.fetchSourceContext(new FetchSourceContext(false));
getRequest.storedFields("_none_");
boolean exists = restClient.exists(getRequest, RequestOptions.DEFAULT);
System.out.println(exists);
}
/*
* 查询document
*/
@Test
public void testGetDocument() throws IOException {
GetRequest getRequest = new GetRequest("user", "1001");
GetResponse response = restClient.get(getRequest, RequestOptions.DEFAULT);
//打印文档信息
System.out.println(response.getSourceAsString());
System.out.println(response);
}
/*
* 删除document
*/
@Test
public void testDeleteDocument() throws IOException {
DeleteRequest request = new DeleteRequest("user", "1003");
DeleteResponse update = restClient.delete(request, RequestOptions.DEFAULT);
System.out.println(update.status());
}
/*
* 批量添加document
*/
@Test
public void testBulkRequest() throws IOException {
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.timeout("10s");
List<User> userList = new ArrayList<>();
userList.add(new User());
userList.add(new User());
userList.add(new User());
for (int i = 0; i < userList.size(); i++) {
bulkRequest.add(
new IndexRequest("user")
.id("" + i + 1)
.source(JSONUtil.toJsonStr(userList.get(i)), XContentType.JSON)
);
}
BulkResponse bulk = restClient.bulk(bulkRequest, RequestOptions.DEFAULT);
System.out.println(bulk);
}
/*
* 批量修改删除document
* 批量修改和删除和新增一样,也是用的BulkRequest,只不过add时用UpdateRequest或DeleteRequest
*/
@Test
public void testBulkRequestUpdate() throws IOException {
BulkRequest bulkRequest = new BulkRequest();
List<User> userList = new ArrayList<>();
User u1 = new User();
u1.setId(1010);
u1.setName("qqqqq");
u1.setPhone(" ");
userList.add(u1);
User u2 = new User();
u2.setId(1011);
u2.setName("wwww");
u2.setPhone("999999");
u2.setSex(2);
userList.add(u2);
User u3 = new User();
u3.setId(1012);
u3.setName("eeee");
u3.setPhone("8888888");
u3.setSex(1);;
userList.add(u3);
for (int i = 0; i < userList.size(); i++) {
bulkRequest.add(
new UpdateRequest("user", userList.get(i).getId() + "")
.doc(JSONUtil.toJsonStr(userList.get(i)), XContentType.JSON)
);
}
bulkRequest.add(new DeleteRequest("user", "1010"));
BulkResponse res1 = restClient.bulk(bulkRequest, RequestOptions.DEFAULT);
System.out.println(res1.status());
}
}
注意:以上操作只是简单的根据ID的操作,复杂的聚合查询需要使用QueryBuilder,待后续完善!!!
网友评论