Elasticsearch 5.x 源码分析(5)segment

作者: 华安火车迷 | 来源:发表于2017-06-16 14:56 被阅读2803次

    这两周主要看了下 Elasticsearch(其实是Lucene)的 segments 的 merge 流程。事情起因是,线上的ES有些大索引,其中的segments 个数几十个,每个大小100M+,小 segments 若干,而遇到问题就是这些大的 segments 不再做 merge 了,除非强制进行forceMerge 操作,由于我们第一次ES上线,其实也不清楚这究竟是个问题还是本来 Lucene 就是这样,网上找了一些关于ES 或者 Lucene 的 merge 的策略介绍,除了说道大家都了解的一些常规的参数,如最大size,最大doc 下不会再做merge云云,就是没提到到了多少个 segments 之后就不 merge ,接着问了Elasticsearch 圈子的一些人,也没有找到非常确定的答案。查找几天资料无果,顺带就看看源码,最终在昨天又瞄了几眼终于发现一段算法,虽无验证,但应八九不离十,故记录分享之。


    Segments

    首先还是先重温一下 Lucene 下的 segments,对这个比较陌生的可以阅读三斗大神的这一节

    Lucene产生segments示意图

    我只引用最下面那张图介绍一下,绿色的就是已经固化的一个个的 segments 文件,不会再更新,左下角就是当前在内存的 Lucene 维护的查询可见的仍为持久化的segment,当Elasticsearch 配置的refresh_invterval (默认是1s,可调)到时,这些in in-memory buffer就会推送到OS的文件系统缓存中去,注意这里只是到缓存,很可能OS仍未持久化到文件系统,成为一个单独的 segment 文件,而啥时commit 到文件系统永不丢失,则由Lucene 的flush 机制保证,当Lucene 做完flush 则表明该 segment 真正推送到文件系统,此时才会在translog做标记并可以删除commit之前的translog 了。

    注意这里只是一个简化描述,据三斗和赖总介绍,Lucene 仍有很多因素会促使产生一个segment 而不是百分百由Elasticsearch的refresh_interval 决定。这里就不继续讨论究竟在哪些情况会立即生成一个segment了。

    Segment 的merge

    详细信息可看三斗这一节 这里只引用两个图介绍一下

    merge前
    merge后

    从图上看,Lucene每次会选取一些小的segments 进而merge到一个大的segment,我这里不再赘述流程和策略,这里只补充一句就是,如果你之前用的scroll查询,之前的scroll还是会指向老的segments,也就是说老的segments 的引用会知道scroll失效后才会被回收。


    Elasticsearch 5.x merge 参数的变化

    在老的Elasticsearch 中,merge 被认为是一个非常消耗资源的操作,甚至只有一个线程来做这事,并且会影响indexing的request。在之前的版本里,merge 操作用的是一类 merge throttle limit这样的配置来限制各种峰值数据,如下面这些参数,注意这些参数都已经在5.x 中移除掉了。

    • indices.store.throttle.type
    • indices.store.throttle.max_bytes_per_sec

    因为在Elasticsearch 5.x 想采用多线程和动态调整这种方式来更加智能地去执行merge操作。如检测是否使用SSD硬盘,应该启动多少个merge线程等。
    在Elasticsearch 5.x 下,tired merge policy 成为了唯一的merge策略。因此下面的参数同样也在5.x 下被移除了。

    • index.merge.policy.type
    • index.merge.policy.min_merge_size
    • index.merge.policy.max_merge_size
    • index.merge.policy.merge_factor
    • index.merge.policy.max_merge_docs
    • index.merge.policy.calibrate_size_by_deletes
    • index.merge.policy.min_merge_docs
    • index.merge.policy.max_merge_docs

    如上面说的,ES希望采用一种更智能的方式去调整这些参数,达到一个性能的折中。在5下我们可以配置这些参数:

    • index.merge.policy.expunge_deletes_allowed: 指删除了的文档数在一个segment里占的百分比,默认是10,大于这个值时,在执行expungeDeletes 操作时将会merge这些segments.
    • index.merge.policy.floor_segment: 官网的解释我没大看懂,我的个人理解是ES会避免产生很小size的segment,小于这个阈值的所有的非常小的segment都会做merge直到达到这个floor 的size,默认是2MB.
    • index.merge.policy.max_merge_at_once: 一次最多只操作多少个segments,默认是10.
    • index.merge.policy.max_merge_at_once_explicit: 显示调用optimize 操作或者 expungeDeletes时可以操作多少个segments,默认是30.
    • index.merge.policy.max_merged_segment: 超过多大size的segment不会再做merge,默认是5g.
    • index.merge.policy.segments_per_tier: 每个tier允许的segement 数,注意这个数要大于上面的at_once数,否则这个值会先于最大可操作数到达,就会立刻做merge,这样会造成频繁
    • index.reclaim_deletes_weight: 考虑merge的segment 时删除文档数量多少的权重,默认即可.
    • index.compund_format: 还不知道干啥用的,默认即可.

    merge 线程调整

    Elasticsearch 5 采用了多线程去执行merge,可以通过修改index.merge.scheduler.max_thread_count 来动态调整这个线程数,默认的话是通过下面公式去计算:

    Math.max(1, Math.min(4, Runtime.getRuntime().availableProcessors() / 2)) 
    

    要注意的是如果你是用HDD而非SSD的磁盘的话,最好是用单线程为妙。


    force merge API

    这里有3个参数可以用

    • max_num_segments 期望merge到多少个segments,1的意思是强行merge到1个segment
    • only_expunge_deletes 只做清理有deleted的segments,即瘦身
    • flush 清理完执行一下flush,默认是true

    你可以用下面的URL来执行强行的merge

    curl -XPOST "http://localhost:9200/library/_forcemerge?max_num_segments=1
    

    代码介绍

    merge的代码相对比较少,因为基本上ES并没有做什么东西,只是采用了多线程,和直接调用Lucene的API而已,主要看几个类:

    • MergePolicyConfig.java 专门管理我们输入的那些参数
    • MergeScheduler.java 这个类其实就是封装了一下Lucene的几个调用
    • ConcurrentMergeScheduler.java 继承了MergeScheduler.java在上面实现了多线程分工并加上了多线程下的一些限制,如上面配置的那些最大XXX,最多XXX 之类的。
    • ElasticsearchConcurrentMergeScheduler.java 继承了ConcurrentMergeScheduler.java 通过配置去控制整个ES实例的merge 流程的运作,还有打印日志等。
    • TieredMergePolicy.java merge的策略控制类,之前提到了ES5后只剩下这个唯一的默认的策略控制,所有的选择,打分,触发merge的策略都在这里定义。

    最后发现问题就出在TieredMergePolicy.java 上,再次回顾我遇到的问题

    事情起因是,线上的ES有些大索引,其中的segments 个数几十个,每个大小100M+,小 segments 若干,而遇到问题就是这些大的 segments 不再做 merge 了,除非强制进行forceMerge 操作。

    然后我们跳到下面这个方法:

    public MergeSpecification findMerges(MergeTrigger mergeTrigger, SegmentInfos infos, IndexWriter writer) throws IOException {
    
    <snip> //这里主要拿到总bytes数,segments数,踢掉超过阈值的segments
    
        minSegmentBytes = floorSize(minSegmentBytes);
        // Compute max allowed segs in the index
        long levelSize = minSegmentBytes;
        long bytesLeft = totIndexBytes;
        double allowedSegCount = 0;
        while(true) {
          final double segCountLevel = bytesLeft / (double) levelSize;
          if (segCountLevel < segsPerTier) {
            allowedSegCount += Math.ceil(segCountLevel);
            break;
          }
          allowedSegCount += segsPerTier;
          bytesLeft -= segsPerTier * levelSize;
          levelSize *= maxMergeAtOnce;
        }
        int allowedSegCountInt = (int) allowedSegCount;
    
    <snip>
    

    问题就出在这段算法里了,tieredPolicy,顾名思义就是梯队,阶级,等级,就是把所有的segments 分层一个个的阶级,ES的设想就是 每个tier 都应该至少包含 segsPerTier 个segments,这样从上至下就可以分批的一次每个tier都可以做一轮merge操作,举个例子,如果我们按默认值floor的size是2MB,maxMergeAtOnce 使用默认10 的话,那么最低层就应该有 10 个 2MB的segments,做完merge 应该就会产生一个20MB的 segment,那么这个20MB应该就是下一个tier, 在这个tier里也应该有至少10个20MB的segments 来等待做merge,如此类推。
    那我就用我遇到的例子来演绎一遍上面的算法,假设我有5个100MB的大segments,下面就只有少数几个的segments,segsPerTiermaxMergeAtOnce 都采用默认值10。

    1. 第一次while,segCountLevel = 500/2 = 250 显然大于10,所以allowedSegCount = 10bytesLeft = 500 - 2*10 = 480levelSize = 2*10 = 20
    2. 第二次while,segCountLevel = 480/20 = 24 还是大于10,所以allowedSegCount = 10 + 10 = 20bytesLeft = 480 - 20*10 = 280levelSize = 20*10 = 200
    3. 第三次while,segCountLevel = 280/200 = 1.4 小于10 了,所以allowedSegCount = 20 + 2 = 22; 退出

    也就是说这次的merge 操作,根据当前segments总的字节数推算,ES应该是被允许最多merge 22 个segments;接着就是去找实际可以merge的总的eligible的segments数量

    // Gather eligible segments for merging, ie segments
         // not already being merged and not already picked (by
         // prior iteration of this loop) for merging:
         final List<SegmentCommitInfo> eligible = new ArrayList<>();
         for(int idx = tooBigCount; idx<infosSorted.size(); idx++) {
           final SegmentCommitInfo info = infosSorted.get(idx);
           if (merging.contains(info)) {
             mergingBytes += size(info, writer);
           } else if (!toBeMerged.contains(info)) {
             eligible.add(info);
           }
         }
    
         final boolean maxMergeIsRunning = mergingBytes >= maxMergedSegmentBytes;
    
         if (verbose(writer)) {
           message("  allowedSegmentCount=" + allowedSegCountInt + " vs count=" + infosSorted.size() + " (eligible count=" + eligible.size() + ") tooBigCount=" + tooBigCount, writer);
         }
    
         if (eligible.size() == 0) {
           return spec;
         }
    
         if (eligible.size() > allowedSegCountInt) {
          //可以作为预备merge的segments大于允许的数,这轮merge可以做了
          //剩下就是为segments打分,选出一定数量的segments来merge
         } else {
          //达不到预期数量,不做了
         }
    

    从上面看到,确实如果segments 不够ES被期望的达到那么多可被merge的segments 数量的时候,其实ES是不做merge的。那么就会在一种场景里面出现:

    当索引达到了比较大时,这时经过了一定时间的merge 完成后,segments都会比较大,这时如果indexing的频率相对比较低时,则每轮merge 选择阶段就会得出ES期望这次可以merge的segments数就会比较大,而如果eligible的segments并没有那么大时,则ES就不会进行merge

    这就是我得到的结论,还望各ES大神指点是否准确。

    相关文章

      网友评论

      • 达微:非常有用,但是还是不知道怎么着手,我业务主要涉及到聚合而且是1到3亿数据的term聚合,不强制合并段性能总是达不到3秒响应的要求。我可以怎样调参可以使得段的总量一直很少,比如一个index段数量控制在100以内
        华安火车迷:@c7e63930e97b 任何东西都是折中的,你能把refresh 加大,比如3分钟5分钟,然后启动定时器在闲时不断做forcemerge
      • 数据产品人生:大神,我想咨询一下,merge的频率怎么可以调节呢?我的索引有很多segment了,但是还是没有进行merge
        华安火车迷:@做只安静的猴子 merge会一直进行的,可以通过调节merge线程来控制,不过默认一般就可以了,你是不是refresh 的时间调太短了,比如1s,这样每秒都会产生1个segments
      • NormanCao:大神,我这里有一个问题请教一下,我腾讯云服务器下,32G内存8核,单机启了两个elasticsearch 实例,在一个集群里。有一个100G左右大小的索引,5分片1副本,根据关键字检索多个字段的时候速度特别慢,20s~60s,也不稳定。尝试了新建了一个索引,10分片,1副本,然后reindex之后,查询速度并没有效果,搞了两三天了,实在头疼,请问有什么能指点一下的不?
        华安火车迷:@NormanCao 这个问题有很多原因引起的,要逐步逐步分析才行,你可以在你的请求里开一下profile:true 看一下你的查询究竟慢在了哪一步, 然后再对症下药吧
      • 49f24dae36e2:最近遇到个问题segment 每秒生产上百个而且都是10KB,导致我更新很慢(不停的生产10KB合并segment ),我的集群内存32G ,Heap Mem 16G很充足,就是系统总共32G内存使用了%90,系统内存满了会影响segment 创建和合并吗,ES保护机制?我用的ES5.4
        641f42fe675c:@华安火车迷 bulkRequest 之后,不会refresh。源码见BulkRequest.java

        public class BulkRequest extends ActionRequest implements CompositeIndicesRequest, WriteRequest<BulkRequest> {
        private RefreshPolicy refreshPolicy = RefreshPolicy.NONE;
        }
        华安火车迷:你可以看看这篇文章 https://www.outcoldman.com/en/archive/2017/07/13/elasticsearch-explaining-merge-settings/
        华安火车迷:@khalil_ed6f 不会,应该是你index 是refresh时间设置很短吧,可以调大点,比如5s,另外你的indexing 的request 有没有设置自动refresh ? 我记得好像比如bulkRequest 它默认是提交会自动refresh一下的,这样后台其实触发了Lucene的commit了
      • 水欣:打开了,想问下每次reflush都会产生一个新文件,而这个新文件就是segment吗
        水欣:@华安火车迷 这样的话bulk提交一次就会refresh一次,那岂不是要产生很多小的segment?这些segment会做merge吗?做merge个过程中会涉及到磁盘的segment吗?感觉我都要看蒙了
        水欣:@华安火车迷 最近项目中遇到es 集群io wait太高,在解决这个问题,可以加我QQ吗,494815400,有些问题想要请教下
        华安火车迷:@水欣_99ae 你是指refresh吧? 是的refresh 就是调用了Lucene的reopen,会产生新的segment的
      • 水欣:那个三斗大神的连接打不开
        华安火车迷:翻墙吧

      本文标题:Elasticsearch 5.x 源码分析(5)segment

      本文链接:https://www.haomeiwen.com/subject/nsppqxtx.html