这两周主要看了下 Elasticsearch(其实是Lucene)的 segments 的 merge 流程。事情起因是,线上的ES有些大索引,其中的segments 个数几十个,每个大小100M+,小 segments 若干,而遇到问题就是这些大的 segments 不再做 merge 了,除非强制进行forceMerge 操作,由于我们第一次ES上线,其实也不清楚这究竟是个问题还是本来 Lucene 就是这样,网上找了一些关于ES 或者 Lucene 的 merge 的策略介绍,除了说道大家都了解的一些常规的参数,如最大size,最大doc 下不会再做merge云云,就是没提到到了多少个 segments 之后就不 merge ,接着问了Elasticsearch 圈子的一些人,也没有找到非常确定的答案。查找几天资料无果,顺带就看看源码,最终在昨天又瞄了几眼终于发现一段算法,虽无验证,但应八九不离十,故记录分享之。
Segments
首先还是先重温一下 Lucene 下的 segments,对这个比较陌生的可以阅读三斗大神的这一节
Lucene产生segments示意图我只引用最下面那张图介绍一下,绿色的就是已经固化的一个个的 segments 文件,不会再更新,左下角就是当前在内存的 Lucene 维护的查询可见的仍为持久化的segment,当Elasticsearch 配置的refresh_invterval (默认是1s,可调)到时,这些in in-memory buffer就会推送到OS的文件系统缓存中去,注意这里只是到缓存,很可能OS仍未持久化到文件系统,成为一个单独的 segment 文件,而啥时commit 到文件系统永不丢失,则由Lucene 的flush 机制保证,当Lucene 做完flush 则表明该 segment 真正推送到文件系统,此时才会在translog做标记并可以删除commit之前的translog 了。
注意这里只是一个简化描述,据三斗和赖总介绍,Lucene 仍有很多因素会促使产生一个segment 而不是百分百由Elasticsearch的refresh_interval 决定。这里就不继续讨论究竟在哪些情况会立即生成一个segment了。
Segment 的merge
详细信息可看三斗这一节 这里只引用两个图介绍一下
merge前merge后
从图上看,Lucene每次会选取一些小的segments 进而merge到一个大的segment,我这里不再赘述流程和策略,这里只补充一句就是,如果你之前用的scroll查询,之前的scroll还是会指向老的segments,也就是说老的segments 的引用会知道scroll失效后才会被回收。
Elasticsearch 5.x merge 参数的变化
在老的Elasticsearch 中,merge 被认为是一个非常消耗资源的操作,甚至只有一个线程来做这事,并且会影响indexing的request。在之前的版本里,merge 操作用的是一类 merge throttle limit
这样的配置来限制各种峰值数据,如下面这些参数,注意这些参数都已经在5.x 中移除掉了。
- indices.store.throttle.type
- indices.store.throttle.max_bytes_per_sec
因为在Elasticsearch 5.x 想采用多线程和动态调整这种方式来更加智能地去执行merge操作。如检测是否使用SSD硬盘,应该启动多少个merge线程等。
在Elasticsearch 5.x 下,tired merge policy
成为了唯一的merge策略。因此下面的参数同样也在5.x 下被移除了。
- index.merge.policy.type
- index.merge.policy.min_merge_size
- index.merge.policy.max_merge_size
- index.merge.policy.merge_factor
- index.merge.policy.max_merge_docs
- index.merge.policy.calibrate_size_by_deletes
- index.merge.policy.min_merge_docs
- index.merge.policy.max_merge_docs
如上面说的,ES希望采用一种更智能的方式去调整这些参数,达到一个性能的折中。在5下我们可以配置这些参数:
- index.merge.policy.expunge_deletes_allowed: 指删除了的文档数在一个segment里占的百分比,默认是10,大于这个值时,在执行expungeDeletes 操作时将会merge这些segments.
- index.merge.policy.floor_segment: 官网的解释我没大看懂,我的个人理解是ES会避免产生很小size的segment,小于这个阈值的所有的非常小的segment都会做merge直到达到这个floor 的size,默认是2MB.
- index.merge.policy.max_merge_at_once: 一次最多只操作多少个segments,默认是10.
- index.merge.policy.max_merge_at_once_explicit: 显示调用optimize 操作或者 expungeDeletes时可以操作多少个segments,默认是30.
- index.merge.policy.max_merged_segment: 超过多大size的segment不会再做merge,默认是5g.
- index.merge.policy.segments_per_tier: 每个tier允许的segement 数,注意这个数要大于上面的at_once数,否则这个值会先于最大可操作数到达,就会立刻做merge,这样会造成频繁
- index.reclaim_deletes_weight: 考虑merge的segment 时删除文档数量多少的权重,默认即可.
- index.compund_format: 还不知道干啥用的,默认即可.
merge 线程调整
Elasticsearch 5 采用了多线程去执行merge,可以通过修改index.merge.scheduler.max_thread_count
来动态调整这个线程数,默认的话是通过下面公式去计算:
Math.max(1, Math.min(4, Runtime.getRuntime().availableProcessors() / 2))
要注意的是如果你是用HDD而非SSD的磁盘的话,最好是用单线程为妙。
force merge API
这里有3个参数可以用
- max_num_segments 期望merge到多少个segments,1的意思是强行merge到1个segment
- only_expunge_deletes 只做清理有deleted的segments,即瘦身
- flush 清理完执行一下flush,默认是true
你可以用下面的URL来执行强行的merge
curl -XPOST "http://localhost:9200/library/_forcemerge?max_num_segments=1
代码介绍
merge的代码相对比较少,因为基本上ES并没有做什么东西,只是采用了多线程,和直接调用Lucene的API而已,主要看几个类:
-
MergePolicyConfig.java
专门管理我们输入的那些参数 -
MergeScheduler.java
这个类其实就是封装了一下Lucene的几个调用 -
ConcurrentMergeScheduler.java
继承了MergeScheduler.java
在上面实现了多线程分工并加上了多线程下的一些限制,如上面配置的那些最大XXX,最多XXX 之类的。 -
ElasticsearchConcurrentMergeScheduler.java
继承了ConcurrentMergeScheduler.java
通过配置去控制整个ES实例的merge 流程的运作,还有打印日志等。 -
TieredMergePolicy.java
merge的策略控制类,之前提到了ES5后只剩下这个唯一的默认的策略控制,所有的选择,打分,触发merge的策略都在这里定义。
最后发现问题就出在TieredMergePolicy.java
上,再次回顾我遇到的问题
事情起因是,线上的ES有些大索引,其中的segments 个数几十个,每个大小100M+,小 segments 若干,而遇到问题就是这些大的 segments 不再做 merge 了,除非强制进行forceMerge 操作。
然后我们跳到下面这个方法:
public MergeSpecification findMerges(MergeTrigger mergeTrigger, SegmentInfos infos, IndexWriter writer) throws IOException {
<snip> //这里主要拿到总bytes数,segments数,踢掉超过阈值的segments
minSegmentBytes = floorSize(minSegmentBytes);
// Compute max allowed segs in the index
long levelSize = minSegmentBytes;
long bytesLeft = totIndexBytes;
double allowedSegCount = 0;
while(true) {
final double segCountLevel = bytesLeft / (double) levelSize;
if (segCountLevel < segsPerTier) {
allowedSegCount += Math.ceil(segCountLevel);
break;
}
allowedSegCount += segsPerTier;
bytesLeft -= segsPerTier * levelSize;
levelSize *= maxMergeAtOnce;
}
int allowedSegCountInt = (int) allowedSegCount;
<snip>
问题就出在这段算法里了,tieredPolicy,顾名思义就是梯队,阶级,等级,就是把所有的segments 分层一个个的阶级,ES的设想就是 每个tier 都应该至少包含 segsPerTier
个segments,这样从上至下就可以分批的一次每个tier都可以做一轮merge操作,举个例子,如果我们按默认值floor的size是2MB,maxMergeAtOnce
使用默认10 的话,那么最低层就应该有 10 个 2MB的segments,做完merge 应该就会产生一个20MB的 segment,那么这个20MB应该就是下一个tier, 在这个tier里也应该有至少10个20MB的segments 来等待做merge,如此类推。
那我就用我遇到的例子来演绎一遍上面的算法,假设我有5个100MB的大segments,下面就只有少数几个的segments,segsPerTier
和maxMergeAtOnce
都采用默认值10。
- 第一次while,
segCountLevel = 500/2 = 250
显然大于10,所以allowedSegCount = 10
,bytesLeft = 500 - 2*10 = 480
,levelSize = 2*10 = 20
- 第二次while,
segCountLevel = 480/20 = 24
还是大于10,所以allowedSegCount = 10 + 10 = 20
,bytesLeft = 480 - 20*10 = 280
,levelSize = 20*10 = 200
- 第三次while,
segCountLevel = 280/200 = 1.4
小于10 了,所以allowedSegCount = 20 + 2 = 22
; 退出
也就是说这次的merge 操作,根据当前segments总的字节数推算,ES应该是被允许最多merge 22 个segments;接着就是去找实际可以merge的总的eligible的segments数量
// Gather eligible segments for merging, ie segments
// not already being merged and not already picked (by
// prior iteration of this loop) for merging:
final List<SegmentCommitInfo> eligible = new ArrayList<>();
for(int idx = tooBigCount; idx<infosSorted.size(); idx++) {
final SegmentCommitInfo info = infosSorted.get(idx);
if (merging.contains(info)) {
mergingBytes += size(info, writer);
} else if (!toBeMerged.contains(info)) {
eligible.add(info);
}
}
final boolean maxMergeIsRunning = mergingBytes >= maxMergedSegmentBytes;
if (verbose(writer)) {
message(" allowedSegmentCount=" + allowedSegCountInt + " vs count=" + infosSorted.size() + " (eligible count=" + eligible.size() + ") tooBigCount=" + tooBigCount, writer);
}
if (eligible.size() == 0) {
return spec;
}
if (eligible.size() > allowedSegCountInt) {
//可以作为预备merge的segments大于允许的数,这轮merge可以做了
//剩下就是为segments打分,选出一定数量的segments来merge
} else {
//达不到预期数量,不做了
}
从上面看到,确实如果segments 不够ES被期望的达到那么多可被merge的segments 数量的时候,其实ES是不做merge的。那么就会在一种场景里面出现:
当索引达到了比较大时,这时经过了一定时间的merge 完成后,segments都会比较大,这时如果indexing的频率相对比较低时,则每轮merge 选择阶段就会得出ES期望这次可以merge的segments数就会比较大,而如果eligible的segments并没有那么大时,则ES就不会进行merge
这就是我得到的结论,还望各ES大神指点是否准确。
网友评论
public class BulkRequest extends ActionRequest implements CompositeIndicesRequest, WriteRequest<BulkRequest> {
private RefreshPolicy refreshPolicy = RefreshPolicy.NONE;
}