背景:
需要实现根据某个条件更新es里面的一批数据,如果一条条查出来然后去更新的话就效率比较低。
实现:
ES接口调用
POST http://localhost:9200/article/_update_by_query
{
"script": {
"source":"ctx._source['status']=0;"
},
"query": {
"term": {
"userId": 1
}
}
}
Spring data
环境:jdk8
语言:kotlin
框架:spring boot 2.4.5
写了个服务,可以用注入的方式调用方法。方法的入参包括过滤条件和要修改的值。
@Component
class EsSearchService(
private val esClient: RestHighLevelClient
) {
fun updateByQuery(status: Int, userId: String): BulkByScrollResponse {
val termQuery = TermQueryBuilder("userId", userId)
val script = Script(
ScriptType.INLINE,
"painless",
"ctx._source['status'] = params['status'];",
mapOf("status" to status)
)
val request = UpdateByQueryRequest("article")
request.setScript(script).setQuery(termQuery)
request.maxRetries = 10
request.isAbortOnVersionConflict = true
return esClient.updateByQuery(request , RequestOptions.DEFAULT)
}
}
后续
去GitHub上查看spring-data-elasticsearch
的源码,发现4.2.x
的版本用updateByQuery更加方便了。
官方给出的例子:
https://github.com/spring-projects/spring-data-elasticsearch/blob/4.2.x/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplateTests.java
void shouldDoUpdateByQueryForExistingDocument() {
// given
final String documentId = nextIdAsString();
final String messageBeforeUpdate = "some test message";
final String messageAfterUpdate = "test message";
final SampleEntity sampleEntity = SampleEntity.builder().id(documentId).message(messageBeforeUpdate)
.version(System.currentTimeMillis()).build();
final IndexQuery indexQuery = getIndexQuery(sampleEntity);
operations.index(indexQuery, index);
indexOperations.refresh();
final NativeSearchQuery query = new NativeSearchQueryBuilder().withQuery(matchAllQuery()).build();
final UpdateQuery updateQuery = UpdateQuery.builder(query)
.withScriptType(org.springframework.data.elasticsearch.core.ScriptType.INLINE)
.withScript("ctx._source['message'] = params['newMessage']").withLang("painless")
.withParams(Collections.singletonMap("newMessage", messageAfterUpdate)).withAbortOnVersionConflict(true)
.build();
// when
operations.updateByQuery(updateQuery, index);
// then
SampleEntity indexedEntity = operations.get(documentId, SampleEntity.class, index);
assertThat(indexedEntity.getMessage()).isEqualTo(messageAfterUpdate);
}
可惜想用的话需要升级spring boot版本,项目依赖太多了,没有升级成功,以下是对应的版本表。官方文档
网友评论