美文网首页ELK stackElasticsearch Elastic Stack
如何提高ElasticSearch 索引速度

如何提高ElasticSearch 索引速度

作者: 祝威廉 | 来源:发表于2016-03-07 23:22 被阅读15091次

我Google了下,大致给出的答案如下:

  1. 使用bulk API
  2. 初次索引的时候,把 replica 设置为 0
  3. 增大 threadpool.index.queue_size
  4. 增大 indices.memory.index_buffer_size
  5. 增大 index.translog.flush_threshold_ops
  6. 增大 index.translog.sync_interval
  7. 增大 index.engine.robin.refresh_interval

这篇文章会讲述上面几个参数的原理,以及一些其他的思路。这些参数大体上是朝着两个方向优化的:

  1. 减少磁盘写入
  2. 增大构建索引处理资源

一般而言,通过第二种方式的需要慎用,会对集群查询功能造成比较大的影响。
这里还有两种形态的解决方案:

  1. 关闭一些特定场景并不需要的功能,比如Translog或者Version等
  2. 将部分计算挪到其他并行计算框架上,比如数据的分片计算等,都可以放到Spark上事先算好

上面的参数都和什么有关

其中 5,6 属于 TransLog 相关。
4 则和Lucene相关
3 则因为ES里大量采用线程池,构建索引的时候,是有单独的线程池做处理的
7 的话个人认为影响不大
2 的话,能够使用上的场景有限。个人认为Replica这块可以使用Kafka的ISR机制。所有数据还是都从Primary写和读。Replica尽量只作为备份数据。

Translog

为什么要有Translog? 因为Translog顺序写日志比构建索引更高效。我们不可能每加一条记录就Commit一次,这样会有大量的文件和磁盘IO产生。但是我们又想避免程序挂掉或者硬件故障而出现数据丢失,所以有了Translog,通常这种日志我们叫做Write Ahead Log。

为了保证数据的完整性,ES默认是每次request结束后都会进行一次sync操作。具体可以查看如下方法:

org.elasticsearch.action.bulk.TransportShardBulkAction.processAfter 

该方法会调用IndexShard.sync 方法进行文件落地。

你也可以通过设置index.translog.durability=async 来完成异步落地。这里的异步其实可能会有一点点误导。前面是每次request结束后都会进行sync,这里的sync仅仅是将Translog落地。而无论你是否设置了async,都会执行如下操作:

根据条件,主要是每隔sync_interval(5s) ,如果flush_threshold_ops(Integer.MAX_VALUE),flush_threshold_size(512m),flush_threshold_period(30m) 满足对应的条件,则进行flush操作,这里除了对Translog进行Commit以外,也对索引进行了Commit。

所以如果你是海量的日志,可以容忍发生故障时丢失一定的数据,那么完全可以设置,index.translog.durability=async,并且将前面提到的flush*相关的参数调大。

而极端情况,你还可以有两个选择:

  1. 设置index.translog.durability=async,接着设置index.translog.disable_flush=true进行禁用定时flush。然后你可以通过应用程序自己手动来控制flush。

  2. 通过改写ES 去掉Translog日志相关的功能

当然,如果去掉Translog日志有两个风险点:

  1. Get最新数据会有点问题。因为根据ID Get最新数据是从Translog里拿的。
  2. 我们知道ES通过Shard Replication 保证Node节点出现故障时出现数据的完整性。在Relocating的时候,Replica 从Primary 进行Recover时,Primary会先Snapshot Lucene,然后拷贝数据到Replica,最后通过回放Translog 保证数据的一致性。

Version

Version可以让ES实现并发修改,但是带来的性能影响也是极大的,这里主要有两块:

  1. 需要访问索引里的版本号,触发磁盘读写
  2. 锁机制

目前而言,似乎没有办法直接关闭Version机制。你可以使用自增长ID并且在构建索引时,index 类型设置为create。这样可以跳过版本检查。

这个场景主要应用于不可变日志导入,随着ES被越来越多的用来做日志分析,日志没有主键ID,所以使用自增ID是合适的,并且不会进行更新,使用一个固定的版本号也是合适的。而不可变日志往往是追求吞吐量。

当然,如果有必要,我们也可以通过改写ES相关代码,禁用版本管理。

分发代理

ES是对索引进行了分片(Shard),然后数据被分发到不同的Shard。这样 查询和构建索引其实都存在一个问题:

如果是构建索引,则需要对数据分拣,然后根据Shard分布分发到不同的Node节点上。
如果是查询,则对外提供的Node需要收集各个Shard的数据做Merge

这都会对对外提供的节点造成较大的压力,从而影响整个bulk/query 的速度。

一个可行的方案是,直接面向客户提供构建索引和查询API的Node节点都采用client模式,不存储数据,可以达到一定的优化效果。

另外一个较为麻烦但似乎会更优的解决方案是,如果你使用类似Spark Streaming这种流式处理程序,在最后往ES输出的时候,可以做如下几件事情:

  1. 获取所有primary shard的信息,并且给所有shard带上一个顺序的数字序号,得到partition(顺序序号) -> shardId的映射关系
  2. 对数据进行repartition,分区后每个partition对应一个shard的数据
  3. 遍历这些partions,写入ES。方法为直接通过RPC 方式,类似transportService.sendRequest 将数据批量发送到对应包含有对应ShardId的Node节点上。

这样有三点好处:

  1. 所有的数据都被直接分到各个Node上直接处理。避免所有的数据先集中到一台服务器
  2. 避免二次分发,减少一次网络IO
  3. 防止最先处理数据的Node压力太大而导致木桶短板效应

场景

因为我正好要做日志分析类的应用,追求高吞吐量,这样上面的三个优化其实都可以做了。一个典型只增不更新的日志入库操作,可以采用如下方案:

  1. 对接Spark Streaming,在Spark里对数据做好分片,直接推送到ES的各个节点
  2. 禁止自动flush操作,每个batch 结束后手动flush。
  3. 避免使用Version

我们可以预期ES会产生多少个新的Segment文件,通过控制batch的周期和大小,预判出ES Segment索引文件的生成大小和Merge情况。最大可能减少ES的一些额外消耗

总结

大体是下面这三个点让es比原生的lucene吞吐量下降了不少:

  1. 为了数据完整性 ES额外添加了WAL(tanslog)
  2. 为了能够并发修改 添加了版本机制
  3. 对外提供服务的node节点存在瓶颈

ES的线性扩展问题主要受限于第三点,
具体描述就是:

如果是构建索引,接受到请求的Node节点需要对数据分拣,然后根据Shard分布分发到不同的Node节点上。
如果是查询,则对外提供的Node需要收集各个Shard的数据做Merge

另外,索引的读写并不需要向Master汇报。

相关文章

  • Elasticsearch 版本升级新增功能和改进

    Elasticsearch 7.10的新增功能和改进 ** 索引速度提高 **Elasticsearch 7.10...

  • [Elasticsearch] 如何提高索引速度

    问题背景 上亿规模的数据做索引按照Elasticsearch默认的配置做索引的速度非常慢,时间成本比较高,如何加快...

  • 如何提高ElasticSearch 索引速度

    我Google了下,大致给出的答案如下: 使用bulk API 初次索引的时候,把 replica 设置为 0 增...

  • 自定义Spark Partitioner提升es-hadoop

    前言 之前写过一篇文章,如何提高ElasticSearch 索引速度。除了对ES本身的优化以外,我现在大体思路是尽...

  • ElasticSearch Bulk 源码解析

    本来应该先有这篇文章,后有如何提高ElasticSearch 索引速度才对。不过当时觉得后面一篇文章会更有实际意义...

  • es优化-翻译

    索引速度优化[https://www.elastic.co/guide/en/elasticsearch/refe...

  • elasticsearch 的基础概念及应用

    目录 === 1、什么是elasticsearch——1、搜索引擎干了什么——2、elasticsearch 如何...

  • mongodb笔记04--索引

    索引: 索引提高查询速度,降低写入速度,权衡常用的查询字段,不必在太多列上建索引 在mongodb中,索引可以按字...

  • MySQL索引

    索引的作用类似指向表中行的指针,能够提高查询速度。尽管索引可以提高查询速度,但是不必要的索引会浪费空间,并且在进行...

  • Mongodb学习笔记 (五) 之 索引

    索引 索引提高查询速度,降低写入速度。权衡常用的查询字段,不必在太多列上建索引 在mongo中,索引可以按字段升序...

网友评论

  • cc0912ba8277:额,小菜鸡弱弱问一句,第一次索引时将副本设置为0.这个得在index中的mapping里的setting中进行设置吧。我也是使用spark streaming进行导入es,而且我的index是在文档导入es中自动创建索引和动态匹配,那么每个自动创建的索引貌似都没有机会去设置mapping的属性。
    所以,想请问一下,如果要在streaming程序中为index设置mapping的话,有哪些做法没有。我目前想单独写一个创建索引和设置mapping的shell脚本之类的。然后把文档往创建好的索引中灌。
    另外,想请教一下,使用es for hadoop中的streaming模块的saveJonsToEs的方法时,动态索引(比如{fileClass}_{dateTimefield:yyyy.MM.dd}/{fieldType}是不是比静态的慢一点。感觉动态索引对于bulk的利用率不是很高啊。
    51d040c52603:可以事先定义个Mapping Template,在写索引时,如果未发现到索引Mapping,则会去找Template,如果template也找不到,那么就用默认的
  • 明翼:请问下有没有关于ES的性能测试报告啊?
  • c04317906e8f:另外关于第三点 “增大 threadpool.index.queue_size” 应该也是没有用的
    索引时的并发量是跟shard的数量对应的,但是不会超过本机的cpu 核的个数。
    因为es里面不管是BULK, 还是INDEX的threadPool,线程数都是fix的,即availableProcessors(貌似可以通过配置手动修改,没设默认就是机器的cpu核数,且不超过32)
    而这个threadpool.index.queue_size,只不过是线程池等待任务队列的大小。默认50,若索引时es消化不过来,这个等待任务超过了队列大小,es会直接拒绝请求,抛出EsRejectException。
  • c04317906e8f:关于version这块,一般是不会影响索引速度的吧。
    一般情况下索引数据时你是不会自己提供id的,这时es会为每条数据自动生成一个base64 UUID,而且好像还是字典序上的自增,这个时候记录索引默认是create,这根本就不存在版本冲突和加锁的问题吧。
    如果你是指索引的meta state的版本号。这个版本号一般只会在发生了field mapping的更新,setting的更新时版本号才会更新。当你海量数据导入的时候,数据的列总不会每条数据都不一样吧?所以这个版本号也是不会频繁更新的。
    不知道我有没有理解正确你的意思?
    祝威廉:@keqinwu 在主分区构建索引,即使采用了自动生成id,以及配置了optimzeAutoGenerate 参数,也不会跳过version的check,也就是一般而言都会有一次到索引获取版本号的动作。这个有人做过实验,主分区的version check 火焰图消耗大概 0.04% ,从分区是0.03。还是有影响的。
  • 209f3f2be57b:Spark Streaming直接用node client就行了不用自己调用内部api那么复杂,直接减少一次代理
    祝威廉:@Leec_ 如果我没理解错NodeClient最后都是通过调用`transportAction.execute(request, listener)` 的方式来完成操作。按此正常情况下分拣会发生在Spark Streaming中。但是这样会发生一个问题: RDD的partition 的NodeClient一次操作基本会和大部分节点建立连接。所以我建议是,事先根据shard规则,将同一shard的数据事先都repartition到同一个partition。这样一个partition只要和一个Node建立连接
  • 209f3f2be57b:增大这个参数的值提高merge速率,会有很大提升默认才20M,会限制写入indices.store.throttle.max_bytes_per_sec=400M

    或者设置indices.store.throttle.type=none
    c04317906e8f:@祝威廉 indices.store.throttle.max_bytes_per_sec确实默认是20M。其实放开限制的话,性能至少提升一倍!最近刚测过。如果SSD的话,能提升好几倍
    你有没有发现往一个索引里面插的时候,越插越慢?主要就是因为这个参数;
    因为索引变大,意味着merge的磁盘读写量更大,20M确实比较保守。
    当然,如果你索引的同时有比较多的查询,这个参数就不要动了,除非你用的是SSD,可以适当调到100M,要不然你会发现你用SSD索引性能居然跟HDD一个鸟样。
    是HDD也还是有的调的。把这个参数“index.merge.scheduler.max_thread_count” 设为1,可以减少磁头争用吧。官方推荐的,应该有点效果吧
    祝威廉:@Leec_ 这种类型的参数调整会对查询造成比较大的影响。
  • yonggang_sun:还有两点,在去年一个高攀的百度工程师分享的:1: 增大索引限制,2: 提高bulk的queue size,前面的我没试过,后面增大bulk的queue size确实有用。
    祝威廉:@yonggang_sun 对 你说的没错 是bulk的那个 对吧
    yonggang_sun:@祝威廉 我觉得是不一样的,bulk或者是index的queue_size,对应的是你调用的是那个类型的api,这是两个不同的参数。
    https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-threadpool.html
    祝威廉:@yonggang_sun bulk queue-size 就是`threadpool.index.queue_size` 这个参数。其实前面google出来的几点只是比较易于实施,如果真的想做到大吞吐量,甚至逼近Lucene原生的能力,则需要做些调整或者改造

本文标题:如何提高ElasticSearch 索引速度

本文链接:https://www.haomeiwen.com/subject/duxzkttx.html