(Because the article exceeded the maximum length limit, this part continues "Lucene Term Query删除流程源码剖析-上" and covers what happens after the data preparation described in that first part.)
5 Handling deletes at flush time
So far we have covered the basics of deletes; now let's look at how Lucene handles delete operations at flush time.
5.1 The flush entry point
Since IndexWriter.commit eventually calls IndexWriter.flush as well, only IndexWriter.flush is covered here.
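Before walking through the source, here is a minimal usage sketch of how a term delete issued through the public API ends up on this flush path. It is only an illustration: the index path and field name are assumptions, not anything taken from the code analyzed below.
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.Term;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import java.nio.file.Paths;

public class TermDeleteFlushDemo {
  public static void main(String[] args) throws Exception {
    Directory dir = FSDirectory.open(Paths.get("/tmp/term-delete-demo")); // assumed path
    IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(new StandardAnalyzer()));
    Document doc = new Document();
    doc.add(new StringField("id", "1", Field.Store.YES));
    writer.addDocument(doc);
    // The term delete is only buffered here; nothing is removed yet.
    writer.deleteDocuments(new Term("id", "1"));
    // commit() (and flush()) drives the IndexWriter.flush path analyzed below.
    writer.commit();
    writer.close();
    dir.close();
  }
}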
//IndexWriter
/** Moves all in-memory segments to the {@link Directory}, but does not commit
* (fsync) them (call {@link #commit} for that). */
public final void flush() throws IOException {
flush(true, true);
}
/**
* Flush all in-memory buffered updates (adds and deletes)
* to the Directory.
* @param triggerMerge if true, we may merge segments (if
* deletes or docs were flushed) if necessary
* @param applyAllDeletes whether pending deletes should also
*/
final void flush(boolean triggerMerge, boolean applyAllDeletes) throws IOException {
// NOTE: this method cannot be sync'd because
// maybeMerge() in turn calls mergeScheduler.merge which
// in turn can take a long time to run and we don't want
// to hold the lock for that. In the case of
// ConcurrentMergeScheduler this can lead to deadlock
// when it stalls due to too many running merges.
// We can be called during close, when closing==true, so we must pass false to ensureOpen:
//make sure the index is still open
ensureOpen(false);
//eventually calls doFlush(applyAllDeletes); afterwards maybeMerge may trigger a segment merge
//merging is not the focus of this article and is not covered further
if (doFlush(applyAllDeletes) && triggerMerge) {
maybeMerge(config.getMergePolicy(), MergeTrigger.FULL_FLUSH, UNBOUNDED_MAX_MERGE_SEGMENTS);
}
}
/** Returns true a segment was flushed or deletes were applied. */
private boolean doFlush(boolean applyAllDeletes) throws IOException {
...
doBeforeFlush();
testPoint("startDoFlush");
boolean success = false;
try {
...
boolean anyChanges = false;
synchronized (fullFlushLock) {
boolean flushSuccess = false;
try {
//DocumentsWriter.flushAllThreads flushes all the DWPTs
long seqNo = docWriter.flushAllThreads();
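//flushAllThreads returns a negative sequence number when at least one DWPT actually flushed a segment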
if (seqNo < 0) {
seqNo = -seqNo;
anyChanges = true;
} else {
anyChanges = false;
}
if (!anyChanges) {
// flushCount is incremented in flushAllThreads
flushCount.incrementAndGet();
}
//every flushed DWPT returns a FlushedSegment that is put into the ticketQueue
//here the ticketQueue is purged, i.e. its tickets are published
publishFlushedSegments(true);
flushSuccess = true;
} finally {
assert holdsFullFlushLock();
docWriter.finishFullFlush(flushSuccess);
processEvents(false);
}
}
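//if requested, resolve any buffered deletes/updates that are still pending after the flush (this includes the query deletes)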
if (applyAllDeletes) {
applyAllDeletesAndUpdates();
}
anyChanges |= maybeMerge.getAndSet(false);
synchronized(this) {
writeReaderPool(applyAllDeletes);
doAfterFlush();
success = true;
return anyChanges;
}
} catch (VirtualMachineError tragedy) {
tragicEvent(tragedy, "doFlush");
throw tragedy;
} finally {
if (!success) {
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "hit exception during flush");
}
maybeCloseOnTragicEvent();
}
}
}
As shown above, the first step of a flush is to call DocumentsWriter.flushAllThreads, which flushes all the DWPTs. As mentioned earlier, a term delete is a local, deterministic delete: if a document being flushed matches the term delete condition, it is filtered out. Let's now look at the details of DocumentsWriter.flushAllThreads.
5.2 DWPT flush: handling term deletes
When a DWPT flushes, it takes the term deletes recorded in deleteTerms of its own pendingUpdates and matches them against all of its documents; a document that matches is not written to disk. For this reason no .liv file needs to be generated at this stage, because everything that reaches disk is alive. Strictly speaking that is only true with respect to the deleteTerms in pendingUpdates: a surviving document may still match a query delete condition and need to be deleted as well. Query deletes are processed after all DWPTs have finished flushing, and that is when a .liv file is generated to mark the documents matching the query delete as dead.
There is also the corner case described in Section 4: if pendingUpdates.deleteDocIDs contains docIDs that were recorded because an exception was hit while updating a document, a .liv file is generated during this DWPT flush as well.
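The .liv data is essentially a bitset of live documents. The following small sketch (plain Java using org.apache.lucene.util.FixedBitSet; the doc count and docIDs are made up) mirrors the deleteDocIDs handling shown later inside DWPT.flush:
import org.apache.lucene.util.FixedBitSet;
import java.util.Arrays;
import java.util.List;

public class LiveDocsSketch {
  public static void main(String[] args) {
    int numDocsInRAM = 10;                              // docs buffered in one DWPT
    List<Integer> deleteDocIDs = Arrays.asList(3, 7);   // hypothetical delete-by-docID entries
    FixedBitSet liveDocs = new FixedBitSet(numDocsInRAM);
    liveDocs.set(0, numDocsInRAM);                      // initially every doc is live
    for (int delDocID : deleteDocIDs) {
      liveDocs.clear(delDocID);                         // mark deleted docs as dead
    }
    // 8 live docs remain; a bitset like this is what eventually gets encoded into a .liv file
    System.out.println("live docs: " + liveDocs.cardinality() + "/" + numDocsInRAM);
  }
}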
With this overall picture of the flush in mind, let's look at the DocumentsWriter.flushAllThreads function:
//DocumentsWriter
/*
* FlushAllThreads is synced by IW fullFlushLock. Flushing all threads is a
* two stage operation; the caller must ensure (in try/finally) that finishFlush
* is called after this method, to release the flush lock in DWFlushControl
*/
long flushAllThreads()
throws IOException {
final DocumentsWriterDeleteQueue flushingDeleteQueue;
...
long seqNo;
synchronized (this) {
pendingChangesInCurrentFullFlush = anyChanges();
flushingDeleteQueue = deleteQueue;
/* Cutover to a new delete queue. This must be synced on the flush control
* otherwise a new DWPT could sneak into the loop with an already flushing
* delete queue */
seqNo = flushControl.markForFullFlush(); // swaps this.deleteQueue synced on FlushControl
assert setFlushingDeleteQueue(flushingDeleteQueue);
}
assert currentFullFlushDelQueue != null;
assert currentFullFlushDelQueue != deleteQueue;
boolean anythingFlushed = false;
try {
DocumentsWriterPerThread flushingDWPT;
// Help out with flushing:
//flush all pending DWPTs one by one
while ((flushingDWPT = flushControl.nextPendingFlush()) != null) {
anythingFlushed |= doFlush(flushingDWPT);
}
// If a concurrent flush is still in flight wait for it
//flushes may run concurrently, so wait here until every DWPT has finished flushing
flushControl.waitForFlush();
if (anythingFlushed == false && flushingDeleteQueue.anyChanges()) { // apply deletes if we did not flush any document
if (infoStream.isEnabled("DW")) {
infoStream.message("DW", Thread.currentThread().getName() + ": flush naked frozen global deletes");
}
assertTicketQueueModification(flushingDeleteQueue);
//put the changes from the global queue into the ticketQueue, waiting to be purged
ticketQueue.addDeletes(flushingDeleteQueue);
}
// we can't assert that we don't have any tickets in teh queue since we might add a DocumentsWriterDeleteQueue
// concurrently if we have very small ram buffers this happens quite frequently
assert !flushingDeleteQueue.anyChanges();
} finally {
assert flushingDeleteQueue == currentFullFlushDelQueue;
}
if (anythingFlushed) {
return -seqNo;
} else {
return seqNo;
}
}
Next, how a single DWPT flush is driven:
private boolean doFlush(DocumentsWriterPerThread flushingDWPT) throws IOException {
boolean hasEvents = false;
while (flushingDWPT != null) {
hasEvents = true;
boolean success = false;
DocumentsWriterFlushQueue.FlushTicket ticket = null;
try {
assert currentFullFlushDelQueue == null
|| flushingDWPT.deleteQueue == currentFullFlushDelQueue : "expected: "
+ currentFullFlushDelQueue + "but was: " + flushingDWPT.deleteQueue
+ " " + flushControl.isFullFlush();
/*
* Since with DWPT the flush process is concurrent and several DWPT
* could flush at the same time we must maintain the order of the
* flushes before we can apply the flushed segment and the frozen global
* deletes it is buffering. The reason for this is that the global
* deletes mark a certain point in time where we took a DWPT out of
* rotation and freeze the global deletes.
*
* Example: A flush 'A' starts and freezes the global deletes, then
* flush 'B' starts and freezes all deletes occurred since 'A' has
* started. if 'B' finishes before 'A' we need to wait until 'A' is done
* otherwise the deletes frozen by 'B' are not applied to 'A' and we
* might miss to deletes documents in 'A'.
*/
try {
assert assertTicketQueueModification(flushingDWPT.deleteQueue);
// Each flush is assigned a ticket in the order they acquire the ticketQueue lock
//each DWPT flush obtains a ticket from the ticketQueue, which is used to preserve ordering
ticket = ticketQueue.addFlushTicket(flushingDWPT);
final int flushingDocsInRam = flushingDWPT.getNumDocsInRAM();
boolean dwptSuccess = false;
try {
// flush concurrently without locking
//perform the actual DWPT flush
final FlushedSegment newSegment = flushingDWPT.flush(flushNotifications);
//attach the FlushedSegment returned by DWPT.flush to the ticket obtained above
ticketQueue.addSegment(ticket, newSegment);
dwptSuccess = true;
} finally {
subtractFlushedNumDocs(flushingDocsInRam);
if (flushingDWPT.pendingFilesToDelete().isEmpty() == false) {
//see the definition of flushNotifications below: the files to delete are wrapped as an Event and put into the eventQueue
Set<String> files = flushingDWPT.pendingFilesToDelete();
flushNotifications.deleteUnusedFiles(files);
hasEvents = true;
}
if (dwptSuccess == false) {
flushNotifications.flushFailed(flushingDWPT.getSegmentInfo());
hasEvents = true;
}
}
// flush was successful once we reached this point - new seg. has been assigned to the ticket!
success = true;
} finally {
if (!success && ticket != null) {
// In the case of a failure make sure we are making progress and
// apply all the deletes since the segment flush failed since the flush
// ticket could hold global deletes see FlushTicket#canPublish()
ticketQueue.markTicketFailed(ticket);
}
}
/*
* Now we are done and try to flush the ticket queue if the head of the
* queue has already finished the flush.
*/
if (ticketQueue.getTicketCount() >= perThreadPool.getActiveThreadStateCount()) {
// This means there is a backlog: the one
// thread in innerPurge can't keep up with all
// other threads flushing segments. In this case
// we forcefully stall the producers.
flushNotifications.onTicketBacklog();
break;
}
} finally {
flushControl.doAfterFlush(flushingDWPT);
}
//once the current DWPT is done, the next pending DWPT is fetched and flushed - this resembles work stealing:
//the while loop here keeps taking DWPTs after finishing the current one, and flushAllThreads
//itself is also a while loop that takes DWPTs from flushControl and flushes them one by one
flushingDWPT = flushControl.nextPendingFlush();
}
if (hasEvents) {
flushNotifications.afterSegmentsFlushed();
}
// If deletes alone are consuming > 1/2 our RAM
// buffer, force them all to apply now. This is to
// prevent too-frequent flushing of a long tail of
// tiny segments:
final double ramBufferSizeMB = config.getRAMBufferSizeMB();
if (ramBufferSizeMB != IndexWriterConfig.DISABLE_AUTO_FLUSH &&
flushControl.getDeleteBytesUsed() > (1024*1024*ramBufferSizeMB/2)) {
hasEvents = true;
if (applyAllDeletes(deleteQueue) == false) {
if (infoStream.isEnabled("DW")) {
infoStream.message("DW", String.format(Locale.ROOT, "force apply deletes after flush bytesUsed=%.1f MB vs ramBuffer=%.1f MB",
flushControl.getDeleteBytesUsed()/(1024.*1024.),
ramBufferSizeMB));
}
flushNotifications.onDeletesApplied();
}
}
return hasEvents;
}
The code above uses flushNotifications. DWPT flushes may run concurrently, and readers familiar with asynchronous programming will recognize the pattern: an asynchronous operation is usually given a callback that is invoked when the operation completes, fails, and so on. flushNotifications plays exactly that role. It is a member of DocumentsWriter, but it is actually instantiated in IndexWriter and passed in through the DocumentsWriter constructor. Its definition in IndexWriter is as follows:
//as the definition below shows, it mostly wraps flush-related operations as Events and puts them into the eventQueue maintained by IndexWriter
private final DocumentsWriter.FlushNotifications flushNotifications = new DocumentsWriter.FlushNotifications() {
@Override
public void deleteUnusedFiles(Collection<String> files) {
eventQueue.add(w -> w.deleteNewFiles(files));
}
@Override
public void flushFailed(SegmentInfo info) {
eventQueue.add(w -> w.flushFailed(info));
}
@Override
public void afterSegmentsFlushed() throws IOException {
try {
publishFlushedSegments(false);
} finally {
if (false) {
maybeMerge(config.getMergePolicy(), MergeTrigger.SEGMENT_FLUSH, UNBOUNDED_MAX_MERGE_SEGMENTS);
}
}
}
@Override
public void onTragicEvent(Throwable event, String message) {
IndexWriter.this.onTragicEvent(event, message);
}
@Override
public void onDeletesApplied() {
eventQueue.add(w -> {
try {
w.publishFlushedSegments(true);
} finally {
flushCount.incrementAndGet();
}
}
);
}
@Override
public void onTicketBacklog() {
eventQueue.add(w -> w.publishFlushedSegments(true));
}
};
Before a DWPT flushes, ticket = ticketQueue.addFlushTicket(flushingDWPT) is called to obtain a ticket. The ticketQueue was introduced in Section 2; the following code should deepen that understanding:
//DocumentsWriterFlushQueue
synchronized FlushTicket addFlushTicket(DocumentsWriterPerThread dwpt) throws IOException {
// Each flush is assigned a ticket in the order they acquire the ticketQueue
// lock
incTickets();
boolean success = false;
try {
// prepare flush freezes the global deletes - do in synced block!
//dwpt.prepareFlush moves the part of the globalQueue captured by the current globalSlice
//into globalBufferedUpdates, freezes it, and returns the frozen FrozenBufferedUpdates instance
//FlushTicket.frozenUpdates stores exactly this just-frozen global globalBufferedUpdates.
//one thing to note: if no term delete or query delete happened during the flush,
//the subsequent DWPT flushes will not produce a frozen globalBufferedUpdates,
//because the head and tail pointers of the globalSlice already point at the tail node of the globalQueue
final FlushTicket ticket = new FlushTicket(dwpt.prepareFlush(), true);
queue.add(ticket);
success = true;
return ticket;
} finally {
if (!success) {
decTickets();
}
}
}
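//DocumentsWriterPerThread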
FrozenBufferedUpdates prepareFlush() throws IOException {
assert numDocsInRAM > 0;
//while freezing globalBufferedUpdates, the tail pointer of this DWPT's deleteSlice is also moved to the current tail of the globalQueue
//so that the deleteSlice.apply below can pick everything up
final FrozenBufferedUpdates globalUpdates = deleteQueue.freezeGlobalBuffer(deleteSlice);
/* deleteSlice can possibly be null if we have hit non-aborting exceptions during indexing and never succeeded
adding a document. */
if (deleteSlice != null) {
// apply all deletes before we flush and release the delete slice
//move the operations of the globalQueue captured by this DWPT's deleteSlice into its pendingUpdates
deleteSlice.apply(pendingUpdates, numDocsInRAM);
assert deleteSlice.isEmpty();
deleteSlice.reset();
}
return globalUpdates;
}
//DocumentsWriterDeleteQueue
FrozenBufferedUpdates freezeGlobalBuffer(DeleteSlice callerSlice) throws IOException {
globalBufferLock.lock();
/*
* Here we freeze the global buffer so we need to lock it, apply all
* deletes in the queue and reset the global slice to let the GC prune the
* queue.
*/
final Node<?> currentTail = tail; // take the current tail make this local any
// Changes after this call are applied later
// and not relevant here
if (callerSlice != null) {
// Update the callers slices so we are on the same page
//the tail pointer of the flushing DWPT's deleteSlice is also updated here to the latest
//tail node of the globalQueue, waiting for the later deleteSlice.apply
callerSlice.sliceTail = currentTail;
}
try {
if (globalSlice.sliceTail != currentTail) {
globalSlice.sliceTail = currentTail;
//apply the part of the globalQueue captured between the globalSlice head and tail into globalBufferedUpdates
globalSlice.apply(globalBufferedUpdates, BufferedUpdates.MAX_INT);
}
if (globalBufferedUpdates.any()) {
//freeze globalBufferedUpdates
final FrozenBufferedUpdates packet = new FrozenBufferedUpdates(infoStream, globalBufferedUpdates, null);
//clear globalBufferedUpdates
globalBufferedUpdates.clear();
return packet;
} else {
return null;
}
} finally {
globalBufferLock.unlock();
}
}
//DocumentsWriterFlushQueue.FlushTicket
//FlushTicket is an inner class of DocumentsWriterFlushQueue; it wraps the FlushedSegment view produced by a DWPT flush together with frozenUpdates, the frozen global updates captured when the ticket was created
static final class FlushTicket {
private final FrozenBufferedUpdates frozenUpdates;
private final boolean hasSegment;
private FlushedSegment segment;
private boolean failed = false;
private boolean published = false;
FlushTicket(FrozenBufferedUpdates frozenUpdates, boolean hasSegment) {
this.frozenUpdates = frozenUpdates;
this.hasSegment = hasSegment;
}
boolean canPublish() {
return hasSegment == false || segment != null || failed;
}
synchronized void markPublished() {
assert published == false: "ticket was already published - can not publish twice";
published = true;
}
private void setSegment(FlushedSegment segment) {
assert !failed;
this.segment = segment;
}
private void setFailed() {
assert segment == null;
failed = true;
}
/**
* Returns the flushed segment or <code>null</code> if this flush ticket doesn't have a segment. This can be the
* case if this ticket represents a flushed global frozen updates package.
*/
FlushedSegment getFlushedSegment() {
return segment;
}
/**
* Returns a frozen global deletes package.
*/
FrozenBufferedUpdates getFrozenUpdates() {
return frozenUpdates;
}
}
}
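Before moving on to DWPT.flush, here is a small self-contained model of the ordering rule that the ticketQueue and FlushTicket enforce (this is not Lucene code - all class and method names below are invented): tickets are issued in the order flushes start, and only the head of the queue can be published, so a flush that finishes early still waits for earlier flushes, which is exactly the scenario described in the big comment inside DocumentsWriter.doFlush above.
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model of the FlushTicket/ticketQueue ordering; not Lucene's real classes.
public class TicketQueueModel {
  static final class Ticket {
    final String name;
    boolean done;          // segment attached (or flush failed)
    Ticket(String name) { this.name = name; }
  }

  private final Deque<Ticket> queue = new ArrayDeque<>();

  Ticket addTicket(String name) {        // issued when a flush starts, in start order
    Ticket t = new Ticket(name);
    queue.addLast(t);
    return t;
  }

  void purge() {                         // publish only from the head, in order
    while (!queue.isEmpty() && queue.peekFirst().done) {
      System.out.println("publish " + queue.pollFirst().name);
    }
  }

  public static void main(String[] args) {
    TicketQueueModel q = new TicketQueueModel();
    Ticket a = q.addTicket("A");         // flush A starts first
    Ticket b = q.addTicket("B");         // flush B starts later
    b.done = true;                       // B finishes first...
    q.purge();                           // ...but nothing is published: A is still the head
    a.done = true;                       // once A finishes,
    q.purge();                           // A is published, then B - in start order
  }
}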
Next, the implementation of DWPT.flush:
//DocumentsWriterPerThread
/** Flush all pending docs to a new segment */
FlushedSegment flush(DocumentsWriter.FlushNotifications flushNotifications) throws IOException {
assert numDocsInRAM > 0;
assert deleteSlice.isEmpty() : "all deletes must be applied in prepareFlush";
segmentInfo.setMaxDoc(numDocsInRAM);
//first, wrap the locally maintained state into a SegmentWriteState object for processing
//this state includes segmentInfo, pendingUpdates, and so on
final SegmentWriteState flushState = new SegmentWriteState(infoStream, directory, segmentInfo, fieldInfos.finish(),
pendingUpdates, new IOContext(new FlushInfo(numDocsInRAM, bytesUsed())));
final double startMBUsed = bytesUsed() / 1024. / 1024.;
// Apply delete-by-docID now (delete-byDocID only
// happens when an exception is hit processing that
// doc, eg if analyzer has some problem w/ the text):
//handle the deleteDocIDs maintained by pendingUpdates; the beginning of this article described the three kinds of updates kept in pendingUpdates
//deleteDocIDs holds docs that must be deleted because an exception was hit while updating them; they can be marked directly in flushState.liveDocs
//if deleteDocIDs is not empty, this DWPT flush will produce a .liv file
if (pendingUpdates.deleteDocIDs.size() > 0) {
flushState.liveDocs = new FixedBitSet(numDocsInRAM);
flushState.liveDocs.set(0, numDocsInRAM);
for(int delDocID : pendingUpdates.deleteDocIDs) {
flushState.liveDocs.clear(delDocID);
}
flushState.delCountOnFlush = pendingUpdates.deleteDocIDs.size();
pendingUpdates.bytesUsed.addAndGet(-pendingUpdates.deleteDocIDs.size() * BufferedUpdates.BYTES_PER_DEL_DOCID);
//pendingUpdates.deleteDocIDs has been handled, i.e. converted into cleared bits in liveDocs, so it is emptied here
pendingUpdates.deleteDocIDs.clear();
}
if (aborted) {
if (infoStream.isEnabled("DWPT")) {
infoStream.message("DWPT", "flush: skip because aborting is set");
}
return null;
}
long t0 = System.nanoTime();
if (infoStream.isEnabled("DWPT")) {
infoStream.message("DWPT", "flush postings as segment " + flushState.segmentInfo.name + " numDocs=" + numDocsInRAM);
}
final Sorter.DocMap sortMap;
try {
DocIdSetIterator softDeletedDocs;
if (indexWriterConfig.getSoftDeletesField() != null) {
softDeletedDocs = consumer.getHasDocValues(indexWriterConfig.getSoftDeletesField());
} else {
softDeletedDocs = null;
}
//the actual flush-to-disk work
sortMap = consumer.flush(flushState);
if (softDeletedDocs == null) {
flushState.softDelCountOnFlush = 0;
} else {
flushState.softDelCountOnFlush = PendingSoftDeletes.countSoftDeletes(softDeletedDocs, flushState.liveDocs);
assert flushState.segmentInfo.maxDoc() >= flushState.softDelCountOnFlush + flushState.delCountOnFlush;
}
// We clear this here because we already resolved them (private to this segment) when writing postings:
//the term deletes were already resolved during the flush, so they can be cleared here
pendingUpdates.clearDeleteTerms();
//update the files of this DWPT's segmentInfo to the files newly created by this flush
segmentInfo.setFiles(new HashSet<>(directory.getCreatedFiles()));
final SegmentCommitInfo segmentInfoPerCommit = new SegmentCommitInfo(segmentInfo, 0, flushState.softDelCountOnFlush, -1L, -1L, -1L);
...
final BufferedUpdates segmentDeletes;
if (pendingUpdates.deleteQueries.isEmpty() && pendingUpdates.numericUpdates.isEmpty() && pendingUpdates.binaryUpdates.isEmpty()) {
pendingUpdates.clear();
segmentDeletes = null;
} else {
segmentDeletes = pendingUpdates;
}
...
assert segmentInfo != null;
//build something like a view of the DWPT's post-flush state and return it to DocumentsWriter
FlushedSegment fs = new FlushedSegment(infoStream, segmentInfoPerCommit, flushState.fieldInfos,
segmentDeletes, flushState.liveDocs, flushState.delCountOnFlush, sortMap);
sealFlushedSegment(fs, sortMap, flushNotifications);
if (infoStream.isEnabled("DWPT")) {
infoStream.message("DWPT", "flush time " + ((System.nanoTime() - t0) / 1000000.0) + " msec");
}
return fs;
} catch (Throwable t) {
onAbortingException(t);
throw t;
} finally {
maybeAbort("flush", flushNotifications);
}
}
The consumer.flush(flushState) call above does the actual writing to disk; the default consumer is DefaultIndexingChain, whose flush method is listed below. Strictly speaking this is beyond the scope of this article, because DWPT.flush has already resolved the term deletes. consumer.flush(flushState) writes out the individual index components, such as nvd, dvd, dim and dii. One final point worth noting: the .liv file marks deleted docs in a bitset, but the information about those docs in the other files is not removed.
@Override
public Sorter.DocMap flush(SegmentWriteState state) throws IOException {
// NOTE: caller (DocumentsWriterPerThread) handles
// aborting on any exception from this method
Sorter.DocMap sortMap = maybeSortSegment(state);
int maxDoc = state.segmentInfo.maxDoc();
long t0 = System.nanoTime();
writeNorms(state, sortMap);
if (docState.infoStream.isEnabled("IW")) {
docState.infoStream.message("IW", ((System.nanoTime()-t0)/1000000) + " msec to write norms");
}
t0 = System.nanoTime();
writeDocValues(state, sortMap);
if (docState.infoStream.isEnabled("IW")) {
docState.infoStream.message("IW", ((System.nanoTime()-t0)/1000000) + " msec to write docValues");
}
t0 = System.nanoTime();
writePoints(state, sortMap);
if (docState.infoStream.isEnabled("IW")) {
docState.infoStream.message("IW", ((System.nanoTime()-t0)/1000000) + " msec to write points");
}
// it's possible all docs hit non-aborting exceptions...
t0 = System.nanoTime();
storedFieldsConsumer.finish(maxDoc);
storedFieldsConsumer.flush(state, sortMap);
if (docState.infoStream.isEnabled("IW")) {
docState.infoStream.message("IW", ((System.nanoTime()-t0)/1000000) + " msec to finish stored fields");
}
t0 = System.nanoTime();
Map<String,TermsHashPerField> fieldsToFlush = new HashMap<>();
for (int i=0;i<fieldHash.length;i++) {
PerField perField = fieldHash[i];
while (perField != null) {
if (perField.invertState != null) {
fieldsToFlush.put(perField.fieldInfo.name, perField.termsHashPerField);
}
perField = perField.next;
}
}
termsHash.flush(fieldsToFlush, state, sortMap);
if (docState.infoStream.isEnabled("IW")) {
docState.infoStream.message("IW", ((System.nanoTime()-t0)/1000000) + " msec to write postings and finish vectors");
}
// Important to save after asking consumer to flush so
// consumer can alter the FieldInfo* if necessary. EG,
// FreqProxTermsWriter does this with
// FieldInfo.storePayload.
t0 = System.nanoTime();
docWriter.codec.fieldInfosFormat().write(state.directory, state.segmentInfo, "", state.fieldInfos, IOContext.DEFAULT);
if (docState.infoStream.isEnabled("IW")) {
docState.infoStream.message("IW", ((System.nanoTime()-t0)/1000000) + " msec to write fieldInfos");
}
return sortMap;
}
At this point the DWPT flush is complete and a FlushedSegment instance has been returned to DocumentsWriter. As the doFlush(DocumentsWriterPerThread flushingDWPT) code above shows, the ticket obtained before the DWPT flush is now given that FlushedSegment, i.e. ticketQueue.addSegment(ticket, newSegment).
5.3 Handling query deletes after all DWPTs have flushed
Once all DWPTs have finished flushing, DocumentsWriter.doFlush(DocumentsWriterPerThread flushingDWPT) calls flushNotifications.afterSegmentsFlushed(); from the flushNotifications definition we know that afterSegmentsFlushed does the following:
//IndexWriter.new DocumentsWriter.FlushNotifications
@Override
public void afterSegmentsFlushed() throws IOException {
try {
publishFlushedSegments(false);
} finally {
if (false) {
maybeMerge(config.getMergePolicy(), MergeTrigger.SEGMENT_FLUSH, UNBOUNDED_MAX_MERGE_SEGMENTS);
}
}
}
//forced is false here, which means we only try to purge the tickets in the ticketQueue
void publishFlushedSegments(boolean forced) throws IOException {
docWriter.purgeFlushTickets(forced, ticket -> {
DocumentsWriterPerThread.FlushedSegment newSegment = ticket.getFlushedSegment();
//the frozen updates obtained here are in fact the global globalBufferedUpdates frozen by the
//first DWPT that flushed; see the comments on the ticketQueue.addFlushTicket(flushingDWPT) function above
FrozenBufferedUpdates bufferedUpdates = ticket.getFrozenUpdates();
ticket.markPublished();
//if the ticket has no FlushedSegment, it was added by DocumentsWriter.flushAllThreads after all DWPTs
//had flushed, via ticketQueue.addDeletes(flushingDeleteQueue), and only carries the global queue
if (newSegment == null) { // this is a flushed global deletes package - not a segments
if (bufferedUpdates != null && bufferedUpdates.any()) { // TODO why can this be null?
publishFrozenUpdates(bufferedUpdates);
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "flush: push buffered updates: " + bufferedUpdates);
}
}
} else {
//the FlushedSegment added after each DWPT flush
assert newSegment.segmentInfo != null;
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "publishFlushedSegment seg-private updates=" + newSegment.segmentUpdates);
}
if (newSegment.segmentUpdates != null && infoStream.isEnabled("DW")) {
infoStream.message("IW", "flush: push buffered seg private updates: " + newSegment.segmentUpdates);
}
// now publish!
//bufferedUpdates is the frozen global globalBufferedUpdates
//newSegment.segmentUpdates is the DWPT's own pendingUpdates from the flush
publishFlushedSegment(newSegment.segmentInfo, newSegment.fieldInfos, newSegment.segmentUpdates,
bufferedUpdates, newSegment.sortMap);
}
});
}
//DocumentsWriter
void purgeFlushTickets(boolean forced, IOUtils.IOConsumer<DocumentsWriterFlushQueue.FlushTicket> consumer)
throws IOException {
if (forced) {
ticketQueue.forcePurge(consumer);
} else {
ticketQueue.tryPurge(consumer);
}
}
//DocumentsWriterFlushQueue
void tryPurge(IOUtils.IOConsumer<FlushTicket> consumer) throws IOException {
assert !Thread.holdsLock(this);
if (purgeLock.tryLock()) {
try {
innerPurge(consumer);
} finally {
purgeLock.unlock();
}
}
}
private void innerPurge(IOUtils.IOConsumer<FlushTicket> consumer) throws IOException {
assert purgeLock.isHeldByCurrentThread();
while (true) {
final FlushTicket head;
final boolean canPublish;
synchronized (this) {
head = queue.peek();
canPublish = head != null && head.canPublish(); // do this synced
}
if (canPublish) {
try {
/*
* if we block on publish -> lock IW -> lock BufferedDeletes we don't block
* concurrent segment flushes just because they want to append to the queue.
* the downside is that we need to force a purge on fullFlush since there could
* be a ticket still in the queue.
*/
//consumer is the lambda passed in by IndexWriter.publishFlushedSegments
consumer.accept(head);
} finally {
synchronized (this) {
// finally remove the published ticket from the queue
final FlushTicket poll = queue.poll();
decTickets();
// we hold the purgeLock so no other thread should have polled:
assert poll == head;
}
}
} else {
break;
}
}
}
Let's first look at the IndexWriter.publishFlushedSegment function:
//IndexWriter
/**
* Atomically adds the segment private delete packet and publishes the flushed
* segments SegmentInfo to the index writer.
*/
private synchronized void publishFlushedSegment(SegmentCommitInfo newSegment, FieldInfos fieldInfos,
FrozenBufferedUpdates packet, FrozenBufferedUpdates globalPacket,
Sorter.DocMap sortMap) throws IOException {
boolean published = false;
try {
// Lock order IW -> BDS
ensureOpen(false);
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "publishFlushedSegment " + newSegment);
}
if (globalPacket != null && globalPacket.any()) {
//handle the globalBufferedUpdates that were frozen when the DWPT flushed
publishFrozenUpdates(globalPacket);
}
// Publishing the segment must be sync'd on IW -> BDS to make the sure
// that no merge prunes away the seg. private delete packet
final long nextGen;
if (packet != null && packet.any()) {
//handle the DWPT's own pendingUpdates
nextGen = publishFrozenUpdates(packet);
} else {
// Since we don't have a delete packet to apply we can get a new
// generation right away
nextGen = bufferedUpdatesStream.getNextGen();
// No deletes/updates here, so marked finished immediately:
bufferedUpdatesStream.finishedSegment(nextGen);
}
//a natural question here: the code above handles both the frozen global globalBufferedUpdates
//and the DWPT's own pendingUpdates, and the two are kept in sync with each other -
//could the same operation end up being applied twice?
//debugging the source shows that, because of the synchronization between global and per-DWPT deletes,
//a delete added to globalBufferedUpdates after the DWPT was created does indeed
//also show up in that DWPT's pendingUpdates.
//the key to avoiding applying the same operation twice is the delGen used here.
//IndexWriter keeps a global delGen, called nextGen, in its bufferedUpdatesStream member;
//every publishFrozenUpdates call invokes bufferedUpdatesStream.push, which increments nextGen
//and returns its previous value (roughly: return nextGen++).
//in addition, each frozen packet carries its own delGen, which is also set inside
//bufferedUpdatesStream.push, to the value of nextGen before the increment.
//suppose bufferedUpdatesStream.nextGen = 1 (the default in the source):
//after publishFrozenUpdates(globalPacket) (publishing the global changes): globalPacket.delGen = 1, bufferedUpdatesStream.nextGen = 2;
//after nextGen = publishFrozenUpdates(packet) (publishing the DWPT's own changes): packet.delGen = 2, bufferedUpdatesStream.nextGen = 3; the call returns the pre-increment value, so nextGen = 2.
//the statement below then sets the delGen of the segment being published to 2.
//later, when (FrozenBufferedUpdates) packet.apply(IndexWriter writer) calls openSegmentStates,
//openSegmentStates checks info.getBufferedDeletesGen() <= delGen,
//where info.getBufferedDeletesGen() returns the delGen set on newSegment by the statement below
//and the delGen on the right-hand side is the packet's delGen.
//so in the example above the globalPacket's delGen is necessarily smaller than info.getBufferedDeletesGen(),
//which means that when there are both globalUpdates and pendingUpdates only the pendingUpdates packet
//can open this segment and be applied to it - the double application caused by the synchronization is avoided.
//if the DWPT has no pendingUpdates, the else branch above runs nextGen = bufferedUpdatesStream.getNextGen(),
//which again makes the globalPacket's delGen smaller than info.getBufferedDeletesGen() and keeps
//the globalUpdates from being applied to this new segment.
//this also shows that globalBufferedUpdates has lower precedence than pendingUpdates.
newSegment.setBufferedDeletesGen(nextGen);
segmentInfos.add(newSegment);
published = true;
checkpoint();
if (packet != null && packet.any() && sortMap != null) {
// TODO: not great we do this heavyish op while holding IW's monitor lock,
// but it only applies if you are using sorted indices and updating doc values:
ReadersAndUpdates rld = getPooledInstance(newSegment, true);
rld.sortMap = sortMap;
// DON't release this ReadersAndUpdates we need to stick with that sortMap
}
FieldInfo fieldInfo = fieldInfos.fieldInfo(config.softDeletesField); // will return null if no soft deletes are present
// this is a corner case where documents delete them-self with soft deletes. This is used to
// build delete tombstones etc. in this case we haven't seen any updates to the DV in this fresh flushed segment.
// if we have seen updates the update code checks if the segment is fully deleted.
boolean hasInitialSoftDeleted = (fieldInfo != null
&& fieldInfo.getDocValuesGen() == -1
&& fieldInfo.getDocValuesType() != DocValuesType.NONE);
final boolean isFullyHardDeleted = newSegment.getDelCount() == newSegment.info.maxDoc();
// we either have a fully hard-deleted segment or one or more docs are soft-deleted. In both cases we need
// to go and check if they are fully deleted. This has the nice side-effect that we now have accurate numbers
// for the soft delete right after we flushed to disk.
if (hasInitialSoftDeleted || isFullyHardDeleted){
// this operation is only really executed if needed an if soft-deletes are not configured it only be executed
// if we deleted all docs in this newly flushed segment.
ReadersAndUpdates rld = getPooledInstance(newSegment, true);
try {
if (isFullyDeleted(rld)) {
dropDeletedSegment(newSegment);
checkpoint();
}
} finally {
release(rld);
}
}
} finally {
if (published == false) {
adjustPendingNumDocs(-newSegment.info.maxDoc());
}
flushCount.incrementAndGet();
doAfterFlush();
}
}
synchronized long publishFrozenUpdates(FrozenBufferedUpdates packet) {
assert packet != null && packet.any();
long nextGen = bufferedUpdatesStream.push(packet);
// Do this as an event so it applies higher in the stack when we are not holding DocumentsWriterFlushQueue.purgeLock:
//add an Event to the eventQueue; the event invokes FrozenBufferedUpdates.apply(IndexWriter)
eventQueue.add(w -> {
try {
packet.apply(w);
} catch (Throwable t) {
try {
w.onTragicEvent(t, "applyUpdatesPacket");
} catch (Throwable t1) {
t.addSuppressed(t1);
}
throw t;
}
w.flushDeletesCount.incrementAndGet();
});
return nextGen;
}
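To make the delGen bookkeeping described in the long comment above more concrete, here is a tiny self-contained sketch (not Lucene code; the names are invented) that models the generation assignment and the segmentGen <= packetGen check:
import java.util.concurrent.atomic.AtomicLong;

// Toy model of the delGen ordering described above (not Lucene's actual classes).
public class DelGenSketch {
  static final AtomicLong nextGen = new AtomicLong(1);   // like bufferedUpdatesStream.nextGen

  static long push() {                                    // like bufferedUpdatesStream.push(packet)
    return nextGen.getAndIncrement();                     // packet.delGen = old nextGen; nextGen++
  }

  public static void main(String[] args) {
    long globalPacketGen = push();       // = 1  (frozen global updates published first)
    long privatePacketGen = push();      // = 2  (the DWPT's own pendingUpdates packet)
    long segmentGen = privatePacketGen;  // newSegment.setBufferedDeletesGen(nextGen)

    // a packet only applies to segments whose gen is <= the packet's gen:
    System.out.println("global packet applies:  " + (segmentGen <= globalPacketGen));   // false
    System.out.println("private packet applies: " + (segmentGen <= privatePacketGen));  // true
  }
}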
Next, the handling of IndexWriter.eventQueue: the Events placed into the eventQueue are processed when IndexWriter.doFlush calls processEvents(false) after docWriter.flushAllThreads() has finished:
//IndexWriter
private void processEvents(boolean triggerMerge) throws IOException {
if (tragedy.get() == null) {
Event event;
while ((event = eventQueue.poll()) != null) {
event.process(this);
}
}
if (triggerMerge) {
maybeMerge(getConfig().getMergePolicy(), MergeTrigger.SEGMENT_FLUSH, UNBOUNDED_MAX_MERGE_SEGMENTS);
}
}
Let's go straight to the event we just enqueued, i.e. the FrozenBufferedUpdates.apply(IndexWriter) function:
//FrozenBufferedUpdates
/** Translates a frozen packet of delete term/query, or doc values
* updates, into their actual docIDs in the index, and applies the change. This is a heavy
* operation and is done concurrently by incoming indexing threads. */
@SuppressWarnings("try")
public synchronized void apply(IndexWriter writer) throws IOException {
if (applied.getCount() == 0) {
// already done
return;
}
long startNS = System.nanoTime();
assert any();
Set<SegmentCommitInfo> seenSegments = new HashSet<>();
int iter = 0;
int totalSegmentCount = 0;
long totalDelCount = 0;
boolean finished = false;
// Optimistic concurrency: assume we are free to resolve the deletes against all current segments in the index, despite that
// concurrent merges are running. Once we are done, we check to see if a merge completed while we were running. If so, we must retry
// resolving against the newly merged segment(s). Eventually no merge finishes while we were running and we are done.
while (true) {
String messagePrefix;
if (iter == 0) {
messagePrefix = "";
} else {
messagePrefix = "iter " + iter;
}
long iterStartNS = System.nanoTime();
long mergeGenStart = writer.mergeFinishedGen.get();
Set<String> delFiles = new HashSet<>();
BufferedUpdatesStream.SegmentState[] segStates;
synchronized (writer) {
//get the SegmentInfos maintained by IndexWriter
List<SegmentCommitInfo> infos = getInfosToApply(writer);
if (infos == null) {
break;
}
for (SegmentCommitInfo info : infos) {
delFiles.addAll(info.files());
}
// Must open while holding IW lock so that e.g. segments are not merged
// away, dropped from 100% deletions, etc., before we can open the readers
//a SegmentState is created for every SegmentInfo; this involves ReadersAndUpdates,
//which is responsible for applying deletes and updates to the in-memory SegmentInfo
segStates = openSegmentStates(writer, infos, seenSegments, delGen());
if (segStates.length == 0) {
if (infoStream.isEnabled("BD")) {
infoStream.message("BD", "packet matches no segments");
}
break;
}
...
totalSegmentCount += segStates.length;
// Important, else IFD may try to delete our files while we are still using them,
// if e.g. a merge finishes on some of the segments we are resolving on:
writer.deleter.incRef(delFiles);
}
AtomicBoolean success = new AtomicBoolean();
long delCount;
try (Closeable finalizer = () -> finishApply(writer, segStates, success.get(), delFiles)) {
// don't hold IW monitor lock here so threads are free concurrently resolve deletes/updates:
//call FrozenBufferedUpdates.apply(BufferedUpdatesStream.SegmentState[] segStates)
delCount = apply(segStates);
success.set(true);
}
// Since we just resolved some more deletes/updates, now is a good time to write them:
//the in-memory segments were just updated above, so the updated content is written to disk here
writer.writeSomeDocValuesUpdates();
// It's OK to add this here, even if the while loop retries, because delCount only includes newly
// deleted documents, on the segments we didn't already do in previous iterations:
totalDelCount += delCount;
...
if (privateSegment != null) {
// No need to retry for a segment-private packet: the merge that folds in our private segment already waits for all deletes to
// be applied before it kicks off, so this private segment must already not be in the set of merging segments
break;
}
// Must sync on writer here so that IW.mergeCommit is not running concurrently, so that if we exit, we know mergeCommit will succeed
// in pulling all our delGens into a merge:
synchronized (writer) {
long mergeGenCur = writer.mergeFinishedGen.get();
if (mergeGenCur == mergeGenStart) {
// Must do this while still holding IW lock else a merge could finish and skip carrying over our updates:
// Record that this packet is finished:
writer.finished(this);
finished = true;
// No merge finished while we were applying, so we are done!
break;
}
}
if (infoStream.isEnabled("BD")) {
infoStream.message("BD", messagePrefix + "concurrent merges finished; move to next iter");
}
// A merge completed while we were running. In this case, that merge may have picked up some of the updates we did, but not
// necessarily all of them, so we cycle again, re-applying all our updates to the newly merged segment.
iter++;
}
if (finished == false) {
// Record that this packet is finished:
writer.finished(this);
}
...
message += "; " + writer.getPendingUpdatesCount() + " packets remain";
infoStream.message("BD", message);
}
}
/** Applies pending delete-by-term, delete-by-query and doc values updates to all segments in the index, returning
* the number of new deleted or updated documents. */
private synchronized long apply(BufferedUpdatesStream.SegmentState[] segStates) throws IOException {
if (delGen == -1) {
// we were not yet pushed
throw new IllegalArgumentException("gen is not yet set; call BufferedUpdatesStream.push first");
}
assert applied.getCount() != 0;
if (privateSegment != null) {
assert segStates.length == 1;
assert privateSegment == segStates[0].reader.getOriginalSegmentInfo();
}
//if these are the updates from a FlushedSegment, they contain no term deletes, because those were already resolved during the DWPT flush
totalDelCount += applyTermDeletes(segStates);
//handle query deletes
totalDelCount += applyQueryDeletes(segStates);
//handle doc-values updates, which are outside the scope of this article
totalDelCount += applyDocValuesUpdates(segStates);
return totalDelCount;
}
// Delete by query
private long applyQueryDeletes(BufferedUpdatesStream.SegmentState[] segStates) throws IOException {
if (deleteQueries.length == 0) {
return 0;
}
long startNS = System.nanoTime();
long delCount = 0;
//nested for loops: for every segState, run every query delete
for (BufferedUpdatesStream.SegmentState segState : segStates) {
if (delGen < segState.delGen) {
// segment is newer than this deletes packet
continue;
}
if (segState.rld.refCount() == 1) {
// This means we are the only remaining reference to this segment, meaning
// it was merged away while we were running, so we can safely skip running
// because we will run on the newly merged segment next:
continue;
}
final LeafReaderContext readerContext = segState.reader.getContext();
for (int i = 0; i < deleteQueries.length; i++) {
Query query = deleteQueries[i];
int limit;
if (delGen == segState.delGen) {
assert privateSegment != null;
limit = deleteQueryLimits[i];
} else {
limit = Integer.MAX_VALUE;
}
//run the query
final IndexSearcher searcher = new IndexSearcher(readerContext.reader());
searcher.setQueryCache(null);
query = searcher.rewrite(query);
final Weight weight = searcher.createWeight(query, false, 1);
final Scorer scorer = weight.scorer(readerContext);
if (scorer != null) {
final DocIdSetIterator it = scorer.iterator();
if (segState.rld.sortMap != null && limit != Integer.MAX_VALUE) {
assert privateSegment != null;
// This segment was sorted on flush; we must apply seg-private deletes carefully in this case:
int docID;
while ((docID = it.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
// The limit is in the pre-sorted doc space:
if (segState.rld.sortMap.newToOld(docID) < limit) {
//delete the documents that match the query
if (segState.rld.delete(docID)) {
delCount++;
}
}
}
} else {
int docID;
while ((docID = it.nextDoc()) < limit) {
if (segState.rld.delete(docID)) {
delCount++;
}
}
}
}
}
}
...
return delCount;
}
//ReadersAndUpdates
//ReadersAndUpdates.pendingDeletes records the deleted docIDs
public synchronized boolean delete(int docID) throws IOException {
if (reader == null && pendingDeletes.mustInitOnDelete()) {
getReader(IOContext.READ).decRef(); // pass a reader to initialize the pending deletes
}
return pendingDeletes.delete(docID);
}
Note that in FrozenBufferedUpdates.apply(IndexWriter writer) above, the call to apply(BufferedUpdatesStream.SegmentState[] segStates) sits inside a try-with-resources statement, as follows:
try (Closeable finalizer = () -> finishApply(writer, segStates, success.get(), delFiles)) {
// don't hold IW monitor lock here so threads are free concurrently resolve deletes/updates:
delCount = apply(segStates);
success.set(true);
}
So once apply(BufferedUpdatesStream.SegmentState[] segStates) has finished, finishApply is invoked:
private void finishApply(IndexWriter writer, BufferedUpdatesStream.SegmentState[] segStates,
boolean success, Set<String> delFiles) throws IOException {
synchronized (writer) {
BufferedUpdatesStream.ApplyDeletesResult result;
try {
//closeSegmentStates returns the fully deleted segments; fully deleted means every document in the segment matched a query delete, so the segment no longer contains any live documents
result = closeSegmentStates(writer, segStates, success);
} finally {
// Matches the incRef we did above, but we must do the decRef after closing segment states else
// IFD can't delete still-open files
writer.deleter.decRef(delFiles);
}
if (result.anyDeletes) {
writer.maybeMerge.set(true);
writer.checkpoint();
}
if (result.allDeleted != null) {
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "drop 100% deleted segments: " + writer.segString(result.allDeleted));
}
//drop the fully deleted segments
for (SegmentCommitInfo info : result.allDeleted) {
writer.dropDeletedSegment(info);
}
//checkpoint calls deleter.checkpoint(segmentInfos, false) to delete files that are no longer needed (the files belonging to fully deleted segments are removed at this point)
writer.checkpoint();
}
}
}
/** Close segment states previously opened with openSegmentStates. */
public static BufferedUpdatesStream.ApplyDeletesResult closeSegmentStates(IndexWriter writer, BufferedUpdatesStream.SegmentState[] segStates, boolean success) throws IOException {
List<SegmentCommitInfo> allDeleted = null;
long totDelCount = 0;
final List<BufferedUpdatesStream.SegmentState> segmentStates = Arrays.asList(segStates);
//find the fully deleted segments
for (BufferedUpdatesStream.SegmentState segState : segmentStates) {
if (success) {
totDelCount += segState.rld.getDelCount() - segState.startDelCount;
int fullDelCount = segState.rld.getDelCount();
assert fullDelCount <= segState.rld.info.info.maxDoc() : fullDelCount + " > " + segState.rld.info.info.maxDoc();
if (segState.rld.isFullyDeleted() && writer.getConfig().getMergePolicy().keepFullyDeletedSegment(() -> segState.reader) == false) {
if (allDeleted == null) {
allDeleted = new ArrayList<>();
}
allDeleted.add(segState.reader.getOriginalSegmentInfo());
}
}
}
IOUtils.close(segmentStates);
if (writer.infoStream.isEnabled("BD")) {
writer.infoStream.message("BD", "closeSegmentStates: " + totDelCount + " new deleted documents; pool " + writer.getPendingUpdatesCount()+ " packets; bytesUsed=" + writer.getReaderPoolRamBytesUsed());
}
return new BufferedUpdatesStream.ApplyDeletesResult(totDelCount > 0, allDeleted);
}
At this point the documents marked by the query deletes are recorded in each ReadersAndUpdates (rld), which IndexWriter maintains through its readerPool. Now back to the IndexWriter.doFlush(boolean applyAllDeletes) function from the beginning: after calling processEvents(false) it calls writeReaderPool(applyAllDeletes):
//IndexWriter
private final void writeReaderPool(boolean writeDeletes) throws IOException {
assert Thread.holdsLock(this);
if (writeDeletes) {
//this is the branch we care about: the .liv file produced by the query deletes is written to disk here
if (readerPool.commit(segmentInfos)) {
checkpointNoSIS();
}
} else { // only write the docValues
if (readerPool.writeAllDocValuesUpdates()) {
checkpoint();
}
}
// now do some best effort to check if a segment is fully deleted
List<SegmentCommitInfo> toDrop = new ArrayList<>(); // don't modify segmentInfos in-place
for (SegmentCommitInfo info : segmentInfos) {
ReadersAndUpdates readersAndUpdates = readerPool.get(info, false);
if (readersAndUpdates != null) {
if (isFullyDeleted(readersAndUpdates)) {
toDrop.add(info);
}
}
}
for (SegmentCommitInfo info : toDrop) {
dropDeletedSegment(info);
}
if (toDrop.isEmpty() == false) {
checkpoint();
}
}
//ReaderPool
/**
* Commit live docs changes for the segment readers for
* the provided infos.
*
* @throws IOException If there is a low-level I/O error
*/
synchronized boolean commit(SegmentInfos infos) throws IOException {
boolean atLeastOneChange = false;
for (SegmentCommitInfo info : infos) {
final ReadersAndUpdates rld = readerMap.get(info);
if (rld != null) {
assert rld.info == info;
//call rld.writeLiveDocs to flush the .liv file to disk
boolean changed = rld.writeLiveDocs(directory);
changed |= rld.writeFieldUpdates(directory, fieldNumbers, completedDelGenSupplier.getAsLong(), infoStream);
if (changed) {
// Make sure we only write del docs for a live segment:
assert assertInfoIsLive(info);
// Must checkpoint because we just
// created new _X_N.del and field updates files;
// don't call IW.checkpoint because that also
// increments SIS.version, which we do not want to
// do here: it was done previously (after we
// invoked BDS.applyDeletes), whereas here all we
// did was move the state to disk:
atLeastOneChange = true;
}
}
}
return atLeastOneChange;
}
//ReadersAndUpdates
// Commit live docs (writes new _X_N.del files) and field updates (writes new
// _X_N updates files) to the directory; returns true if it wrote any file
// and false if there were no new deletes or updates to write:
public synchronized boolean writeLiveDocs(Directory dir) throws IOException {
return pendingDeletes.writeLiveDocs(dir);
}
//PendingDeletes
/**
* Writes the live docs to disk and returns <code>true</code> if any new docs were written.
*/
boolean writeLiveDocs(Directory dir) throws IOException {
if (pendingDeleteCount == 0) {
return false;
}
Bits liveDocs = this.liveDocs;
assert liveDocs != null;
// We have new deletes
assert liveDocs.length() == info.info.maxDoc();
// Do this so we can delete any created files on
// exception; this saves all codecs from having to do
// it:
TrackingDirectoryWrapper trackingDir = new TrackingDirectoryWrapper(dir);
// We can write directly to the actual name (vs to a
// .tmp & renaming it) because the file is not live
// until segments file is written:
boolean success = false;
try {
//here, at last, the live docs are encoded and written to disk by the codec
Codec codec = info.info.getCodec();
codec.liveDocsFormat().writeLiveDocs(liveDocs, trackingDir, info, pendingDeleteCount, IOContext.DEFAULT);
success = true;
} finally {
if (!success) {
// Advance only the nextWriteDelGen so that a 2nd
// attempt to write will write to a new file
info.advanceNextWriteDelGen();
// Delete any partially created file(s):
for (String fileName : trackingDir.getCreatedFiles()) {
IOUtils.deleteFilesIgnoringExceptions(dir, fileName);
}
}
}
// If we hit an exc in the line above (eg disk full)
// then info's delGen remains pointing to the previous
// (successfully written) del docs:
info.advanceDelGen();
info.setDelCount(info.getDelCount() + pendingDeleteCount);
dropChanges();
return true;
}
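As a final sanity check, the effect of the flushed deletes can be observed from the reader side: maxDoc still counts deleted documents, numDocs does not, and getLiveDocs exposes the bitset that was just written. A minimal sketch (the index path is an assumption):
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.util.Bits;
import java.nio.file.Paths;

public class LiveDocsCheck {
  public static void main(String[] args) throws Exception {
    try (Directory dir = FSDirectory.open(Paths.get("/tmp/term-delete-demo"));
         DirectoryReader reader = DirectoryReader.open(dir)) {
      // maxDoc includes deleted docs, numDocs excludes them
      System.out.println("maxDoc=" + reader.maxDoc() + " numDocs=" + reader.numDocs());
      for (LeafReaderContext ctx : reader.leaves()) {
        Bits liveDocs = ctx.reader().getLiveDocs(); // null means the segment has no deletions (no .liv)
        System.out.println("segment " + ctx.ord + " has deletions: " + (liveDocs != null));
      }
    }
  }
}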
6 Summary
The above is a rather step-by-step record of how Lucene implements deletes. Many details were not covered, and there are surely places where the analysis falls short or is simply wrong; corrections from readers are very welcome.