Document:文档
- Elasticsearch是面向文档的,文档是所有可搜索数据的最小基础信息单元。
- 一个Document就像数据库中的一行记录,文档会被序列化成JSON格式,保持在Elasticsearch中,多个Document存储于一个索引(Index)中。文档以JSON(Javascript Object Notation)格式来表示,而JSON是一个到处存在的互联网数据交互格式。
- 每一个文档都有一个UniqueID
文档的元数据
- 元数据:用于标注稳定的相关信息
- _index:文档所属的索引名
- _type:文档所属的类型名
- _id:文档的主键,在写入的时候,可以指定该Doc的ID值,如果不指定,则系统自动生成一个唯一的UUID值。
- _source:文档的原始Json数据
- _version:文档的版本信息,Elasticsearch通过使用version来保证对文档的变更能以正确的顺序执行,避免乱序造成的数据丢失。
- _score:相关性打分。
- _seq_no:严格递增的顺序号,每个文档一个,Shard级别严格递增,保证后写入的Doc的_seq_no大于先写入的Doc的_seq_no。
- primary_term:primary_term也和_seq_no一样是一个整数,每当Primary Shard发生重新分配时,比如重启,Primary选举等,_primary_term会递增1。
- found:查询的ID正确那么ture, 如果 Id 不正确,就查不到数据,found字段就是false。
生成文档id
手动生成
场景:从数据库或其他系统导入时,本身有唯一主键。
用法:PUT /index/_doc/id
PUT /test_index/_doc/1
{
"test_field": "test"
}
自动生成
用法:PUT /index/_doc
PUT /test_index/_doc
{
"test_field": "test"
}
自动id特点:长度为20个字符,URL安全,base64编码,GUID,分布式生成不冲突。
_source字段
含义:插入数据时的所有字段和值,在get获取数据时,在_source字段中原样返回。
定制返回字段:GET /index/_doc/id?_source_includes=field1,field2
文档的替换与删除
全量替换
执行两次,返回结果中版本号(_version)在不断上升,此过程为全量替换。
PUT /test_index/_doc/1
{
"test_field": "test"
}
实质:旧文档的内容不会立即删除,只是标记为delete,适当的时机,集群会将这些文档删除。
强制创建
为防止覆盖原有数据,我们在新增时,设置为强制创建,就不会覆盖原有文档。
语法:PUT /index/_doc/id/_create
PUT /test_index/_doc/1/_create
{
"test_field": "test"
}
删除
语法:DELETE /index/_doc/id
实质:旧文档的内容不会立即删除,只是标记为deleted,适当的时机,集群会将这些文档删除,lazy delete。
局部替换 partial update
使用PUT /index/type/id
为文档全量替换,需要将文档所有数据提交。
partial update局部替换则只修改变动字段。
用法:
PUT /index/type/id/_update
{
"doc": {
"field": "value"
}
}
内部原理
内部与全量替换是一样的,旧文档标记为删除,新建一个文档。
步骤:
- 1、es获取内部旧文档。
- 2、将传来的文档field更新到旧数据(内存中)
- 3、将旧文档标记为delete
- 4、创建新文档。
优点:
- 大大减少网络传输次数和流量,提升性能。
- 减少并发冲突发生的概率。
使用脚本更新
es可以内置脚本执行复杂操作,例如painless脚本。
注意:groovy脚本在es6以后就不支持了,原因是耗内存,不安全远程注入漏洞。
内置脚本
插入数据
PUT /test_index/_doc/6
{
"num": 0
}
执行脚本操作:
PUT /test_index/_doc/6/_update
{
"script": "ctx._source.num+=1"
}
外部脚本
Painless是内置的,脚本内容可以通过多种途径传给es,包括rest接口,或者放到config/scripts目录等,默认开启。
注意:脚本性能底下,容易发生注入。
官方文档:https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-scripting-using.html
elasticsearch的并发问题
如同秒杀,多线程情况下,es同样会出现并发冲突问题。
实现基于_version的版本控制
es对于文档的增删改都是基于_version版本号的。
- 1、多次新增,返回版本号递增
PUT /test_index/_doc/3
{
"test_field": "test"
}
- 2、删除此文档
DELETE /test_index/_doc/3
返回
{
"_index" : "test_index",
"_type" : "_doc",
"_id" : "3",
"_version" : 4,
"result" : "deleted",
"_shards" : {
"total" : 2,
"successful" : 1,
"failed" : 0
},
"_seq_no" : 3,
"_primary_term" : 1
}
-
3、删除后新增
可以看到版本号依然递增,验证延迟删除策略。
如果删除一条数据,es立马删除的话,所有分片和副本都要立马删除,对es集群来说压力太大。
es内部主从同步并发控制
es内部主从同步时,是多线程异步,乐观锁机制。
java api实现文档管理
es技术特点
- 1、es技术比较特殊,不像其他分布式,es代码层面很好写,难的是概念的理解。
- 2、es最重要的是它的rest api,跨语言的,在真实生产中,查询数据、分析数据,使用rest api更方便。
https://www.elastic.co/guide/en/elasticsearch/client/java-rest/7.3/java-rest-overview.html
java客户端简单获取数据
引入maven依赖
<dependencies>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.3.0</version>
<exclusions>
<exclusion>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>7.3.0</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
<version>7.3.0</version>
</dependency>
</dependencies>
步骤
- 1、获取连接的客户端
- 2、构建请求
- 3、执行
- 4、获取结果
public class TestDemo {
public static void main(String[] args) throws IOException {
//1、获取连接的客户端
RestHighLevelClient client = new RestHighLevelClient(RestClient.builder(
new HttpHost("localhost",9200,"http")
));
//2、构建请求
GetRequest request = new GetRequest("book","1");
//3、执行
GetResponse getResponse = client.get(request, RequestOptions.DEFAULT);
//4、获取结果
System.out.println(getResponse.getId());
System.out.println(getResponse.getVersion());
System.out.println(getResponse.getSourceAsString());
}
}
结合spring-boot-starter-test测试文档查询
引入maven依赖
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.10.RELEASE</version>
<relativePath/>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.3.0</version>
<exclusions>
<exclusion>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>7.3.0</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
<version>7.3.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.74</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.12</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
配置application.properties
spring.application.name=service-search
#多个节点用逗号隔开
elasticsearch.hostlist=127.0.0.1:9200
创建配置类ElasticsearchConfig
@Configuration
public class ElasticsearchConfig {
@Value("${elasticsearch.hostlist}")
private String hostlist;
@Bean(destroyMethod = "close")
public RestHighLevelClient restHighLevelClient(){
String[] split = hostlist.split(",");
HttpHost[] httpHost = new HttpHost[split.length];
for (int i = 0; i < split.length; i++) {
String[] item = split[i].split(":");
httpHost[i] = new HttpHost(item[0],Integer.parseInt(item[1]),"http");
}
return new RestHighLevelClient(RestClient.builder(httpHost));
}
}
编写测试代码
简单同步查询
@SpringBootTest(classes = SearchApplication.class)
@RunWith(SpringRunner.class)
@Slf4j
public class TestDocument {
@Autowired
private RestHighLevelClient client;
@Test
public void testGet() throws IOException {
//1、构建请求
GetRequest request = new GetRequest("book","1");
//===========可选参数,可以设置很多================
String[] includes = new String[]{"user","message"};//想要查询的字段
String[] excludes = Strings.EMPTY_ARRAY;//不想要的字段,暂时设置空数组
FetchSourceContext fetchSourceContext = new FetchSourceContext(true,includes,excludes);
request.fetchSourceContext(fetchSourceContext);
//2、执行
//同步查询
GetResponse getResponse = client.get(request, RequestOptions.DEFAULT);
//3、获取结果
if(getResponse.isExists()){
System.out.println(getResponse.getId());
System.out.println(getResponse.getVersion());
System.out.println(getResponse.getSourceAsString());//以String获取数据
System.out.println(getResponse.getSourceAsBytes());//以bytes获取数据
System.out.println(getResponse.getSourceAsMap());//以Map获取数据
}else {
log.info("没有获取到结果");
}
}
}
简单异步查询
@SpringBootTest(classes = SearchApplication.class)
@RunWith(SpringRunner.class)
@Slf4j
public class TestDocument {
@Autowired
private RestHighLevelClient client;
@Test
public void testGet() throws IOException {
//1、构建请求
GetRequest request = new GetRequest("book","1");
//===========可选参数,可以设置很多================
String[] includes = new String[]{"user","message"};//想要查询的字段
String[] excludes = Strings.EMPTY_ARRAY;//不想要的字段,暂时设置空数组
FetchSourceContext fetchSourceContext = new FetchSourceContext(true,includes,excludes);
request.fetchSourceContext(fetchSourceContext);
//2、执行
//异步查询
ActionListener<GetResponse> listener = new ActionListener<GetResponse>() {
//成功时的操作
@Override
public void onResponse(GetResponse getResponse) {
System.out.println(getResponse.getId());
System.out.println(getResponse.getVersion());
System.out.println(getResponse.getSourceAsString());//以String获取数据
}
//失败时的操作
@Override
public void onFailure(Exception e) {
log.error("error",e);
}
};
client.getAsync(request, RequestOptions.DEFAULT,listener);
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
新增
4种构建文档数据的方法
- 1、json字符串方式
IndexRequest request = new IndexRequest("book");
request.id("2");
String json = "{\"user\":\"tomas\",\"postDate\":\"2020-10-25\",\"message\":\"trying out es\"}";
request.source(json, XContentType.JSON);
- 2、文档源作为Map提供,可自动转换为JSON格式。
IndexRequest request = new IndexRequest("book");
request.id("2");
Map<String,Object> map = new HashMap<>();
map.put("user","tom");
map.put("postDate","2020-10-25");
map.put("message","trying out es");
request.source(map);
- 3、文档源作为XContentBuilder对象提供,Elasticsearch内置辅助生成JSON内容。
IndexRequest request = new IndexRequest("book");
request.id("2");
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
{
builder.field("user","tomas");
builder.timeField("postDate","2020-10-25");
builder.field("message","trying out es");
}
builder.endObject();
request.source(builder);
- 4、文档源作为Object键值对提供,转换为JSON格式。
IndexRequest request = new IndexRequest("book");
request.id("2");
request.source("user","tomas",
"postDate","2020-10-25",
"message","trying out es");
同步新增
@Test
public void testAdd() throws IOException {
//1、构建请求
IndexRequest request = new IndexRequest("book");
request.id("3");
//===============构建文档数据4种方法=======================
//方法1
String json = "{\"user\":\"tomas\",\"postDate\":\"2020-10-25\",\"message\":\"trying out es\"}";
request.source(json, XContentType.JSON);
//方法2
/*Map<String,Object> map = new HashMap<>();
map.put("user","tom");
map.put("postDate","2020-10-25");
map.put("message","trying out es");
request.source(map);
//方法3
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
{
builder.field("user","tomas");
builder.timeField("postDate","2020-10-25");
builder.field("message","trying out es");
}
builder.endObject();
request.source(builder);
//方法4
request.source("user","tomas",
"postDate","2020-10-25",
"message","trying out es");*/
//=====================可选参数========================
//设置超时时间
request.timeout("1s");
request.timeout(TimeValue.timeValueSeconds(1));
//手动维护版本号
request.version(2);
request.versionType(VersionType.EXTERNAL);
//2、执行
//同步操作
IndexResponse response = client.index(request, RequestOptions.DEFAULT);
//3、获取结果
log.info("index: {}",response.getIndex());
log.info("id: {}",response.getId());
if(response.getResult() == DocWriteResponse.Result.CREATED){
log.info("新增成功,result: {}",response.getResult());
}else if(response.getResult() == DocWriteResponse.Result.UPDATED){
log.info("更新成功,result: {}",response.getResult());
}else {
log.info("操作失败,result: {}",response.getResult());
}
ReplicationResponse.ShardInfo shardInfo = response.getShardInfo();
if(shardInfo.getTotal() != shardInfo.getSuccessful()){
log.info("处理成功的分片数少于总分片");
}
if(shardInfo.getFailed() > 0){
for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
String reason = failure.reason();//每一个错误的原因
log.info(reason);
}
}
}
异步新增
@Test
public void testAdd() throws IOException {
//1、构建请求
IndexRequest request = new IndexRequest("book");
request.id("3");
//===============构建文档数据4种方法=======================
//方法1
String json = "{\"user\":\"tomas\",\"postDate\":\"2020-10-25\",\"message\":\"trying out es\"}";
request.source(json, XContentType.JSON);
//方法2
/*Map<String,Object> map = new HashMap<>();
map.put("user","tom");
map.put("postDate","2020-10-25");
map.put("message","trying out es");
request.source(map);
//方法3
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
{
builder.field("user","tomas");
builder.timeField("postDate","2020-10-25");
builder.field("message","trying out es");
}
builder.endObject();
request.source(builder);
//方法4
request.source("user","tomas",
"postDate","2020-10-25",
"message","trying out es");*/
//=====================可选参数========================
//设置超时时间
request.timeout("1s");
request.timeout(TimeValue.timeValueSeconds(1));
//手动维护版本号
request.version(2);
request.versionType(VersionType.EXTERNAL);
//2、执行
//异步操作
ActionListener<IndexResponse> actionListener = new ActionListener<IndexResponse>() {
//成功时进行的操作
@Override
public void onResponse(IndexResponse indexResponse) {
log.info("index: {}",response.getIndex());
log.info("id: {}",response.getId());
if(response.getResult() == DocWriteResponse.Result.CREATED){
log.info("新增成功,result: {}",response.getResult());
}else if(response.getResult() == DocWriteResponse.Result.UPDATED){
log.info("更新成功,result: {}",response.getResult());
}else {
log.info("操作失败,result: {}",response.getResult());
}
}
//失败时进行的操作
@Override
public void onFailure(Exception e) {
log.error("error",e);
}
};
client.indexAsync(request,RequestOptions.DEFAULT,actionListener);
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
文档修改
全量替换:文档同一个Id多新增几次,就叫全量替换。
同步局部替换
@Test
public void testUpdate() throws IOException {
//1、创建请求
UpdateRequest request = new UpdateRequest("book","3");
Map<String,Object> map = new HashMap<>();
map.put("user","tomas Lee");
request.doc(map);
//=========可选参数==========
request.timeout("1s");
request.retryOnConflict(3);//重试次数
//2、执行
//同步操作
UpdateResponse updateResponse = client.update(request, RequestOptions.DEFAULT);
//3、获取结果
log.info("index: {}",updateResponse.getIndex());
log.info("id: {}",updateResponse.getId());
if(updateResponse.getResult() == DocWriteResponse.Result.CREATED){
log.info("新增成功,result: {}",updateResponse.getResult());
}else if(updateResponse.getResult() == DocWriteResponse.Result.UPDATED){
log.info("更新成功,result: {}",updateResponse.getResult());
}else if(updateResponse.getResult() == DocWriteResponse.Result.DELETED){
log.info("删除成功,result: {}",updateResponse.getResult());
}else if(updateResponse.getResult() == DocWriteResponse.Result.NOOP){
log.info("NOOP 不进行操作,result: {}",updateResponse.getResult());
}
}
异步局部替换
@Test
public void testUpdate() throws IOException {
//1、创建请求
UpdateRequest request = new UpdateRequest("book","3");
Map<String,Object> map = new HashMap<>();
map.put("user","tomas Lee");
request.doc(map);
//=========可选参数==========
request.timeout("1s");
request.retryOnConflict(3);//重试次数
//2、执行
//异步操作
ActionListener<UpdateResponse> listener = new ActionListener<UpdateResponse>() {
@Override
public void onResponse(UpdateResponse updateResponse) {
log.info("index: {}",updateResponse.getIndex());
log.info("id: {}",updateResponse.getId());
if(updateResponse.getResult() == DocWriteResponse.Result.CREATED){
log.info("新增成功,result: {}",updateResponse.getResult());
}else if(updateResponse.getResult() == DocWriteResponse.Result.UPDATED){
log.info("更新成功,result: {}",updateResponse.getResult());
}else if(updateResponse.getResult() == DocWriteResponse.Result.DELETED){
log.info("删除成功,result: {}",updateResponse.getResult());
}else if(updateResponse.getResult() == DocWriteResponse.Result.NOOP){
log.info("NOOP 不进行操作,result: {}",updateResponse.getResult());
}
}
@Override
public void onFailure(Exception e) {
log.info("error",e);
}
};
client.updateAsync(request, RequestOptions.DEFAULT,listener);
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
同步文档删除
@Test
public void testDelete() throws IOException {
//1、创建请求
DeleteRequest request = new DeleteRequest("book","3");
//2、执行
DeleteResponse deleteResponse = client.delete(request, RequestOptions.DEFAULT);
//3、获取结果
log.info("index: {}",deleteResponse.getIndex());
log.info("id: {}",deleteResponse.getId());
log.info("result: {}",deleteResponse.getResult());
}
异步文档删除
@Test
public void testDelete() throws IOException {
//1、创建请求
DeleteRequest request = new DeleteRequest("book","3");
//2、执行
ActionListener<DeleteResponse> listener = new ActionListener<DeleteResponse>() {
@Override
public void onResponse(DeleteResponse deleteResponse) {
log.info("index: {}",deleteResponse.getIndex());
log.info("id: {}",deleteResponse.getId());
log.info("result: {}",deleteResponse.getResult());
}
@Override
public void onFailure(Exception e) {
log.error("error",e);
}
};
client.deleteAsync(request, RequestOptions.DEFAULT,listener);
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
批量增删改bulk
@Test
public void testBulk() throws IOException {
//1、创建请求
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.add(new IndexRequest("book").id("2").source(XContentType.JSON,"field","1"));
bulkRequest.add(new IndexRequest("book").id("3").source(XContentType.JSON,"field","2"));
bulkRequest.add(new UpdateRequest("book","3").doc(XContentType.JSON,"field","3"));
bulkRequest.add(new DeleteRequest("book").id("2"));
//2、执行
BulkResponse bulkResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT);
//3、获取结果
for (BulkItemResponse bulkItemResponse : bulkResponse) {
DocWriteResponse response = bulkItemResponse.getResponse();
log.info("index: {}",response.getIndex());
log.info("id: {}",response.getId());
log.info("result: {}",response.getResult());
if(bulkItemResponse.getOpType().equals(DocWriteRequest.OpType.INDEX)){
log.info("index: {}",response.getIndex());
log.info("id: {}",response.getId());
log.info("INDEX result: {}",response.getResult());
}else if(bulkItemResponse.getOpType().equals(DocWriteRequest.OpType.CREATE)){
log.info("index: {}",response.getIndex());
log.info("id: {}",response.getId());
log.info("CREATE result: {}",response.getResult());
}else if(bulkItemResponse.getOpType().equals(DocWriteRequest.OpType.UPDATE)){
log.info("index: {}",response.getIndex());
log.info("id: {}",response.getId());
log.info("UPDATE result: {}",response.getResult());
}else if(bulkItemResponse.getOpType().equals(DocWriteRequest.OpType.DELETE)){
log.info("index: {}",response.getIndex());
log.info("id: {}",response.getId());
log.info("DELETE result: {}",response.getResult());
}
}
}
bulk api奇特的json格式
bulk api的语法
{"action": {"meta"}}\n
{"data"}\n
{"action": {"meta"}}\n
{"data"}\n
[{
"action": {
},
"data": {
}
}]
- 1、bulk中的每个操作都可能要转发到不同的node的shard去执行
- 2、如果采用比较良好的json数组格式
允许任意的换行,整个可读性非常棒,读起来很爽,es拿到那种标准格式的json串以后,要按照下述流程去进行处理:
- 1)将json数组解析为JSONArray对象,这个时候,整个数据,就会在内存中出现一份一模一样的拷贝,一份数据是json文本,一份数据是JSONArray对象
- 2)解析json数组里的每个json,对每个请求中的document进行路由
- 3)为路由到同一个shard上的多个请求,创建一个请求数组
- 4)将这个请求数组序列化
- 5)将序列化后的请求数组发送到对应的节点上去
3、耗费更多内存,更多的jvm gc开销
我们之前提到过bulk size最佳大小的那个问题,一般建议说在几千条那样,然后大小在10MB左右,所以说,可怕的事情来了。假设说现在100个bulk请求发送到了一个节点上去,然后每个请求是10MB,100个请求,就是1000MB = 1GB,然后每个请求的json都copy一份为jsonarray对象,此时内存中的占用就会翻倍,就会占用2GB的内存,甚至还不止。因为弄成jsonarray之后,还可能会多搞一些其他的数据结构,2GB+的内存占用。
占用更多的内存可能就会积压其他请求的内存使用量,比如说最重要的搜索请求,分析请求,等等,此时就可能会导致其他请求的性能急速下降。
另外的话,占用内存更多,就会导致java虚拟机的垃圾回收次数更多,更频繁,每次要回收的垃圾对象更多,耗费的时间更多,导致es的java虚拟机停止工作线程的时间更多
现在的奇特格式
{"action": {"meta"}}\n
{"data"}\n
{"action": {"meta"}}\n
{"data"}\n
- 1)不用将其转换为json对象,不会出现内存中的相同数据的拷贝,直接按照换行符切割json。
- 2)对每两个一组的json,读取meta,进行document路由。
- 3)直接将对应的json发送到node上去。
5、最大的优势在于,不需要将json数组解析为一个JSONArray对象,形成一份大数据的拷贝,浪费内存空间,尽可能地保证性能。
网友评论