美文网首页
ElasticSearch bulk request 数据丢失问

ElasticSearch bulk request 数据丢失问

作者: NazgulSun | 来源:发表于2021-05-17 15:53 被阅读0次

问题描述

  • 使用 highlevel rest api 的异步 bulkUpdate 加快index 的速度
  • 文档有3000万记录,写入集群后用 head 查看,count 少了不少,多次实验,结果是随机的
  • 由于使用异步写入,在异步回调的response 中,查看并没有异常
  • 由于id 都是唯一的,并不存在更新覆盖的问题
  • 使用同步API,写入集群,count 数目 完美对应。

所以问题还是出在异步写入上,并且client 没有收到任何集群反馈的异常。
使用同步写入速度又太慢。
同样的程序在本地测试集群上,可以正常跑完,考虑到是 prd 集群的配置问题,所以解决的策略基本是降低 异步并发 和控制流量。

尝试的方法:

降低 http pool 的并发io 和 连接数
maxConcurrency  = 8
        builder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback()
        {
            @Override
            public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder)
            {
                httpClientBuilder.setMaxConnPerRoute(maxConcurrency * 2);

                int maxIO = Runtime.getRuntime().availableProcessors() * 2 > maxConcurrency ?
                        maxConcurrency : Runtime.getRuntime().availableProcessors() * 2;

                IOReactorConfig ioReactorConfig = IOReactorConfig.custom()
                        .setIoThreadCount(maxIO)
                        .build();
                httpClientBuilder.setDefaultIOReactorConfig(ioReactorConfig);
                return httpClientBuilder;
            }
        });

通过 jstack 看到 并发 的IO dispatch 的确减少了,最多8个,但是依旧出现丢数据问题。

扩大index 的 partiion 数

最开始是 1 shard,1个 replics, 调整为5个 shard 和 1个 replics
测试下来,还是出现丢数据的问题。

控制发送速度

考虑到应该是写入的速度,远远大于 集群消费的速度, 所以 在生产端进行了流量控制。
使用semphore:100 控制 in-progress 的 request 最多为100个, 如果 超过100 就需要等待。

在 bulkRequest 的回调中,对信号量 进行 release。
经过客户端的流量控制, 在异步情况下,能够完好的完成index建立, 速度也没下降。

总结

对于ES 的写入优化,异步是一个很好的手段,但是一定要注意生产和消费的速度问题,
通常来说,生产者速度是非常快的,而消费者由于集群配置,应用负载,网络等问题,无法匹配生产者要求的速度,
从而造成很多意想不到的问题。

异步虽然性能高,但是一定要量力而行。

相关文章

网友评论

      本文标题:ElasticSearch bulk request 数据丢失问

      本文链接:https://www.haomeiwen.com/subject/rupjjltx.html