在某些业务场景中,索引中的某些文档已经失去意义,我们需要对这部分索引文档进行删除,本来仅仅是java调用一个批量删除的接口就能完成,但是在使用过程中还是遇到一些小问题,在此记录一下自己的处理方法。
需求
在一个检索服务当中,我们可能需要从检索结果中筛选出一部分文档进行删除,在一般操作中,直接调用Java Client的删除接口即可完成索引文档的删除,但在业务中,因为检索文档数的原因,导致部分需要删除的文档未被检索出来,所以当被检索出来的文档被删除时,需要重复执行一次检索,然后再判断是否有文档需要删除。
循环检索删除索引文档的函数:
public List<String> deleteIndexesByString(String str){
boolean deleteFlag = true;
List<String> result = Lists.newArrayList();
while (deleteFlag) {
Optional<List<String>> deletedIndexesIdsOpt = deleteIndexesByString(question);
// 删除中有错误
if (!deletedIndexesOpt.isPresent()) {
log.error("delete indexes by string occurred a error", e);
return Lists.newArrayList();
}
List<String> deletedIndexesIds = deletedIndexesIdsOpt.get();
result.addAll(deletedIndexesIds);
// 如果没有删除索引,则结束循环
if (deletedIndexesIds.size() == 0) {
deleteFlag = false;
}
}
return result;
}
/**
* 检索文档,并计算之后删除部分文档
**/
public Optional<List<String>> deleteIndexesByString(String str){
// 根据句子在elastic search库中检索索引文档
List<Index> indexes =indexService.queryIndexByString(str);
log.debug("query indexes from es:[{}]", indexes);
// 通过重新计算索引文档的分数, 获取要删除的文档Id
List<String> deleteIndexIds=rankService.getDeleteIndexIds(indexes, question);
// 批量删除索引文档
List<BulkItemResponse.Failure> failures = indexService.bulkDeleteIndexDocs(indexName, type, deleteIndexIds);
// 删除没成功
if(!failures.isEmpty()){
return Optional.empty();
}
// 删除成功
return Optional.of(deleteIndexIds);
}
批量删除索引文档的函数:
private List<BulkItemResponse.Failure> bulkDeleteIndexDocs(String indexName, String type, List<String> ids) {
List<BulkItemResponse.Failure> failures = new LinkedList<>();
BulkRequestBuilder bulkRequest = getClient().prepareBulk();
for (String id : ids) {
bulkRequest.add(getClient().prepareDelete(indexName, type, id));
}
BulkResponse bulkResponse = bulkRequest.get();
if (bulkResponse.hasFailures()) {
for (BulkItemResponse single :
bulkResponse.getItems()) {
if (single.isFailed()) {
failures.add(single.getFailure());
}
}
}
return failures;
}
问题
在上述代码逻辑中,运行检索删除时,许多索引文档被重复删除,即bulkDeleteIndexDocs
方法在执行完毕之后,elastic search库中的索引并未同步更新。
解决
在刚开始,以为是bulkDeleteIndexDocs
中的bulkRequest.get();
方法是异步执行的,所以在执行完这个方法之后,循环又执行了许多遍。但是经查文档之后,发现该方法为同步执行的方法, 异步另有其他接口client.bulkAsync(request, RequestOptions.DEFAULT, listener);
。
既是同步执行方法,但是又多遍执行检索删除操作,只能以为批量删除之后,elastic search索引库并未及时更新,多方查阅找不到较好办法来强制更新,所以在执行完批量操作之后暂时使线程睡眠1000ms来等待elastic search索引库更新完成再继续下一次循环,此种方法虽能使检索删除操作次数减少,但仅仅是权益之际,还得另寻方法。
private List<BulkItemResponse.Failure> bulkDeleteIndexDocs(String indexName, String type, List<String> ids) {
List<BulkItemResponse.Failure> failures = new LinkedList<>();
BulkRequestBuilder bulkRequest = getClient().prepareBulk();
for (String id : ids) {
bulkRequest.add(getClient().prepareDelete(indexName, type, id));
}
BulkResponse bulkResponse = bulkRequest.get();
// 进程睡眠1000毫秒,使elastic search索引值更新
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
log.error("thread sleep error!");
}
if (bulkResponse.hasFailures()) {
for (BulkItemResponse single :
bulkResponse.getItems()) {
if (single.isFailed()) {
failures.add(single.getFailure());
}
}
}
return failures;
}
网友评论