Lucene Term Query Deletion Flow Source Code Analysis (Part 2)


Author: persisting_ | Published 2018-12-23 23:33

(Because the article exceeded the maximum length limit, this part continues from Lucene Term Query Deletion Flow Source Code Analysis (Part 1) and covers the processing that follows the data preparation described there.)

5 How Deletes Are Handled at Flush Time

So far we have covered some of the basics of deletes; next, let's look at how Lucene handles delete operations during a flush.
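
For orientation, here is a minimal usage sketch showing how buffered term/query deletes end up going through the flush path analyzed below. This is an illustrative example, not code from Lucene itself; the directory path and field names are made up.

// Hypothetical usage sketch; the directory path and field name are illustrative only.
import java.nio.file.Paths;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.*;
import org.apache.lucene.index.*;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.store.FSDirectory;

public class DeleteFlushDemo {
  public static void main(String[] args) throws Exception {
    try (FSDirectory dir = FSDirectory.open(Paths.get("/tmp/idx"));
         IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(new StandardAnalyzer()))) {
      Document doc = new Document();
      doc.add(new StringField("id", "42", Field.Store.YES));
      writer.addDocument(doc);
      writer.deleteDocuments(new Term("id", "42"));                 // buffered term delete
      writer.deleteDocuments(new TermQuery(new Term("id", "43")));  // buffered query delete
      writer.flush();   // enters the IndexWriter.flush / doFlush path analyzed below
    }
  }
}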

5.1 The Flush Entry Point

Since IndexWriter.commit also ends up going through the flush logic of IndexWriter.flush, only IndexWriter.flush is analyzed here.

//IndexWriter
/** Moves all in-memory segments to the {@link Directory}, but does not commit
*  (fsync) them (call {@link #commit} for that). */
public final void flush() throws IOException {
  flush(true, true);
}

/**
   * Flush all in-memory buffered updates (adds and deletes)
   * to the Directory.
   * @param triggerMerge if true, we may merge segments (if
   *  deletes or docs were flushed) if necessary
   * @param applyAllDeletes whether pending deletes should also
   */
final void flush(boolean triggerMerge, boolean applyAllDeletes) throws IOException {

  // NOTE: this method cannot be sync'd because
  // maybeMerge() in turn calls mergeScheduler.merge which
  // in turn can take a long time to run and we don't want
  // to hold the lock for that.  In the case of
  // ConcurrentMergeScheduler this can lead to deadlock
  // when it stalls due to too many running merges.

  // We can be called during close, when closing==true, so we must pass false to ensureOpen:
  //Make sure the index is still open
  ensureOpen(false);
  //This ultimately calls doFlush(applyAllDeletes); afterwards maybeMerge may trigger a segment merge,
  //which is not the focus of this article and will not be covered further
  if (doFlush(applyAllDeletes) && triggerMerge) {
    maybeMerge(config.getMergePolicy(), MergeTrigger.FULL_FLUSH, UNBOUNDED_MAX_MERGE_SEGMENTS);
  }
}

/** Returns true a segment was flushed or deletes were applied. */
private boolean doFlush(boolean applyAllDeletes) throws IOException {
  ...
  doBeforeFlush();
  testPoint("startDoFlush");
  boolean success = false;
  try {

    ...
    boolean anyChanges = false;
    
    synchronized (fullFlushLock) {
      boolean flushSuccess = false;
      try {
        //Call DocumentsWriter.flushAllThreads to flush all DWPTs
        long seqNo = docWriter.flushAllThreads();
        if (seqNo < 0) {
          seqNo = -seqNo;
          anyChanges = true;
        } else {
          anyChanges = false;
        }
        if (!anyChanges) {
          // flushCount is incremented in flushAllThreads
          flushCount.incrementAndGet();
        }
        //Every flushed DWPT returns a FlushedSegment that is placed into the ticketQueue;
        //here the ticketQueue is purged (its tickets are published)
        publishFlushedSegments(true);
        flushSuccess = true;
      } finally {
        assert holdsFullFlushLock();
        docWriter.finishFullFlush(flushSuccess);
        processEvents(false);
      }
    }

    if (applyAllDeletes) {
      applyAllDeletesAndUpdates();
    }

    anyChanges |= maybeMerge.getAndSet(false);
    
    synchronized(this) {
      writeReaderPool(applyAllDeletes);
      doAfterFlush();
      success = true;
      return anyChanges;
    }
  } catch (VirtualMachineError tragedy) {
    tragicEvent(tragedy, "doFlush");
    throw tragedy;
  } finally {
    if (!success) {
      if (infoStream.isEnabled("IW")) {
        infoStream.message("IW", "hit exception during flush");
      }
      maybeCloseOnTragicEvent();
    }
  }
}

As shown above, the first step of a flush is to call DocumentsWriter.flushAllThreads to flush every DWPT. As discussed earlier, a Term delete is a local, deterministic delete: if a document about to be flushed matches the term delete condition, it is simply filtered out. Let's now look at the logic of DocumentsWriter.flushAllThreads.

5.2 DWPT Flush: Handling Term Deletes

When a DWPT flushes, it matches all of its documents against the term deletes recorded in the deleteTerms of its own pendingUpdates. Documents that match are not written to disk at all, so no .liv file is needed at this stage: everything that reaches disk is alive. Strictly speaking, that is only true with respect to the deleteTerms in pendingUpdates, because some of those surviving documents may still match a query delete condition and will need to be deleted as well. Query deletes are processed after all DWPTs have finished flushing; that is when a .liv file is generated to mark the documents matching the query deletes as dead.

There is also the exceptional case described in Section 4: if pendingUpdates.deleteDocIDs contains docIDs recorded for deletion because a document update hit an exception, a .liv file will be generated at this stage as well.
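
As a small illustration of what that live-docs bitmap means, here is a hypothetical sketch that uses Lucene's FixedBitSet the same way the flush code below does; the doc count and the deleted docID are made up.

// Minimal sketch of the live-docs bitmap semantics used at flush time; values are illustrative.
import org.apache.lucene.util.FixedBitSet;

public class LiveDocsSketch {
  public static void main(String[] args) {
    int numDocsInRAM = 5;
    FixedBitSet liveDocs = new FixedBitSet(numDocsInRAM);
    liveDocs.set(0, numDocsInRAM); // every doc starts out live
    liveDocs.clear(3);             // docID 3 was deleted (e.g. recorded in deleteDocIDs)
    // The .liv file encodes exactly this bitmap: set bits are live, cleared bits are dead.
    System.out.println(liveDocs.cardinality() + " live docs out of " + numDocsInRAM);
  }
}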

With this overall picture of a full flush in mind, let's look at DocumentsWriter.flushAllThreads:

//DocumentsWriter
/*
  * FlushAllThreads is synced by IW fullFlushLock. Flushing all threads is a
  * two stage operation; the caller must ensure (in try/finally) that finishFlush
  * is called after this method, to release the flush lock in DWFlushControl
  */
long flushAllThreads()
  throws IOException {
  final DocumentsWriterDeleteQueue flushingDeleteQueue;
  ...

  long seqNo;

  synchronized (this) {
    pendingChangesInCurrentFullFlush = anyChanges();
    flushingDeleteQueue = deleteQueue;
    /* Cutover to a new delete queue.  This must be synced on the flush control
      * otherwise a new DWPT could sneak into the loop with an already flushing
      * delete queue */
    seqNo = flushControl.markForFullFlush(); // swaps this.deleteQueue synced on FlushControl
    assert setFlushingDeleteQueue(flushingDeleteQueue);
  }
  assert currentFullFlushDelQueue != null;
  assert currentFullFlushDelQueue != deleteQueue;
  
  boolean anythingFlushed = false;
  try {
    DocumentsWriterPerThread flushingDWPT;
    // Help out with flushing:
    //Flush all pending DWPTs one by one
    while ((flushingDWPT = flushControl.nextPendingFlush()) != null) {
      anythingFlushed |= doFlush(flushingDWPT);
    }
    // If a concurrent flush is still in flight wait for it
    //Flushes may run concurrently, so wait here until every DWPT has finished flushing
    flushControl.waitForFlush();  
    if (anythingFlushed == false && flushingDeleteQueue.anyChanges()) { // apply deletes if we did not flush any document
      if (infoStream.isEnabled("DW")) {
        infoStream.message("DW", Thread.currentThread().getName() + ": flush naked frozen global deletes");
      }
      assertTicketQueueModification(flushingDeleteQueue);
      //Put the buffered changes from the global queue into the ticketQueue, waiting to be purged
      ticketQueue.addDeletes(flushingDeleteQueue);
    }
    // we can't assert that we don't have any tickets in teh queue since we might add a DocumentsWriterDeleteQueue
    // concurrently if we have very small ram buffers this happens quite frequently
    assert !flushingDeleteQueue.anyChanges();
  } finally {
    assert flushingDeleteQueue == currentFullFlushDelQueue;
  }
  if (anythingFlushed) {
    return -seqNo;
  } else {
    return seqNo;
  }
}

Next, the per-DWPT flush operation:

private boolean doFlush(DocumentsWriterPerThread flushingDWPT) throws IOException {
  boolean hasEvents = false;
  while (flushingDWPT != null) {
    hasEvents = true;
    boolean success = false;
    DocumentsWriterFlushQueue.FlushTicket ticket = null;
    try {
      assert currentFullFlushDelQueue == null
          || flushingDWPT.deleteQueue == currentFullFlushDelQueue : "expected: "
          + currentFullFlushDelQueue + "but was: " + flushingDWPT.deleteQueue
          + " " + flushControl.isFullFlush();
      /*
        * Since with DWPT the flush process is concurrent and several DWPT
        * could flush at the same time we must maintain the order of the
        * flushes before we can apply the flushed segment and the frozen global
        * deletes it is buffering. The reason for this is that the global
        * deletes mark a certain point in time where we took a DWPT out of
        * rotation and freeze the global deletes.
        * 
        * Example: A flush 'A' starts and freezes the global deletes, then
        * flush 'B' starts and freezes all deletes occurred since 'A' has
        * started. if 'B' finishes before 'A' we need to wait until 'A' is done
        * otherwise the deletes frozen by 'B' are not applied to 'A' and we
        * might miss to deletes documents in 'A'.
        */
      try {
        assert assertTicketQueueModification(flushingDWPT.deleteQueue);
        // Each flush is assigned a ticket in the order they acquire the ticketQueue lock
        //Each DWPT flush acquires a ticket from the ticketQueue to preserve ordering
        ticket = ticketQueue.addFlushTicket(flushingDWPT);
        final int flushingDocsInRam = flushingDWPT.getNumDocsInRAM();
        boolean dwptSuccess = false;
        try {
          // flush concurrently without locking
          //Invoke the DWPT's own flush operation
          final FlushedSegment newSegment = flushingDWPT.flush(flushNotifications);
          //Attach the FlushedSegment returned by DWPT.flush to the ticket acquired above
          ticketQueue.addSegment(ticket, newSegment);
          dwptSuccess = true;
        } finally {
          subtractFlushedNumDocs(flushingDocsInRam);
          if (flushingDWPT.pendingFilesToDelete().isEmpty() == false) {
            //See the definition of flushNotifications below; the files to delete are wrapped into an Event and put into the eventQueue
            Set<String> files = flushingDWPT.pendingFilesToDelete();
            flushNotifications.deleteUnusedFiles(files);
            hasEvents = true;
          }
          if (dwptSuccess == false) {
            flushNotifications.flushFailed(flushingDWPT.getSegmentInfo());
            hasEvents = true;
          }
        }
        // flush was successful once we reached this point - new seg. has been assigned to the ticket!
        success = true;
      } finally {
        if (!success && ticket != null) {
          // In the case of a failure make sure we are making progress and
          // apply all the deletes since the segment flush failed since the flush
          // ticket could hold global deletes see FlushTicket#canPublish()
          ticketQueue.markTicketFailed(ticket);
        }
      }
      /*
        * Now we are done and try to flush the ticket queue if the head of the
        * queue has already finished the flush.
        */
      if (ticketQueue.getTicketCount() >= perThreadPool.getActiveThreadStateCount()) {
        // This means there is a backlog: the one
        // thread in innerPurge can't keep up with all
        // other threads flushing segments.  In this case
        // we forcefully stall the producers.
        flushNotifications.onTicketBacklog();
        break;
      }
    } finally {
      flushControl.doAfterFlush(flushingDWPT);
    }
    //After the current DWPT is done, fetch the next pending DWPT to flush; this resembles a work-stealing pattern:
    //this while loop keeps picking up the next DWPT once the current one is finished, and flushAllThreads
    //itself is also a while loop that takes DWPTs from flushControl and flushes them one after another
    flushingDWPT = flushControl.nextPendingFlush();
  }

  if (hasEvents) {
    flushNotifications.afterSegmentsFlushed();
  }

  // If deletes alone are consuming > 1/2 our RAM
  // buffer, force them all to apply now. This is to
  // prevent too-frequent flushing of a long tail of
  // tiny segments:
  final double ramBufferSizeMB = config.getRAMBufferSizeMB();
  if (ramBufferSizeMB != IndexWriterConfig.DISABLE_AUTO_FLUSH &&
      flushControl.getDeleteBytesUsed() > (1024*1024*ramBufferSizeMB/2)) {
    hasEvents = true;
    if (applyAllDeletes(deleteQueue) == false) {
      if (infoStream.isEnabled("DW")) {
        infoStream.message("DW", String.format(Locale.ROOT, "force apply deletes after flush bytesUsed=%.1f MB vs ramBuffer=%.1f MB",
                                                flushControl.getDeleteBytesUsed()/(1024.*1024.),
                                                ramBufferSizeMB));
      }
      flushNotifications.onDeletesApplied();
    }
  }

  return hasEvents;
}

The code above uses flushNotifications. DWPT flushes may run concurrently, and as readers familiar with asynchronous programming know, an asynchronous operation usually takes a callback that is invoked when the operation completes or fails; flushNotifications plays exactly that callback role.

flushNotifications is a member of DocumentsWriter, but it is actually instantiated in IndexWriter and passed in through the DocumentsWriter constructor.

Its definition in IndexWriter is as follows:

//As the code below shows, these callbacks mainly wrap flush-related operations into Events and put them into the eventQueue maintained by IndexWriter.
private final DocumentsWriter.FlushNotifications flushNotifications = new DocumentsWriter.FlushNotifications() {
  @Override
  public void deleteUnusedFiles(Collection<String> files) {
    eventQueue.add(w -> w.deleteNewFiles(files));
  }

  @Override
  public void flushFailed(SegmentInfo info) {
    eventQueue.add(w -> w.flushFailed(info));
  }

  @Override
  public void afterSegmentsFlushed() throws IOException {
    try {
      publishFlushedSegments(false);
    } finally {
      if (false) {
        maybeMerge(config.getMergePolicy(), MergeTrigger.SEGMENT_FLUSH, UNBOUNDED_MAX_MERGE_SEGMENTS);
      }
    }
  }

  @Override
  public void onTragicEvent(Throwable event, String message) {
    IndexWriter.this.onTragicEvent(event, message);
  }

  @Override
  public void onDeletesApplied() {
    eventQueue.add(w -> {
        try {
          w.publishFlushedSegments(true);
        } finally {
          flushCount.incrementAndGet();
        }
      }
    );
  }

  @Override
  public void onTicketBacklog() {
    eventQueue.add(w -> w.publishFlushedSegments(true));
  }
};

Before a DWPT flushes, ticket = ticketQueue.addFlushTicket(flushingDWPT) is called to acquire a ticket. The ticketQueue was introduced in Section 2; the code below should deepen the understanding of it:

//DocumentsWriterFlushQueue
synchronized FlushTicket addFlushTicket(DocumentsWriterPerThread dwpt) throws IOException {
  // Each flush is assigned a ticket in the order they acquire the ticketQueue
  // lock
  incTickets();
  boolean success = false;
  try {
    // prepare flush freezes the global deletes - do in synced block!
    //dwpt.prepareFlush moves the part of the globalQueue captured by the current globalSlice into
    //globalBufferedUpdates, freezes it, and returns the frozen FrozenBufferedUpdates instance.
    //FlushTicket.frozenUpdates stores exactly this frozen global globalBufferedUpdates.
    //Note: if no Term delete or Query delete happens during the flush, the subsequent DWPT flushes
    //will not produce a frozen globalBufferedUpdates, because by then the head and tail pointers of
    //the globalSlice both already point at the tail node of the globalQueue
    final FlushTicket ticket = new FlushTicket(dwpt.prepareFlush(), true);
    queue.add(ticket);
    success = true;
    return ticket;
  } finally {
    if (!success) {
      decTickets();
    }
  }
}

FrozenBufferedUpdates prepareFlush() throws IOException {
    assert numDocsInRAM > 0;
    //While freezing globalBufferedUpdates, the tail pointer of this DWPT's deleteSlice is also moved
    //to the current tail node of the globalQueue, so that deleteSlice.apply can run afterwards
    final FrozenBufferedUpdates globalUpdates = deleteQueue.freezeGlobalBuffer(deleteSlice);
    /* deleteSlice can possibly be null if we have hit non-aborting exceptions during indexing and never succeeded 
    adding a document. */
    if (deleteSlice != null) {
      // apply all deletes before we flush and release the delete slice
      //Apply the operations captured from the globalQueue by this DWPT's deleteSlice into its pendingUpdates
      deleteSlice.apply(pendingUpdates, numDocsInRAM);
      assert deleteSlice.isEmpty();
      deleteSlice.reset();
    }
    return globalUpdates;
  }
//DocumentsWriterDeleteQueue
  FrozenBufferedUpdates freezeGlobalBuffer(DeleteSlice callerSlice) throws IOException {
    globalBufferLock.lock();
    /*
     * Here we freeze the global buffer so we need to lock it, apply all
     * deletes in the queue and reset the global slice to let the GC prune the
     * queue.
     */
    final Node<?> currentTail = tail; // take the current tail make this local any
    // Changes after this call are applied later
    // and not relevant here
    if (callerSlice != null) {
      // Update the callers slices so we are on the same page
      //Here the tail pointer of the flushing DWPT's deleteSlice is also updated to the latest tail node
      //of the globalQueue, so that the deleteSlice can be applied afterwards
      callerSlice.sliceTail = currentTail;
    }
    try {
      if (globalSlice.sliceTail != currentTail) {
        globalSlice.sliceTail = currentTail;
        //Apply the part of the globalQueue captured between the head and tail of the globalSlice into globalBufferedUpdates
        globalSlice.apply(globalBufferedUpdates, BufferedUpdates.MAX_INT);
      }

      if (globalBufferedUpdates.any()) {
        //Freeze globalBufferedUpdates
        final FrozenBufferedUpdates packet = new FrozenBufferedUpdates(infoStream, globalBufferedUpdates, null);
        //Clear globalBufferedUpdates
        globalBufferedUpdates.clear();
        return packet;
      } else {
        return null;
      }
    } finally {
      globalBufferLock.unlock();
    }
  }

//DocumentsWriterFlushQueue.FlushTicket
//FlushTicket is an inner class of DocumentsWriterFlushQueue; it wraps the FlushedSegment view produced by a DWPT flush together with frozenUpdates, the frozen global deletes packet captured when that flush started
static final class FlushTicket {
    private final FrozenBufferedUpdates frozenUpdates;
    private final boolean hasSegment;
    private FlushedSegment segment;
    private boolean failed = false;
    private boolean published = false;

    FlushTicket(FrozenBufferedUpdates frozenUpdates, boolean hasSegment) {
      this.frozenUpdates = frozenUpdates;
      this.hasSegment = hasSegment;
    }

    boolean canPublish() {
      return hasSegment == false || segment != null || failed;
    }

    synchronized void markPublished() {
      assert published == false: "ticket was already published - can not publish twice";
      published = true;
    }

    private void setSegment(FlushedSegment segment) {
      assert !failed;
      this.segment = segment;
    }

    private void setFailed() {
      assert segment == null;
      failed = true;
    }

    /**
     * Returns the flushed segment or <code>null</code> if this flush ticket doesn't have a segment. This can be the
     * case if this ticket represents a flushed global frozen updates package.
     */
    FlushedSegment getFlushedSegment() {
      return segment;
    }

    /**
     * Returns a frozen global deletes package.
     */
    FrozenBufferedUpdates getFrozenUpdates() {
      return frozenUpdates;
    }
  }
}

Now let's look at the implementation of DWPT.flush:

//DocumentsWriterPerThread
/** Flush all pending docs to a new segment */
FlushedSegment flush(DocumentsWriter.FlushNotifications flushNotifications) throws IOException {
  assert numDocsInRAM > 0;
  assert deleteSlice.isEmpty() : "all deletes must be applied in prepareFlush";
  segmentInfo.setMaxDoc(numDocsInRAM);
  //First, wrap the locally maintained state into a SegmentWriteState object for processing;
  //this state includes segmentInfo, pendingUpdates, and so on
  final SegmentWriteState flushState = new SegmentWriteState(infoStream, directory, segmentInfo, fieldInfos.finish(),
      pendingUpdates, new IOContext(new FlushInfo(numDocsInRAM, bytesUsed())));
  final double startMBUsed = bytesUsed() / 1024. / 1024.;

  // Apply delete-by-docID now (delete-byDocID only
  // happens when an exception is hit processing that
  // doc, eg if analyzer has some problem w/ the text):
  //Handle the deleteDocIDs maintained by pendingUpdates. The beginning of this article introduced the three kinds of updates tracked by pendingUpdates;
  //deleteDocIDs records the docs that must be deleted because an update hit an exception, and they can be marked directly in flushState.liveDocs here.
  //If deleteDocIDs is not empty, this DWPT flush will produce a .liv file
  if (pendingUpdates.deleteDocIDs.size() > 0) {
    flushState.liveDocs = new FixedBitSet(numDocsInRAM);
    flushState.liveDocs.set(0, numDocsInRAM);
    for(int delDocID : pendingUpdates.deleteDocIDs) {
      flushState.liveDocs.clear(delDocID);
    }
    flushState.delCountOnFlush = pendingUpdates.deleteDocIDs.size();
    pendingUpdates.bytesUsed.addAndGet(-pendingUpdates.deleteDocIDs.size() * BufferedUpdates.BYTES_PER_DEL_DOCID);
    //pendingUpdates.deleteDocIDs has now been processed, i.e. converted into liveDocs flags, so it is cleared here
    pendingUpdates.deleteDocIDs.clear();
  }

  if (aborted) {
    if (infoStream.isEnabled("DWPT")) {
      infoStream.message("DWPT", "flush: skip because aborting is set");
    }
    return null;
  }

  long t0 = System.nanoTime();

  if (infoStream.isEnabled("DWPT")) {
    infoStream.message("DWPT", "flush postings as segment " + flushState.segmentInfo.name + " numDocs=" + numDocsInRAM);
  }
  final Sorter.DocMap sortMap;
  try {
    DocIdSetIterator softDeletedDocs;
    if (indexWriterConfig.getSoftDeletesField() != null) {
      softDeletedDocs = consumer.getHasDocValues(indexWriterConfig.getSoftDeletesField());
    } else {
      softDeletedDocs = null;
    }
    //Perform the actual flush to disk
    sortMap = consumer.flush(flushState);
    if (softDeletedDocs == null) {
      flushState.softDelCountOnFlush = 0;
    } else {
      flushState.softDelCountOnFlush = PendingSoftDeletes.countSoftDeletes(softDeletedDocs, flushState.liveDocs);
      assert flushState.segmentInfo.maxDoc() >= flushState.softDelCountOnFlush + flushState.delCountOnFlush;
    }
    // We clear this here because we already resolved them (private to this segment) when writing postings:
    //Term deletes were already applied while flushing, so they can be cleared here
    pendingUpdates.clearDeleteTerms();
    //Update the files of the segmentInfo maintained by this DWPT to the files newly created by this flush
    segmentInfo.setFiles(new HashSet<>(directory.getCreatedFiles()));

    final SegmentCommitInfo segmentInfoPerCommit = new SegmentCommitInfo(segmentInfo, 0, flushState.softDelCountOnFlush, -1L, -1L, -1L);
    ...

    final BufferedUpdates segmentDeletes;
    if (pendingUpdates.deleteQueries.isEmpty() && pendingUpdates.numericUpdates.isEmpty() && pendingUpdates.binaryUpdates.isEmpty()) {
      pendingUpdates.clear();
      segmentDeletes = null;
    } else {
      segmentDeletes = pendingUpdates;
    }

    ...

    assert segmentInfo != null;
    //Build something like a view of this DWPT's post-flush state and return it to DocumentsWriter
    FlushedSegment fs = new FlushedSegment(infoStream, segmentInfoPerCommit, flushState.fieldInfos,
        segmentDeletes, flushState.liveDocs, flushState.delCountOnFlush, sortMap);
    sealFlushedSegment(fs, sortMap, flushNotifications);
    if (infoStream.isEnabled("DWPT")) {
      infoStream.message("DWPT", "flush time " + ((System.nanoTime() - t0) / 1000000.0) + " msec");
    }
    return fs;
  } catch (Throwable t) {
    onAbortingException(t);
    throw t;
  } finally {
    maybeAbort("flush", flushNotifications);
  }
}

The call consumer.flush(flushState) above performs the actual writing to disk; the default consumer is DefaultIndexingChain, whose flush method is listed below. Strictly speaking this goes beyond what the article's title covers, because DWPT.flush has already handled the term deletes. consumer.flush(flushState) writes out each index component, producing files such as .nvd, .dvd, .dim and .dii. Finally, note that the .liv file marks deleted docs with a bitmap, but the data of those docs in the other files is not removed.

 @Override
public Sorter.DocMap flush(SegmentWriteState state) throws IOException {

  // NOTE: caller (DocumentsWriterPerThread) handles
  // aborting on any exception from this method
  Sorter.DocMap sortMap = maybeSortSegment(state);
  int maxDoc = state.segmentInfo.maxDoc();
  long t0 = System.nanoTime();
  writeNorms(state, sortMap);
  if (docState.infoStream.isEnabled("IW")) {
    docState.infoStream.message("IW", ((System.nanoTime()-t0)/1000000) + " msec to write norms");
  }
  
  t0 = System.nanoTime();
  writeDocValues(state, sortMap);
  if (docState.infoStream.isEnabled("IW")) {
    docState.infoStream.message("IW", ((System.nanoTime()-t0)/1000000) + " msec to write docValues");
  }

  t0 = System.nanoTime();
  writePoints(state, sortMap);
  if (docState.infoStream.isEnabled("IW")) {
    docState.infoStream.message("IW", ((System.nanoTime()-t0)/1000000) + " msec to write points");
  }
  
  // it's possible all docs hit non-aborting exceptions...
  t0 = System.nanoTime();
  storedFieldsConsumer.finish(maxDoc);
  storedFieldsConsumer.flush(state, sortMap);
  if (docState.infoStream.isEnabled("IW")) {
    docState.infoStream.message("IW", ((System.nanoTime()-t0)/1000000) + " msec to finish stored fields");
  }

  t0 = System.nanoTime();
  Map<String,TermsHashPerField> fieldsToFlush = new HashMap<>();
  for (int i=0;i<fieldHash.length;i++) {
    PerField perField = fieldHash[i];
    while (perField != null) {
      if (perField.invertState != null) {
        fieldsToFlush.put(perField.fieldInfo.name, perField.termsHashPerField);
      }
      perField = perField.next;
    }
  }

  termsHash.flush(fieldsToFlush, state, sortMap);
  if (docState.infoStream.isEnabled("IW")) {
    docState.infoStream.message("IW", ((System.nanoTime()-t0)/1000000) + " msec to write postings and finish vectors");
  }

  // Important to save after asking consumer to flush so
  // consumer can alter the FieldInfo* if necessary.  EG,
  // FreqProxTermsWriter does this with
  // FieldInfo.storePayload.
  t0 = System.nanoTime();
  docWriter.codec.fieldInfosFormat().write(state.directory, state.segmentInfo, "", state.fieldInfos, IOContext.DEFAULT);
  if (docState.infoStream.isEnabled("IW")) {
    docState.infoStream.message("IW", ((System.nanoTime()-t0)/1000000) + " msec to write fieldInfos");
  }

  return sortMap;
}

At this point the DWPT flush is complete and a FlushedSegment instance is returned to DocumentsWriter. As the doFlush(DocumentsWriterPerThread flushingDWPT) code above shows, the ticket acquired before the DWPT flush is now used to hold that FlushedSegment via ticketQueue.addSegment(ticket, newSegment). A simplified sketch of the resulting ordering guarantee follows.
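
Concurrent flushes may finish out of order, but their results (segments plus frozen global deletes) must be published in the order the flushes started. Below is a tiny, hypothetical model of that FIFO publish rule; it only illustrates the idea and is not the real DocumentsWriterFlushQueue implementation.

// Simplified, hypothetical model of the ticket-queue ordering; not Lucene code.
import java.util.ArrayDeque;
import java.util.Deque;

class TicketQueueSketch {
  static class Ticket { boolean done; String segment; }

  private final Deque<Ticket> queue = new ArrayDeque<>();

  synchronized Ticket addTicket() {                        // tickets are handed out in flush-start order
    Ticket t = new Ticket();
    queue.addLast(t);
    return t;
  }

  synchronized void setSegment(Ticket t, String segment) { // a flush finished, possibly out of order
    t.segment = segment;
    t.done = true;
  }

  synchronized void publishReady() {                       // only a finished head may be published
    while (!queue.isEmpty() && queue.peekFirst().done) {
      Ticket head = queue.pollFirst();
      System.out.println("publish " + head.segment);
    }
  }
}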

5.3 Handling Query Deletes After All DWPTs Have Flushed

Once all DWPTs have finished flushing, DocumentsWriter.doFlush(DocumentsWriterPerThread flushingDWPT) calls flushNotifications.afterSegmentsFlushed(); from the flushNotifications definition we already know what afterSegmentsFlushed does:

//IndexWriter.new DocumentsWriter.FlushNotifications
@Override
public void afterSegmentsFlushed() throws IOException {
  try {
    publishFlushedSegments(false);
  } finally {
    if (false) {
      maybeMerge(config.getMergePolicy(), MergeTrigger.SEGMENT_FLUSH, UNBOUNDED_MAX_MERGE_SEGMENTS);
    }
  }
}
//forced is false, meaning we only try to purge the tickets recorded in the ticketQueue
void publishFlushedSegments(boolean forced) throws IOException {
    docWriter.purgeFlushTickets(forced, ticket -> {
      DocumentsWriterPerThread.FlushedSegment newSegment = ticket.getFlushedSegment();
      //The frozen updates obtained here are the global globalBufferedUpdates frozen when the DWPT flush started;
      //see the comments on the ticketQueue.addFlushTicket(flushingDWPT) call above
      FrozenBufferedUpdates bufferedUpdates = ticket.getFrozenUpdates();
      ticket.markPublished();
      //If the ticket has no FlushedSegment, it was added by DocumentsWriter.flushAllThreads after all DWPTs
      //were flushed, via ticketQueue.addDeletes(flushingDeleteQueue), and carries only the global delete queue
      if (newSegment == null) { // this is a flushed global deletes package - not a segments
        if (bufferedUpdates != null && bufferedUpdates.any()) { // TODO why can this be null?
          publishFrozenUpdates(bufferedUpdates);
          if (infoStream.isEnabled("IW")) {
            infoStream.message("IW", "flush: push buffered updates: " + bufferedUpdates);
          }
        }
      } else {
        //Otherwise this is the FlushedSegment added after a DWPT flush
        assert newSegment.segmentInfo != null;
        if (infoStream.isEnabled("IW")) {
          infoStream.message("IW", "publishFlushedSegment seg-private updates=" + newSegment.segmentUpdates);
        }
        if (newSegment.segmentUpdates != null && infoStream.isEnabled("DW")) {
          infoStream.message("IW", "flush: push buffered seg private updates: " + newSegment.segmentUpdates);
        }
        // now publish!
        //bufferedUpdates is the frozen global globalBufferedUpdates;
        //newSegment.segmentUpdates is the DWPT's own pendingUpdates from its flush
        publishFlushedSegment(newSegment.segmentInfo, newSegment.fieldInfos, newSegment.segmentUpdates,
            bufferedUpdates, newSegment.sortMap);
      }
    });
  }

//DocumentsWriter
void purgeFlushTickets(boolean forced, IOUtils.IOConsumer<DocumentsWriterFlushQueue.FlushTicket> consumer)
      throws IOException {
    if (forced) {
      ticketQueue.forcePurge(consumer);
    } else {
      ticketQueue.tryPurge(consumer);
    }
  }

//DocumentsWriterFlushQueue
 void tryPurge(IOUtils.IOConsumer<FlushTicket> consumer) throws IOException {
    assert !Thread.holdsLock(this);
    if (purgeLock.tryLock()) {
      try {
        innerPurge(consumer);
      } finally {
        purgeLock.unlock();
      }
    }
  }

 private void innerPurge(IOUtils.IOConsumer<FlushTicket> consumer) throws IOException {
    assert purgeLock.isHeldByCurrentThread();
    while (true) {
      final FlushTicket head;
      final boolean canPublish;
      synchronized (this) {
        head = queue.peek();
        canPublish = head != null && head.canPublish(); // do this synced 
      }
      if (canPublish) {
        try {
          /*
           * if we block on publish -> lock IW -> lock BufferedDeletes we don't block
           * concurrent segment flushes just because they want to append to the queue.
           * the downside is that we need to force a purge on fullFlush since there could
           * be a ticket still in the queue. 
           */
           //consumer is the lambda passed in by IndexWriter.publishFlushedSegments
          consumer.accept(head);

        } finally {
          synchronized (this) {
            // finally remove the published ticket from the queue
            final FlushTicket poll = queue.poll();
            decTickets();
            // we hold the purgeLock so no other thread should have polled:
            assert poll == head;
          }
        }
      } else {
        break;
      }
    }
  }

Let's first look at IndexWriter.publishFlushedSegment:

//IndexWriter
/**
   * Atomically adds the segment private delete packet and publishes the flushed
   * segments SegmentInfo to the index writer.
   */
  private synchronized void publishFlushedSegment(SegmentCommitInfo newSegment, FieldInfos fieldInfos,
                                                  FrozenBufferedUpdates packet, FrozenBufferedUpdates globalPacket,
                                                  Sorter.DocMap sortMap) throws IOException {
    boolean published = false;
    try {
      // Lock order IW -> BDS
      ensureOpen(false);

      if (infoStream.isEnabled("IW")) {
        infoStream.message("IW", "publishFlushedSegment " + newSegment);
      }

      if (globalPacket != null && globalPacket.any()) {
        //Publish the globalBufferedUpdates frozen when the DWPT flushed
        publishFrozenUpdates(globalPacket);
      }

      // Publishing the segment must be sync'd on IW -> BDS to make the sure
      // that no merge prunes away the seg. private delete packet
      final long nextGen;
      if (packet != null && packet.any()) {
        //Publish the DWPT's own pendingUpdates
        nextGen = publishFrozenUpdates(packet);
      } else {
        // Since we don't have a delete packet to apply we can get a new
        // generation right away
        nextGen = bufferedUpdatesStream.getNextGen();
        // No deletes/updates here, so marked finished immediately:
        bufferedUpdatesStream.finishedSegment(nextGen);
      }
      //There is a subtle question here: the code above publishes both the frozen global globalBufferedUpdates
      //and the DWPT's own pendingUpdates, and since the two are kept in sync, could the same operation be
      //applied twice?
      //Debugging the source shows that, because of the synchronization between global and per-DWPT deletes,
      //delete operations added to globalBufferedUpdates after the DWPT was created do indeed also appear in
      //its pendingUpdates.
      //The key to avoiding applying the same operation twice is the deleteGen used here.
      //IndexWriter maintains a global delGen, called nextGen, in its bufferedUpdatesStream member.
      //Every call to publishFrozenUpdates calls bufferedUpdatesStream.push, which increments nextGen and
      //returns its value before the increment (like `return nextGen++`).
      //In addition, every packet (BufferedUpdates) also has a delGen, assigned in bufferedUpdatesStream.push
      //to the pre-increment nextGen.
      //Suppose bufferedUpdatesStream.nextGen = 1 (the default in the source).
      //After publishFrozenUpdates(globalPacket) (publishing the global changes): globalPacket.delGen = 1, bufferedUpdatesStream.nextGen = 2.
      //After nextGen = publishFrozenUpdates(packet) (publishing the DWPT changes): packet.delGen = 2, bufferedUpdatesStream.nextGen = 3;
      //the return value is the pre-increment value, i.e. 2, so nextGen = 2.
      //The statement below then sets the delGen of the segment about to be published to 2.
      //Later, when (FrozenBufferedUpdates) packet.apply(IndexWriter writer) calls openSegmentStates, there is a
      //check info.getBufferedDeletesGen() <= delGen: info.getBufferedDeletesGen() is the delGen set on newSegment
      //below, and the delGen on the right-hand side is the packet's delGen.
      //So in the example above globalPacket's delGen is always smaller than info.getBufferedDeletesGen(), which
      //means that when there are both global updates and pendingUpdates, only the pendingUpdates packet can open
      //the corresponding segment and apply to it.
      //This avoids executing the same operations twice because of the synchronization.

      //If the DWPT has no pendingUpdates, the else branch above runs nextGen = bufferedUpdatesStream.getNextGen(),
      //which likewise makes globalPacket's delGen smaller than info.getBufferedDeletesGen() and prevents the
      //global updates from being applied to this segment.

      //The comments above also show that globalBufferedUpdates has lower priority than pendingUpdates
      newSegment.setBufferedDeletesGen(nextGen);
      segmentInfos.add(newSegment);
      published = true;
      checkpoint();
      if (packet != null && packet.any() && sortMap != null) {
        // TODO: not great we do this heavyish op while holding IW's monitor lock,
        // but it only applies if you are using sorted indices and updating doc values:
        ReadersAndUpdates rld = getPooledInstance(newSegment, true);
        rld.sortMap = sortMap;
        // DON't release this ReadersAndUpdates we need to stick with that sortMap
      }
      FieldInfo fieldInfo = fieldInfos.fieldInfo(config.softDeletesField); // will return null if no soft deletes are present
      // this is a corner case where documents delete them-self with soft deletes. This is used to
      // build delete tombstones etc. in this case we haven't seen any updates to the DV in this fresh flushed segment.
      // if we have seen updates the update code checks if the segment is fully deleted.
      boolean hasInitialSoftDeleted = (fieldInfo != null
          && fieldInfo.getDocValuesGen() == -1
          && fieldInfo.getDocValuesType() != DocValuesType.NONE);
      final boolean isFullyHardDeleted = newSegment.getDelCount() == newSegment.info.maxDoc();
      // we either have a fully hard-deleted segment or one or more docs are soft-deleted. In both cases we need
      // to go and check if they are fully deleted. This has the nice side-effect that we now have accurate numbers
      // for the soft delete right after we flushed to disk.
      if (hasInitialSoftDeleted || isFullyHardDeleted){
        // this operation is only really executed if needed an if soft-deletes are not configured it only be executed
        // if we deleted all docs in this newly flushed segment.
        ReadersAndUpdates rld = getPooledInstance(newSegment, true);
        try {
          if (isFullyDeleted(rld)) {
            dropDeletedSegment(newSegment);
            checkpoint();
          }
        } finally {
          release(rld);
        }
      }

    } finally {
      if (published == false) {
        adjustPendingNumDocs(-newSegment.info.maxDoc());
      }
      flushCount.incrementAndGet();
      doAfterFlush();
    }

}

synchronized long publishFrozenUpdates(FrozenBufferedUpdates packet) {
    assert packet != null && packet.any();
    long nextGen = bufferedUpdatesStream.push(packet);
    // Do this as an event so it applies higher in the stack when we are not holding DocumentsWriterFlushQueue.purgeLock:
    //Put an Event into the eventQueue; the event invokes FrozenBufferedUpdates.apply(IndexWriter)
    eventQueue.add(w -> {
      try {
        packet.apply(w);
      } catch (Throwable t) {
        try {
          w.onTragicEvent(t, "applyUpdatesPacket");
        } catch (Throwable t1) {
          t.addSuppressed(t1);
        }
        throw t;
      }
      w.flushDeletesCount.incrementAndGet();
    });
    return nextGen;
  }
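
To restate the delGen bookkeeping described in the comments inside publishFlushedSegment as a tiny worked example, here is a hypothetical model; the class and field names only mimic the real ones and this is not Lucene code.

// Hypothetical model of the delGen ordering; numbers match the example in the comments above.
class GenModel {
  static class Packet { long delGen; }

  long nextGen = 1;                 // default value of bufferedUpdatesStream.nextGen

  long push(Packet p) {             // like bufferedUpdatesStream.push
    p.delGen = nextGen;
    return nextGen++;               // returns the pre-increment value
  }

  public static void main(String[] args) {
    GenModel stream = new GenModel();
    Packet globalPacket = new Packet();
    Packet segPacket = new Packet();
    stream.push(globalPacket);                  // globalPacket.delGen == 1
    long segmentGen = stream.push(segPacket);   // segPacket.delGen == 2, segmentGen == 2
    // A packet only opens segments whose gen is <= the packet's delGen:
    System.out.println("global applies: " + (segmentGen <= globalPacket.delGen));  // false -> skipped
    System.out.println("private applies: " + (segmentGen <= segPacket.delGen));    // true  -> applied
  }
}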

Next, the handling of IndexWriter.eventQueue: the Events put into eventQueue are processed by processEvents(false), which IndexWriter.doFlush calls after docWriter.flushAllThreads() finishes:

//IndexWriter
private void processEvents(boolean triggerMerge) throws IOException {
    if (tragedy.get() == null) {
      Event event;
      while ((event = eventQueue.poll()) != null)  {
        event.process(this);
      }
    }
    if (triggerMerge) {
      maybeMerge(getConfig().getMergePolicy(), MergeTrigger.SEGMENT_FLUSH, UNBOUNDED_MAX_MERGE_SEGMENTS);
    }
  }

Let's go straight to the event that was just enqueued, i.e. the FrozenBufferedUpdates.apply(IndexWriter) function:

//FrozenBufferedUpdates
 /** Translates a frozen packet of delete term/query, or doc values
  *  updates, into their actual docIDs in the index, and applies the change.  This is a heavy
  *  operation and is done concurrently by incoming indexing threads. */
@SuppressWarnings("try")
public synchronized void apply(IndexWriter writer) throws IOException {
  if (applied.getCount() == 0) {
    // already done
    return;
  }

  long startNS = System.nanoTime();

  assert any();

  Set<SegmentCommitInfo> seenSegments = new HashSet<>();

  int iter = 0;
  int totalSegmentCount = 0;
  long totalDelCount = 0;

  boolean finished = false;

  // Optimistic concurrency: assume we are free to resolve the deletes against all current segments in the index, despite that
  // concurrent merges are running.  Once we are done, we check to see if a merge completed while we were running.  If so, we must retry
  // resolving against the newly merged segment(s).  Eventually no merge finishes while we were running and we are done.
  while (true) {
    String messagePrefix;
    if (iter == 0) {
      messagePrefix = "";
    } else {
      messagePrefix = "iter " + iter;
    }

    long iterStartNS = System.nanoTime();

    long mergeGenStart = writer.mergeFinishedGen.get();

    Set<String> delFiles = new HashSet<>();
    BufferedUpdatesStream.SegmentState[] segStates;

    synchronized (writer) {
      //Get the SegmentInfos maintained by IndexWriter
      List<SegmentCommitInfo> infos = getInfosToApply(writer);
      if (infos == null) {
        break;
      }

      for (SegmentCommitInfo info : infos) {
        delFiles.addAll(info.files());
      }

      // Must open while holding IW lock so that e.g. segments are not merged
      // away, dropped from 100% deletions, etc., before we can open the readers
      //Create a SegmentState for each SegmentInfo; this involves ReadersAndUpdates,
      //which is responsible for applying deletes and updates to the in-memory SegmentInfo
      segStates = openSegmentStates(writer, infos, seenSegments, delGen());

      if (segStates.length == 0) {

        if (infoStream.isEnabled("BD")) {
          infoStream.message("BD", "packet matches no segments");
        }
        break;
      }

      ...

      totalSegmentCount += segStates.length;

      // Important, else IFD may try to delete our files while we are still using them,
      // if e.g. a merge finishes on some of the segments we are resolving on:
      writer.deleter.incRef(delFiles);
    }

    AtomicBoolean success = new AtomicBoolean();
    long delCount;
    try (Closeable finalizer = () -> finishApply(writer, segStates, success.get(), delFiles)) {
      // don't hold IW monitor lock here so threads are free concurrently resolve deletes/updates:
      //Call FrozenBufferedUpdates.apply(BufferedUpdatesStream.SegmentState[] segStates)
      delCount = apply(segStates);
      success.set(true);
    }

    // Since we just resolved some more deletes/updates, now is a good time to write them:
    //The in-memory segments were just updated above, so write the updated content to disk here
    writer.writeSomeDocValuesUpdates();

    // It's OK to add this here, even if the while loop retries, because delCount only includes newly
    // deleted documents, on the segments we didn't already do in previous iterations:
    totalDelCount += delCount;

    ...
    if (privateSegment != null) {
      // No need to retry for a segment-private packet: the merge that folds in our private segment already waits for all deletes to
      // be applied before it kicks off, so this private segment must already not be in the set of merging segments

      break;
    }

    // Must sync on writer here so that IW.mergeCommit is not running concurrently, so that if we exit, we know mergeCommit will succeed
    // in pulling all our delGens into a merge:
    synchronized (writer) {
      long mergeGenCur = writer.mergeFinishedGen.get();

      if (mergeGenCur == mergeGenStart) {

        // Must do this while still holding IW lock else a merge could finish and skip carrying over our updates:
        
        // Record that this packet is finished:
        writer.finished(this);

        finished = true;

        // No merge finished while we were applying, so we are done!
        break;
      }
    }

    if (infoStream.isEnabled("BD")) {
      infoStream.message("BD", messagePrefix + "concurrent merges finished; move to next iter");
    }
      
    // A merge completed while we were running.  In this case, that merge may have picked up some of the updates we did, but not
    // necessarily all of them, so we cycle again, re-applying all our updates to the newly merged segment.

    iter++;
  }

  if (finished == false) {
    // Record that this packet is finished:
    writer.finished(this);
  }
      
  ...
    message += "; " + writer.getPendingUpdatesCount() + " packets remain";
    infoStream.message("BD", message);
  }
}

/** Applies pending delete-by-term, delete-by-query and doc values updates to all segments in the index, returning
  *  the number of new deleted or updated documents. */
private synchronized long apply(BufferedUpdatesStream.SegmentState[] segStates) throws IOException {

  if (delGen == -1) {
    // we were not yet pushed
    throw new IllegalArgumentException("gen is not yet set; call BufferedUpdatesStream.push first");
  }

  assert applied.getCount() != 0;

  if (privateSegment != null) {
    assert segStates.length == 1;
    assert privateSegment == segStates[0].reader.getOriginalSegmentInfo();
  }

  //If these are the updates of a FlushedSegment they contain no term deletes, because those were already handled during the DWPT flush
  totalDelCount += applyTermDeletes(segStates);
  //Apply query deletes
  totalDelCount += applyQueryDeletes(segStates);
  //Apply doc values updates; outside the scope of this article
  totalDelCount += applyDocValuesUpdates(segStates);

  return totalDelCount;
}

// Delete by query
private long applyQueryDeletes(BufferedUpdatesStream.SegmentState[] segStates) throws IOException {

  if (deleteQueries.length == 0) {
    return 0;
  }

  long startNS = System.nanoTime();

  long delCount = 0;
  //Nested loops: for each segState, run every query delete
  for (BufferedUpdatesStream.SegmentState segState : segStates) {

    if (delGen < segState.delGen) {
      // segment is newer than this deletes packet
      continue;
    }
    
    if (segState.rld.refCount() == 1) {
      // This means we are the only remaining reference to this segment, meaning
      // it was merged away while we were running, so we can safely skip running
      // because we will run on the newly merged segment next:
      continue;
    }

    final LeafReaderContext readerContext = segState.reader.getContext();
    for (int i = 0; i < deleteQueries.length; i++) {
      Query query = deleteQueries[i];
      int limit;
      if (delGen == segState.delGen) {
        assert privateSegment != null;
        limit = deleteQueryLimits[i];
      } else {
        limit = Integer.MAX_VALUE;
      }
      //Run the query
      final IndexSearcher searcher = new IndexSearcher(readerContext.reader());
      searcher.setQueryCache(null);
      query = searcher.rewrite(query);
      final Weight weight = searcher.createWeight(query, false, 1);
      final Scorer scorer = weight.scorer(readerContext);
      if (scorer != null) {
        final DocIdSetIterator it = scorer.iterator();
        if (segState.rld.sortMap != null && limit != Integer.MAX_VALUE) {
          assert privateSegment != null;
          // This segment was sorted on flush; we must apply seg-private deletes carefully in this case:
          int docID;
          while ((docID = it.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
            // The limit is in the pre-sorted doc space:
            if (segState.rld.sortMap.newToOld(docID) < limit) {
              //Delete the documents matching the query condition
              if (segState.rld.delete(docID)) {
                delCount++;
              }
            }
          }
        } else {
          int docID;
          while ((docID = it.nextDoc()) < limit) {
            if (segState.rld.delete(docID)) {
              delCount++;
            }
          }
        }
      }
    }
  }

 ...
  return delCount;
}

//ReadersAndUpdates
//ReadersAndUpdates.pendingDeletes records the deleted docIDs
public synchronized boolean delete(int docID) throws IOException {
  if (reader == null && pendingDeletes.mustInitOnDelete()) {
    getReader(IOContext.READ).decRef(); // pass a reader to initialize the pending deletes
  }
  return pendingDeletes.delete(docID);
}

Note that in FrozenBufferedUpdates.apply(IndexWriter writer) above, the call to apply(BufferedUpdatesStream.SegmentState[] segStates) sits inside a try-with-resources statement:

try (Closeable finalizer = () -> finishApply(writer, segStates, success.get(), delFiles)) {
  // don't hold IW monitor lock here so threads are free concurrently resolve deletes/updates:
  delCount = apply(segStates);
  success.set(true);
}

So once apply(BufferedUpdatesStream.SegmentState[] segStates) has finished, finishApply is called:

private void finishApply(IndexWriter writer, BufferedUpdatesStream.SegmentState[] segStates,
                           boolean success, Set<String> delFiles) throws IOException {
  synchronized (writer) {

    BufferedUpdatesStream.ApplyDeletesResult result;
    try {
      //closeSegmentStates returns the segments that are fully deleted, meaning every document in the segment matched a query delete condition, so the segment no longer contains any live document
      result = closeSegmentStates(writer, segStates, success);
    } finally {
      // Matches the incRef we did above, but we must do the decRef after closing segment states else
      // IFD can't delete still-open files
      writer.deleter.decRef(delFiles);
    }

    if (result.anyDeletes) {
        writer.maybeMerge.set(true);
        writer.checkpoint();
    }

    if (result.allDeleted != null) {
      if (infoStream.isEnabled("IW")) {
        infoStream.message("IW", "drop 100% deleted segments: " + writer.segString(result.allDeleted));
      }
      //Drop the fully deleted segments
      for (SegmentCommitInfo info : result.allDeleted) {
        writer.dropDeletedSegment(info);
      }
      //checkpoint calls deleter.checkpoint(segmentInfos, false) to delete files that are no longer needed (the files belonging to fully deleted segments are removed at this point)
      writer.checkpoint();
    }
  }
}

/** Close segment states previously opened with openSegmentStates. */
public static BufferedUpdatesStream.ApplyDeletesResult closeSegmentStates(IndexWriter writer, BufferedUpdatesStream.SegmentState[] segStates, boolean success) throws IOException {
  List<SegmentCommitInfo> allDeleted = null;
  long totDelCount = 0;
  final List<BufferedUpdatesStream.SegmentState> segmentStates = Arrays.asList(segStates);
  //Find the segments that are fully deleted
  for (BufferedUpdatesStream.SegmentState segState : segmentStates) {
    if (success) {
      totDelCount += segState.rld.getDelCount() - segState.startDelCount;
      int fullDelCount = segState.rld.getDelCount();
      assert fullDelCount <= segState.rld.info.info.maxDoc() : fullDelCount + " > " + segState.rld.info.info.maxDoc();
      if (segState.rld.isFullyDeleted() && writer.getConfig().getMergePolicy().keepFullyDeletedSegment(() -> segState.reader) == false) {
        if (allDeleted == null) {
          allDeleted = new ArrayList<>();
        }
        allDeleted.add(segState.reader.getOriginalSegmentInfo());
      }
    }
  }
  IOUtils.close(segmentStates);
  if (writer.infoStream.isEnabled("BD")) {
    writer.infoStream.message("BD", "closeSegmentStates: " + totDelCount + " new deleted documents; pool " + writer.getPendingUpdatesCount()+ " packets; bytesUsed=" + writer.getReaderPoolRamBytesUsed());
  }

  return new BufferedUpdatesStream.ApplyDeletesResult(totDelCount > 0, allDeleted);
}

At this point the documents marked for deletion by query deletes have been recorded in each ReadersAndUpdates (rld), which IndexWriter maintains through its readerPool.

Now back to the original IndexWriter.doFlush(boolean applyAllDeletes): after processEvents(false) it calls writeReaderPool(applyAllDeletes):

//IndexWriter
private final void writeReaderPool(boolean writeDeletes) throws IOException {
  assert Thread.holdsLock(this);
  if (writeDeletes) {
    //The part we focus on here: it writes the .liv files produced by query deletes to disk
    if (readerPool.commit(segmentInfos)) {
      checkpointNoSIS();
    }
  } else { // only write the docValues
    if (readerPool.writeAllDocValuesUpdates()) {
      checkpoint();
    }
  }
  // now do some best effort to check if a segment is fully deleted
  List<SegmentCommitInfo> toDrop = new ArrayList<>(); // don't modify segmentInfos in-place
  for (SegmentCommitInfo info : segmentInfos) {
    ReadersAndUpdates readersAndUpdates = readerPool.get(info, false);
    if (readersAndUpdates != null) {
      if (isFullyDeleted(readersAndUpdates)) {
        toDrop.add(info);
      }
    }
  }
  for (SegmentCommitInfo info : toDrop) {
    dropDeletedSegment(info);
  }
  if (toDrop.isEmpty() == false) {
    checkpoint();
  }
}
//ReaderPool
/**
  * Commit live docs changes for the segment readers for
  * the provided infos.
  *
  * @throws IOException If there is a low-level I/O error
  */
synchronized boolean commit(SegmentInfos infos) throws IOException {
  boolean atLeastOneChange = false;
  for (SegmentCommitInfo info : infos) {
    final ReadersAndUpdates rld = readerMap.get(info);
    if (rld != null) {
      assert rld.info == info;
      //Call rld.writeLiveDocs to flush the .liv file to disk
      boolean changed = rld.writeLiveDocs(directory);
      changed |= rld.writeFieldUpdates(directory, fieldNumbers, completedDelGenSupplier.getAsLong(), infoStream);

      if (changed) {
        // Make sure we only write del docs for a live segment:
        assert assertInfoIsLive(info);

        // Must checkpoint because we just
        // created new _X_N.del and field updates files;
        // don't call IW.checkpoint because that also
        // increments SIS.version, which we do not want to
        // do here: it was done previously (after we
        // invoked BDS.applyDeletes), whereas here all we
        // did was move the state to disk:
        atLeastOneChange = true;
      }
    }
  }
  return atLeastOneChange;
}

//ReadersAndUpdates
// Commit live docs (writes new _X_N.del files) and field updates (writes new
// _X_N updates files) to the directory; returns true if it wrote any file
// and false if there were no new deletes or updates to write:
public synchronized boolean writeLiveDocs(Directory dir) throws IOException {
  return pendingDeletes.writeLiveDocs(dir);
}

//PendingDeletes
/**
  * Writes the live docs to disk and returns <code>true</code> if any new docs were written.
  */
boolean writeLiveDocs(Directory dir) throws IOException {
  if (pendingDeleteCount == 0) {
    return false;
  }

  Bits liveDocs = this.liveDocs;
  assert liveDocs != null;
  // We have new deletes
  assert liveDocs.length() == info.info.maxDoc();

  // Do this so we can delete any created files on
  // exception; this saves all codecs from having to do
  // it:
  TrackingDirectoryWrapper trackingDir = new TrackingDirectoryWrapper(dir);

  // We can write directly to the actual name (vs to a
  // .tmp & renaming it) because the file is not live
  // until segments file is written:
  boolean success = false;
  try {
    //Here we finally see the live docs being encoded and written to disk
    Codec codec = info.info.getCodec();
    codec.liveDocsFormat().writeLiveDocs(liveDocs, trackingDir, info, pendingDeleteCount, IOContext.DEFAULT);
    success = true;
  } finally {
    if (!success) {
      // Advance only the nextWriteDelGen so that a 2nd
      // attempt to write will write to a new file
      info.advanceNextWriteDelGen();

      // Delete any partially created file(s):
      for (String fileName : trackingDir.getCreatedFiles()) {
        IOUtils.deleteFilesIgnoringExceptions(dir, fileName);
      }
    }
  }

  // If we hit an exc in the line above (eg disk full)
  // then info's delGen remains pointing to the previous
  // (successfully written) del docs:
  info.advanceDelGen();
  info.setDelCount(info.getDelCount() + pendingDeleteCount);
  dropChanges();
  return true;
}
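
Once a commit has made the new segments and their .liv files visible, the effect can be checked from the reader side. The following is a hypothetical verification sketch (the index path is illustrative), using only standard reader APIs: deleted documents show up as cleared bits in each leaf's live docs.

// Hypothetical verification sketch; the path is illustrative.
import java.nio.file.Paths;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.util.Bits;

public class LiveDocsCheck {
  public static void main(String[] args) throws Exception {
    try (FSDirectory dir = FSDirectory.open(Paths.get("/tmp/idx"));
         DirectoryReader reader = DirectoryReader.open(dir)) {
      for (LeafReaderContext leaf : reader.leaves()) {
        Bits liveDocs = leaf.reader().getLiveDocs();   // null means the segment has no deletes
        System.out.println("maxDoc=" + leaf.reader().maxDoc()
            + " deleted=" + leaf.reader().numDeletedDocs()
            + " hasLiveDocsBitmap=" + (liveDocs != null));
      }
    }
  }
}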

6 Summary

The above is a somewhat blow-by-blow record of how Lucene implements deletes. Many details were left out, and there are surely places where the analysis is incomplete or simply wrong; corrections from readers are very welcome.
