美文网首页Spring-Boot
elasticsearch之十二springboot测试文档及索

elasticsearch之十二springboot测试文档及索

作者: Java及SpringBoot | 来源:发表于2020-04-01 10:35 被阅读0次

[个人专题目录](https://www.jianshu.com/p/140e2a59db2c)


1. elasticsearch文档及索引管理初级

1.1 项目及索引创建

java api 文档 https://www.elastic.co/guide/en/elasticsearch/client/java-rest/7.5.1/java-rest-overview.html

low : 偏向底层。

high:高级封装。足够。<font color=red>通过API操作与kibana查看操作结果。</font>

<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.5.1</version>
    <exclusions>
        <exclusion>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>7.5.1</version>
</dependency>
语法:put /index


- title:商品标题
- price:商品价格
- createTime:创建时间
- categoryName:分类名称。如:家电,手机
- brandName:品牌名称。如:华为,小米
- spec: 商品规格。如: spec:{"屏幕尺寸":"5寸","内存大小":"128G"}
- saleNum:销量
- stock:库存量

PUT book-index
{
    "mappings": {
        "properties": {
            "title": {
                "type": "text",
                "analyzer": "ik_smart"
            },
            "price": { 
                "type": "double"
            },
            "createTime": {
                "type": "date",
                "format" : "yyyy-MM-dd HH:mm:ss"
            },
            "categoryName": {   
                "type": "keyword"
            },
            "brandName": {  
                "type": "keyword"
            },
    
            "spec": {       
                "type": "object"
            },
            "saleNum": {    
                "type": "integer"
            },
            
            "stock": {  
                "type": "integer"
            }
        }
    }
}

PUT /book-index



/**
 * {@link IndexService} implementation that manages Elasticsearch indices through the
 * high-level REST client: create, delete, existence check, open and close.
 *
 * <p>The asynchronous variants fire the request, then block the calling thread for a fixed
 * interval (presumably so the listener gets a chance to log its outcome before a short-lived
 * caller such as a unit test exits — confirm before relying on this in production code).
 */
@Service
@Log4j2
public class IndexServiceImpl implements IndexService {

    /** Fixed pause, in milliseconds, applied after dispatching an asynchronous request. */
    private static final long ASYNC_WAIT_MILLIS = 5000L;

    @Autowired
    private RestHighLevelClient restHighLevelClient;

    /**
     * Creates the index described by {@code request} unless an index named {@code index}
     * already exists.
     *
     * @param index   name used for the existence check and in log output
     * @param request fully configured create-index request (settings, mappings, timeouts)
     * @param async   {@code true} to use {@code createAsync} with a logging listener,
     *                {@code false} to call the blocking {@code create}
     * @throws Exception if the existence check or the synchronous create call fails
     */
    @Override
    public void createIndex(String index, CreateIndexRequest request, boolean async) throws Exception {
        log.info("source:{}", request.toString());
        // Only create the index when it is not there yet.
        if (existsIndex(index)) {
            log.info("索引已经存在{}", index);
            return;
        }

        // Client dedicated to index-level operations.
        IndicesClient indices = restHighLevelClient.indices();
        if (async) {
            ActionListener<CreateIndexResponse> listener = new ActionListener<CreateIndexResponse>() {
                @Override
                public void onResponse(CreateIndexResponse createIndexResponse) {
                    log.info("!!!!!!!!创建索引成功{}", createIndexResponse);
                }

                @Override
                public void onFailure(Exception e) {
                    log.error("!!!!!!!!创建索引失败", e);
                }
            };
            indices.createAsync(request, RequestOptions.DEFAULT, listener);
            pauseForAsyncCallback();
        } else {
            CreateIndexResponse response = indices.create(request, RequestOptions.DEFAULT);
            // isAcknowledged(): the create request itself was acknowledged.
            // response.isShardsAcknowledged() would additionally tell whether the required
            // number of shard copies were started before the timeout.
            log.info("创建索引{}的结果是{}", index, response.isAcknowledged());
        }
    }

    /**
     * Deletes the index named {@code index} if it exists; otherwise only logs.
     *
     * @param index name of the index to delete
     * @param async {@code true} to use {@code deleteAsync} with a logging listener,
     *              {@code false} to call the blocking {@code delete}
     * @throws Exception if the existence check or the synchronous delete call fails
     */
    @Override
    public void deleteIndex(String index, boolean async) throws Exception {
        if (!existsIndex(index)) {
            log.info("索引不存在{},无法删除", index);
            return;
        }

        DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(index);
        IndicesClient indices = restHighLevelClient.indices();
        if (async) {
            ActionListener<AcknowledgedResponse> listener = new ActionListener<AcknowledgedResponse>() {
                @Override
                public void onResponse(AcknowledgedResponse deleteIndexResponse) {
                    log.info("!!!!!!!!删除索引成功 {}", deleteIndexResponse.toString());
                }

                @Override
                public void onFailure(Exception e) {
                    log.error("!!!!!!!!删除索引失败", e);
                }
            };
            indices.deleteAsync(deleteIndexRequest, RequestOptions.DEFAULT, listener);
            pauseForAsyncCallback();
        } else {
            AcknowledgedResponse response = indices.delete(deleteIndexRequest, RequestOptions.DEFAULT);
            log.info("删除索引{}的结果是{}", index, response.isAcknowledged());
        }
    }

    /**
     * Checks whether an index exists and logs the outcome.
     *
     * @param index name of the index to look up
     * @return {@code true} if the cluster reports the index as existing
     * @throws Exception if the request fails
     */
    @Override
    public boolean existsIndex(String index) throws Exception {
        GetIndexRequest getIndexRequest = new GetIndexRequest(index);
        // false: ask the master node rather than using node-local information.
        getIndexRequest.local(false);
        // Return values in a human-friendly format.
        getIndexRequest.humanReadable(true);
        // Do not include every default setting of the index.
        getIndexRequest.includeDefaults(false);

        boolean exists = restHighLevelClient.indices().exists(getIndexRequest, RequestOptions.DEFAULT);
        log.info("索引{}存在的状态是{}", index, exists);
        return exists;
    }

    /**
     * Opens a closed index and logs whether the request was acknowledged.
     *
     * @param indexName name of the index to open
     * @throws IOException if the request fails
     */
    @Override
    public void openIndex(String indexName) throws IOException {
        OpenIndexRequest request = new OpenIndexRequest(indexName);
        OpenIndexResponse openIndexResponse = restHighLevelClient.indices().open(request, RequestOptions.DEFAULT);
        log.info("!!!!!!!!!{}", openIndexResponse.isAcknowledged());
    }

    /**
     * Closes an index and logs whether the request was acknowledged.
     *
     * @param indexName name of the index to close
     * @throws IOException if the request fails
     */
    @Override
    public void closeIndex(String indexName) throws IOException {
        CloseIndexRequest request = new CloseIndexRequest(indexName);
        CloseIndexResponse closeIndexResponse = restHighLevelClient.indices().close(request, RequestOptions.DEFAULT);
        log.info("!!!!!!!!!{}", closeIndexResponse.isAcknowledged());
    }

    /**
     * Blocks the current thread for {@link #ASYNC_WAIT_MILLIS} after an asynchronous request
     * was dispatched. If the thread is interrupted, the interrupt flag is restored so callers
     * further up the stack can still observe it.
     */
    private void pauseForAsyncCallback() {
        try {
            Thread.sleep(ASYNC_WAIT_MILLIS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.warn("Interrupted while waiting for the asynchronous index operation", e);
        }
    }
}

1.2 索引操作示例

/**
 * Integration tests that drive {@link IndexService} against the index named by
 * {@code Constants.INDEX_NAME}: create (with settings and mappings), delete, exists,
 * open and close.
 */
@SpringBootTest(classes = SearchServiceApplication.class)
@WebAppConfiguration
@RunWith(SpringJUnit4ClassRunner.class)
public class IndexServiceTest {

    @Autowired
    private IndexService indexService;

    /** Builds a create-index request with shard settings and field mappings, then creates it synchronously. */
    @Test
    public void testCreateIndex() throws Exception {
        CreateIndexRequest createIndexRequest = new CreateIndexRequest(Constants.INDEX_NAME);
        // An index needs its shard settings and its mapping when it is created.
        buildingSetting(createIndexRequest);
        buildingMapping(createIndexRequest);

        // Optional: register an alias.
        //createIndexRequest.alias(new Alias("alias_index_name"));

        // Optional parameters.
        // Timeout for the request as a whole.
        createIndexRequest.setTimeout(TimeValue.timeValueMinutes(2));
        // Timeout for connecting to the master node.
        createIndexRequest.setMasterTimeout(TimeValue.timeValueMinutes(1));
        // Number of active shard copies to wait for before the create call returns.
        // An explicit count can be given with ActiveShardCount.from(n); the previous code set
        // from(2) and then immediately overwrote it, so only the effective value is kept.
        createIndexRequest.waitForActiveShards(ActiveShardCount.DEFAULT);

        indexService.createIndex(Constants.INDEX_NAME, createIndexRequest, false);
    }

    /**
     * Attaches the field mapping to the request.
     *
     * <ul>
     *   <li>title: product title (text, ik_smart analyzer)</li>
     *   <li>price: product price (double)</li>
     *   <li>createTime: creation time (date, {@code yyyy-MM-dd HH:mm:ss})</li>
     *   <li>categoryName: category name, e.g. home appliance, phone (keyword)</li>
     *   <li>brandName: brand name, e.g. Huawei, Xiaomi (keyword)</li>
     *   <li>spec: product specification object, e.g.
     *       {@code {"屏幕尺寸":"5寸","内存大小":"128G"}}</li>
     *   <li>saleNum: units sold (integer)</li>
     *   <li>stock: units in stock (integer)</li>
     * </ul>
     *
     * @param createIndexRequest request that receives the mapping
     * @throws IOException if the JSON builder fails
     */
    private void buildingMapping(CreateIndexRequest createIndexRequest) throws IOException {
        XContentBuilder xContentBuilder = JsonXContent.contentBuilder()
                .startObject()
                .startObject("properties")

                .startObject("title")
                .field("type", "text")
                .field("analyzer", "ik_smart")
                .endObject()

                .startObject("price")
                .field("type", "double")
                .endObject()

                .startObject("createTime")
                .field("type", "date")
                .field("format", "yyyy-MM-dd HH:mm:ss")
                .endObject()

                .startObject("categoryName")
                .field("type", "keyword")
                .endObject()

                .startObject("brandName")
                .field("type", "keyword")
                .endObject()

                .startObject("spec")
                .field("type", "object")
                .endObject()

                .startObject("saleNum")
                .field("type", "integer")
                .endObject()

                .startObject("stock")
                .field("type", "integer")
                .endObject()

                .endObject()
                .endObject();
        createIndexRequest.mapping(xContentBuilder);
    }

    /**
     * Configures the sharding of the index: 3 primary shards, 2 replicas per primary.
     *
     * @param createIndexRequest request that receives the settings
     */
    private void buildingSetting(CreateIndexRequest createIndexRequest) {
        createIndexRequest.settings(Settings.builder()
                // Number of primary shards.
                .put("number_of_shards", 3)
                // Number of replica shards per primary.
                .put("number_of_replicas", 2));
    }

    /** Deletes the test index synchronously. */
    @Test
    public void testDeleteIndex() throws Exception {
        indexService.deleteIndex(Constants.INDEX_NAME, false);
    }

    /** Runs the existence check; the result is only logged by the service. */
    @Test
    public void testExistsIndex() throws Exception {
        indexService.existsIndex(Constants.INDEX_NAME);
    }

    /** Opens the test index. */
    @Test
    public void testOpenIndex() throws IOException {
        indexService.openIndex(Constants.INDEX_NAME);
    }

    /** Closes the test index. */
    @Test
    public void testCloseIndex() throws IOException {
        indexService.closeIndex(Constants.INDEX_NAME);
    }
}

1.3 查询文档

语法:GET /index/_doc/id(7.x 起 type 固定为 _doc)

查看:GET /book-index/_doc/1 就可看到json形式的文档。方便程序解析。

_mget批量查询

批量查询可以提高查询效率。推荐使用(相对于单数据查询来说)。

/**
 * Fetches one document by id and logs its source.
 *
 * <p>Only the {@code id} and {@code price} fields of {@code _source} are requested.
 * The previous version had the two branches swapped, so {@code async == true} ran the
 * blocking call; the flag now selects the mode its name states.
 *
 * @param index index to read from
 * @param id    document id
 * @param async {@code true} to use {@code getAsync} with a logging listener,
 *              {@code false} to call the blocking {@code get}
 * @throws Exception if the synchronous request fails
 */
@Override
public void get(String index, String id, boolean async) throws Exception {
    GetRequest getRequest = new GetRequest(index, id);

    // Optional: limit the returned _source to specific fields.
    String[] includes = new String[]{"id", "price"};
    String[] excludes = Strings.EMPTY_ARRAY;
    getRequest.fetchSourceContext(new FetchSourceContext(true, includes, excludes));

    // Optional: custom routing.
    //getRequest.routing("id");

    if (async) {
        ActionListener<GetResponse> listener = new ActionListener<GetResponse>() {
            // Invoked once the request has completed successfully.
            @Override
            public void onResponse(GetResponse getResponse) {
                // Document source as a JSON string.
                log.info(getResponse.getSourceAsString());
            }

            // Invoked when the request fails.
            @Override
            public void onFailure(Exception e) {
                log.error("Asynchronous get failed", e);
            }
        };
        restHighLevelClient.getAsync(getRequest, RequestOptions.DEFAULT, listener);
        // Fixed pause after dispatching the asynchronous request (kept from the original).
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            // Restore the flag so callers can still see the interruption.
            Thread.currentThread().interrupt();
            log.warn("Interrupted while waiting for the asynchronous get", e);
        }
    } else {
        GetResponse getResponse = restHighLevelClient.get(getRequest, RequestOptions.DEFAULT);
        if (getResponse.isExists()) {
            // Document source as a JSON string ...
            log.info(getResponse.getSourceAsString());
            // ... and as a map.
            Map<String, Object> sourceAsMap = getResponse.getSourceAsMap();
            log.info(sourceAsMap);
        }
    }
}

/**
 * Fetches several documents in one multi-get request and logs each document's source.
 *
 * <p>Items that failed carry no {@code GetResponse}; they are logged as errors instead of
 * being dereferenced. A {@code null} or empty list is skipped, because a multi-get request
 * without any item is rejected before it is sent.
 *
 * @param beans index/id pairs of the documents to fetch
 * @throws Exception if the request fails
 */
@Override
public void mGet(List<BulkBean> beans) throws Exception {
    if (beans == null || beans.isEmpty()) {
        log.info("mGet called without any document to fetch");
        return;
    }

    MultiGetRequest multiGetRequest = new MultiGetRequest();
    for (BulkBean bean : beans) {
        multiGetRequest.add(bean.getIndex(), bean.getId());
    }

    MultiGetResponse multiGetResponse = restHighLevelClient.mget(multiGetRequest, RequestOptions.DEFAULT);
    for (MultiGetItemResponse response : multiGetResponse.getResponses()) {
        if (response.isFailed()) {
            // getResponse() is null for failed items.
            log.error("mGet item {}/{} failed: {}", response.getIndex(), response.getId(),
                    response.getFailure().getMessage());
            continue;
        }
        // Document source as a JSON string; getSource() would give the same data as a map.
        String source = response.getResponse().getSourceAsString();
        log.info("每一条数据是{}", source);
    }
    log.info(multiGetResponse);
}

1.3 新建文档

语法:PUT /index/_doc/id

为防止覆盖原有数据,我们在新增时,设置为强制创建,不会覆盖原有文档。

语法:PUT /index/_doc/id/_create(7.x 推荐写法:PUT /index/_create/id)

使用强制新增语法时,如果Document的id在Elasticsearch中已存在,则会报错。(version conflict, document already exists)

# 此操作为Elasticsearch自动生成id的新增Document方式(POST 且不指定id)。
POST book-index/_doc
{
  "title":"小米手机",
  "price":1000,
  "createTime":"2019-12-01 00:00:00",
  "categoryName":"手机",
  "brandName":"小米",
  "saleNum":3000,
  "stock":10000,
  "spec":{
    "网络制式":"移动4G",
    "屏幕尺寸":"4.5"
  }
}
#指定id创建文档,需要 PUT 请求
PUT /book-index/_doc/1
{

}
/**
 * Indexes one JSON document and logs the outcome.
 *
 * <p>The source can alternatively be built with an {@code XContentBuilder} and passed to
 * {@code indexRequest.source(builder)}, and the request can be sent with
 * {@code indexAsync(request, options, listener)} instead of the blocking call used here.
 *
 * @param index index to write to
 * @param json  document body as a JSON string
 * @param docId document id, or {@code null} to leave the id unset on the request
 * @throws IOException if the request or the serialization of the response fails
 */
public void addDoc(String index, String json, String docId) throws IOException {
    IndexRequest indexRequest = new IndexRequest(index);
    // Document body.
    indexRequest.source(json, XContentType.JSON);
    if (docId != null) {
        indexRequest.id(docId);
    }

    // Optional: request timeout (TimeValue.timeValueSeconds(1) is the typed equivalent).
    indexRequest.timeout("1s");

    // Optional: externally maintained version numbers.
    //indexRequest.version(2);
    //indexRequest.versionType(VersionType.EXTERNAL);

    IndexResponse indexResponse = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
    // The placeholder was previously concatenated with the value and never substituted.
    log.info("添加数据indexResponse {}", objectMapper.writeValueAsString(indexResponse));

    // Whether the document was newly created or replaced an existing one.
    DocWriteResponse.Result result = indexResponse.getResult();
    if (result == DocWriteResponse.Result.CREATED) {
        log.info("CREATED:" + result);
    } else if (result == DocWriteResponse.Result.UPDATED) {
        log.info("UPDATED:" + result);
    }

    ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
    if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
        log.info("处理成功的分片数少于总分片!");
    }
    if (shardInfo.getFailed() > 0) {
        // Log the reason reported for each failed shard copy.
        for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
            log.info(failure.reason());
        }
    }
}

1.4 更新文档

PUT /index/type/1 替换操作是整体覆盖,要带上所有信息。

执行两次,返回结果中版本号(_version)在不断上升。此过程为全量替换。

实质:旧文档的内容不会立即删除,只是标记为deleted。适当的时机,集群会将这些文档删除。

局部替换 partial update

POST方式更新单个内容

语法:POST /{index}/{type}/{id}/_update

或者POST /{index}/_update/{id}

partial update局部替换则只修改变动字段。

内部与全量替换是一样的,旧文档标记为删除,新建一个文档。

优点:

  • 大大减少网络传输次数和流量,提升性能
  • 减少并发冲突发生的概率。
POST /book-index/_update/1
{
  "doc":{
  "ipAddr":"10.126.2.9"
  }
}
/**
 * Partially updates a document: only the fields present in {@code values} are changed.
 *
 * <p>Uses the same {@code restHighLevelClient} and logger as the other document operations
 * (the previous version referenced {@code client} and wrote to {@code System.out}).
 *
 * @param index  index that holds the document
 * @param type   mapping type passed to the typed {@code UpdateRequest} constructor
 * @param values fields to merge into the existing document
 * @param id     document id
 * @return the response returned by the client
 * @throws IOException if the request fails
 */
public UpdateResponse update(String index, String type, Map<String, Object> values, String id) throws IOException {
    // Partial update of the document identified by index, type and id.
    UpdateRequest updateRequest = new UpdateRequest(index, type, id).doc(values);

    // Optional parameters.
    // Request timeout.
    updateRequest.timeout("1s");
    // Number of retries on a version conflict.
    updateRequest.retryOnConflict(3);
    // Proceed only when all shard copies are active. A numeric count can be passed instead;
    // the previous code set 2 and then immediately overwrote it with ALL.
    updateRequest.waitForActiveShards(ActiveShardCount.ALL);

    UpdateResponse updateResponse = restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);

    DocWriteResponse.Result result = updateResponse.getResult();
    switch (result) {
        case CREATED:
            log.info("CREATED:{}", result);
            break;
        case UPDATED:
            log.info("UPDATED:{}", result);
            break;
        case DELETED:
            log.info("DELETED:{}", result);
            break;
        case NOOP:
            // Nothing changed.
            log.info("NOOP:{}", result);
            break;
        default:
            // Other results were not reported by the original code either.
            break;
    }
    return updateResponse;
}

1.5 删除文档

Elasticsearch中执行删除操作时,Elasticsearch先标记Document为deleted状态,而不是直接物理删除。当Elasticsearch存储空间不足或工作空闲时,才会执行物理删除操作。标记为deleted状态的数据不会被查询搜索到。

DELETE /book-index/_doc/1
/**
 * Deletes a single document and logs the result reported by Elasticsearch together with
 * the numeric status code of the response.
 *
 * @param index index that holds the document
 * @param id    id of the document to delete
 * @throws Exception if the request fails
 */
public void deleteDocById(String index, String id) throws Exception {
    DeleteResponse outcome = restHighLevelClient.delete(new DeleteRequest(index, id), RequestOptions.DEFAULT);

    // Result enum first, then the numeric status code.
    log.info(outcome.getResult());
    int statusCode = outcome.status().getStatus();
    log.info("删除状态是{}", statusCode);
}

1.6 批量操作bulk

注意:bulk语法中要求一个完整的json串不能有换行。不同的json串必须使用换行分隔。多个操作中,如果有错误情况,不会影响到其他的操作,只会在批量操作返回结果中标记失败。bulk语法批量操作时,bulk request会一次性加载到内存中,如果请求数据量太大,性能反而下降(内存压力过高),需要反复尝试一个最佳的bulk request size。一般从1000~5000条数据开始尝试,逐渐增加。如果查看bulk request size的话,一般是5~15MB之间为好。

bulk语法要求json格式是为了对内存的方便管理,和尽可能降低内存的压力。如果json格式没有特殊的限制,Elasticsearch在解释bulk请求时,需要对任意格式的json进行解释处理,需要对bulk请求数据做json对象或json array对象的转化,那么内存的占用量至少翻倍,当请求量过大的时候,对内存的压力会直线上升,且需要jvm gc进程对垃圾数据做频繁回收,影响Elasticsearch效率。

生产环境中,bulk api常用。都是使用java代码实现循环操作。一般一次bulk请求,执行一种操作。如:批量新增10000条数据等。

POST /_bulk
{"action": {"metadata"}}
{"data"}


#bulk 批量添加,批量的时候第一行为id 列,第二行为数据列,中间不能出现换行
POST /book-index/_doc/_bulk
{"index":{"_id":1}}
{"corpName":"途虎养车",...}
{"index":{"_id":2}}
{"corpName":"盒马鲜生"...}


#可以删除不同 index 下的数据,下面案例不演示了,注意会在倒数第二行报错,因为第一条就是删除的它,即便在地址中指定了库,可以去删除其他index 的数据,在参数中不指定 index 的情况下就是按照地址中的来,指定了 index 的情况下就是按照具体指定的来
POST /lib2/books/_bulk
{"delete":{"_index":"lib2","_type":"books","_id":4}}
{"create":{"_index":"tt","_type":"ttt","_id":"100"}}
{"name":"lisi"}
{"index":{"_index":"tt","_type":"ttt"}}
{"name":"zhaosi"}
{"update":{"_index":"lib2","_type":"books","_id":"4"}}
{"doc":{"price":58}}


#可以指定不同的 index 和 type
GET /_mget
{
"docs":[
   {
       "_index": "book-index", #索引
       "_type": "_doc", #数据类型
       "_id": 1 #要查询的主键
   },
     {
       "_index": "book-index", 
       "_type": "_doc",
       "_id": 2
   }
 ]
}

action:(行为)

create:文档不存在时创建

update:更新文档

index:创建新文档或替换已有文档

delete:删除一个文档

metadata:_index,_type,_id

create 和index的区别

如果数据存在,使用create操作失败,会提示文档已经存在,使用index则可以成功执行。

/**
 * Executes a list of operations in one bulk request and logs the result of every item.
 *
 * <p>Only INDEX and DELETE are turned into requests. CREATE and UPDATE are recognised but
 * not implemented, so beans of those kinds add nothing to the bulk request. If no request
 * was added at all the method returns without calling Elasticsearch, because an empty bulk
 * request is rejected before it is sent.
 *
 * @param beanWithOptionList operations to perform
 * @throws IOException           if the bulk request fails
 * @throws IllegalStateException if a bean carries an operation type not handled here
 */
public void bulkOption(List<BulkBeanWithOption> beanWithOptionList) throws IOException {
    BulkRequest bulkRequest = new BulkRequest();

    for (BulkBeanWithOption bean : beanWithOptionList) {
        // The operation type decides which request is built.
        switch (bean.getBulkOption()) {
            case INDEX:
                IndexRequest indexRequest = new IndexRequest(bean.getIndex());
                indexRequest.id(bean.getId());
                indexRequest.source(bean.getJson(), XContentType.JSON);
                bulkRequest.add(indexRequest);
                break;
            case CREATE:
                // Not implemented: make the skip visible instead of dropping it silently.
                log.warn("bulk CREATE is not implemented, skipping id {}", bean.getId());
                break;
            case DELETE:
                DeleteRequest deleteRequest = new DeleteRequest(bean.getIndex());
                deleteRequest.id(bean.getId());
                bulkRequest.add(deleteRequest);
                break;
            case UPDATE:
                // Not implemented: nothing is added to the bulk request.
                log.info("update");
                break;
            default:
                throw new IllegalStateException("Unexpected value: " + bean.getBulkOption());
        }
    }

    if (bulkRequest.numberOfActions() == 0) {
        log.info("bulk request contains no actions, nothing to send");
        return;
    }

    BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);

    for (BulkItemResponse itemResponse : bulkResponse) {
        if (itemResponse.isFailed()) {
            // getResponse() is null for failed items; report the failure and move on.
            log.error("bulk item {} ({}) failed: {}", itemResponse.getItemId(),
                    itemResponse.getOpType(), itemResponse.getFailureMessage());
            continue;
        }

        DocWriteResponse itemResponseResponse = itemResponse.getResponse();
        switch (itemResponse.getOpType()) {
            case INDEX:
            case CREATE:
                IndexResponse indexResponse = (IndexResponse) itemResponseResponse;
                log.info(indexResponse.getResult());
                break;
            case UPDATE:
                UpdateResponse updateResponse = (UpdateResponse) itemResponseResponse;
                log.info(updateResponse.getResult());
                break;
            case DELETE:
                DeleteResponse deleteResponse = (DeleteResponse) itemResponseResponse;
                log.info(deleteResponse.getResult());
                break;
            default:
                throw new IllegalStateException("Unexpected value: " + itemResponse.getOpType());
        }
    }
}

1.7 索引refresh

一个理想的搜索解决方案中,新索引的数据应该能立即搜索到。ElasticSearch给人的第一印象仿佛就是如此工作的,即使是在多服务器环境下,然而事实并非如此(至少不是任何场景都能保证新索引的数据能被实时检索到)。

elasticsearch是基于lucene的,lucene是可以做到实时的,就是创建索引之后,立即能查询到。但是这样,要么是牺牲索引的效率,每次索引之后都刷新,要么就是牺牲查询的效率,每次查询之前都进行刷新。

无论哪一种,都会让你的性能下降10倍以上,所以只能采取一种折中的方案,每隔 n 秒自动刷新,这样你创建索引之后,最多在 n 秒之内肯定能查到。这就是所谓的准实时(near real-time)查询。

elasticsearch默认刷新时间是1s。

刷新索引方法:

/**
 * Issues an explicit refresh for one index and logs the response.
 *
 * @param indexName name of the index to refresh
 * @throws IOException if the request fails
 */
public void refreshIndex(String indexName) throws IOException {
    RefreshRequest request = new RefreshRequest(indexName);
    RefreshResponse outcome = restHighLevelClient.indices().refresh(request, RequestOptions.DEFAULT);
    log.info(outcome.toString());
}

相关文章

网友评论

    本文标题:elasticsearch之十二springboot测试文档及索

    本文链接:https://www.haomeiwen.com/subject/mdjxuhtx.html