美文网首页晓我课堂
Elasticsearch在操作数据后不能立即查询到数据

Elasticsearch在操作数据后不能立即查询到数据

作者: 阿格里 | 来源:发表于2021-10-28 17:33 被阅读0次

    问题

    一次在使用Elasticsearch来存储数据的时候,前端在把数据保存成功后,调用查询列表的接口来刷新列表时,发现有时能刷新出新保存的数据,有时候又不能,修改和删除数据也发现有类似现象。但是再次刷新页面的时候又可以查询到新操作的数据。

    后面发现这个问题是Elasticsearch本身在操作数据后有一定的延迟性导致。Elasticsearch默认情况下在写入数据后,是要等1s后才能被查询到。

    因为Elasticsearch中每次索引refresh会产生一个新的 lucene 段,这会导致频繁的 segment merge 行为,对系统 CPU 和 IO 占用都比较高。

    解决方法1

    需要修改ElasticSearch的刷新策略

    org.elasticsearch.action.support.WriteRequest.RefreshPolicy 中定义了三种刷新策略:

    enum RefreshPolicy implements Writeable {
        /**
         * Don't refresh after this request. The default.
         */
        NONE("false"),
        /**
         * Force a refresh as part of this request. This refresh policy does not scale for high indexing or search throughput but is useful
         * to present a consistent view to for indices with very low traffic. And it is wonderful for tests!
         */
        IMMEDIATE("true"),
        /**
         * Leave this request open until a refresh has made the contents of this request visible to search. This refresh policy is
         * compatible with high indexing and search throughput but it causes the request to wait to reply until a refresh occurs.
         */
        WAIT_UNTIL("wait_for");
    

    IMMEDIATE

    • 请求向ElasticSearch提交了数据,立即刷新数据后,再结束请求。
    • 优点:操作延时短、实时性高
    • 缺点:资源消耗高

    WAIT_UNTIL

    • 请求向ElasticSearch提交了数据,等待数据完成刷新(默认为1s),然后再结束请求。
    • 优点:实时性高、操作延时长
    • 缺点:资源消耗低

    NONE (默认策略)

    • 请求向ElasticSearch提交了数据,不等待数据完成刷新,直接结束请求。
    • 优点:操作延时短、资源消耗低
    • 缺点:实时性低

    org.elasticsearch.action.support.WriteRequestBuilder中的setRefreshPolicy方法,默认为设置为RefreshPolicy#NONE

    /**
     * Should this request trigger a refresh ({@linkplain RefreshPolicy#IMMEDIATE}), wait for a refresh (
     * {@linkplain RefreshPolicy#WAIT_UNTIL}), or proceed ignore refreshes entirely ({@linkplain RefreshPolicy#NONE}, the default).
     */
    @SuppressWarnings("unchecked")
    default B setRefreshPolicy(RefreshPolicy refreshPolicy) {
        request().setRefreshPolicy(refreshPolicy);
        return (B) this;
    }
    

    代码中设置 setRefreshPolicy

    setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) 设置为立即刷新,即可解决不能立即查询到数据的问题。

    // 删除 设置为WriteRequest.RefreshPolicy.IMMEDIATE 立即刷新
    DeleteRequestBuilder deleteRequestBuilder = transportClient.prepareDelete(INDEX, TYPE, id).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); 
    // 新增
    IndexRequestBuilder indexRequestBuilder = transportClient.prepareIndex(INDEX, TYPE, id).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
    // 更新
    UpdateRequestBuilder updateRequestBuilder = transportClient.prepareUpdate(INDEX, TYPE,id).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
    // 批量
    BulkRequestBuilder bulkRequestBuilder = transportClient.prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
    

    支持的接口:

    • 删除:DeleteRequestBuilder
    • 新增:IndexRequestBuilder
    • 更新:UpdateRequestBuilder
    • 批量:BulkRequestBuilder

    解决方法2

    在查询结果前使用sleep来暂停一段时间再执行查询操作,这样可以不改变ElasticSearch的刷新策略。

    当业务需要频繁的操作修改数据的情况下,设置刷新策略为立即刷新的话可能会造成性能问题。

    我们可以使用sleep的方式不去修改默认的刷新策略。

    当我们在执行新增、修改、删除、批量操作的时候,在这些操作中设置一把锁,过期时间为1s(这个时间刚好是刷新策略的默认时间),在查询数据时我们需要先去检查是否有锁,如果有锁存在我们就等待500ms后再执行查询操作。这样基本上又可以保证不改刷新策略,也可以保存查询的实时性。

    /**
     * 在新增、修改、删除、批量操作的时候设置锁
     */
    private void setLock() {
      redisService.setWithExpire(getEsParentUpdateKey(), UPDATING_MSG, 1);
    }
    
    /**
     * 查询redis中的锁是否存在,如果有锁,将暂停500毫秒,再执行查询操作
     */
    private void waitMe() {
        Object lock = redisService.get(getEsParentUpdateKey());
        if (lock != null) {
            try {
                //为了得到ES更新后的数据,暂停500s
                TimeUnit.MICROSECONDS.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    

    相关文章

      网友评论

        本文标题:Elasticsearch在操作数据后不能立即查询到数据

        本文链接:https://www.haomeiwen.com/subject/zqbvaltx.html