MessageSet.scala

Author: 上海马超23 | Published 2018-11-17 22:59
    abstract class MessageSet extends Iterable[MessageAndOffset] {
      // Write messages from this set to the given channel
      def writeTo(channel: GatheringByteChannel, offset: Long, maxSize: Int): Int
      // Iterate over the messages in this MessageSet in order
      def iterator: Iterator[MessageAndOffset]
    }
    
    // file: the underlying log file on disk
    // channel: the FileChannel used to read and write the file
    // start, end: start and end positions of the slice within the log file
    // isSlice: whether this FileMessageSet is a slice of the log file or the complete file
    class FileMessageSet private[kafka](@volatile var file: File,
                                        private[log] val channel: FileChannel,
                                        private[log] val start: Int,
                                        private[log] val end: Int,
                                        isSlice: Boolean) extends MessageSet with Logging {
      // Size of the log file or slice in bytes.
      // Multiple handler threads may append to the same partition concurrently, so this is an AtomicInteger.
      private val _size =
        if(isSlice)
          new AtomicInteger(end - start)
        else
          new AtomicInteger(math.min(channel.size.toInt, end) - start)
    
      // During initialization, move the channel position to the end of the log file,
      // so that writes after a broker restart do not overwrite existing data.
      if (!isSlice)
        channel.position(math.min(channel.size.toInt, end))
    
      def openChannel(file: File, mutable: Boolean, fileAlreadyExists: Boolean = false, initFileSize: Int = 0, preallocate: Boolean = false): FileChannel = {
        if (mutable) { // mutable == true means the file is opened for writing
          if (fileAlreadyExists)
            new RandomAccessFile(file, "rw").getChannel()
          else {
            if (preallocate) {
              val randomAccessFile = new RandomAccessFile(file, "rw")
              // Preallocate the file to initFileSize.
              // This lets the file occupy contiguous disk sectors as much as possible,
              // reducing seek overhead for later reads and writes.
              // See https://zhuanlan.zhihu.com/p/34915311 on file fragmentation.
              randomAccessFile.setLength(initFileSize)
              randomAccessFile.getChannel()
            }
            else
              new RandomAccessFile(file, "rw").getChannel()
          }
        }
        else
          new FileInputStream(file).getChannel()
      }
    
      // Append a batch of messages to the log file
      def append(messages: ByteBufferMessageSet) {
        val written = messages.writeFullyTo(channel)
        _size.getAndAdd(written)
      }
    
      // Find the offset and file position of the first message whose offset is >= targetOffset
      def searchFor(targetOffset: Long, startingPosition: Int): OffsetPosition = {
        var position = startingPosition
        // LogOverhead = the offset and size fields only, not the message payload
        val buffer = ByteBuffer.allocate(MessageSet.LogOverhead)
        val size = sizeInBytes() // current size of this FileMessageSet in bytes
        // Scan message by message starting at position
        while(position + MessageSet.LogOverhead < size) {
          buffer.rewind()
          channel.read(buffer, position)
          buffer.rewind()
          val offset = buffer.getLong()
          if(offset >= targetOffset)
            return OffsetPosition(offset, position)
          val messageSize = buffer.getInt()
          // advance the position to the next message
          position += MessageSet.LogOverhead + messageSize
        }
        null
      }
    }
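
FileMessageSet.searchFor works because each entry in the log file is laid out as an 8-byte offset, a 4-byte size, and then the message payload, so the scan can hop from entry to entry by LogOverhead + messageSize. Below is a minimal standalone sketch of the same scan over an in-memory buffer; the object and method names are illustrative, not Kafka's.

    import java.nio.ByteBuffer

    object SearchForSketch {
      val LogOverhead = 12 // 8-byte offset + 4-byte size, as in MessageSet.LogOverhead

      // Build a buffer with entries of the form (offset, payload)
      def buildEntries(entries: Seq[(Long, Array[Byte])]): ByteBuffer = {
        val size = entries.map(e => LogOverhead + e._2.length).sum
        val buf = ByteBuffer.allocate(size)
        entries.foreach { case (offset, payload) =>
          buf.putLong(offset).putInt(payload.length).put(payload)
        }
        buf.flip()
        buf.asReadOnlyBuffer()
      }

      // Same scan as FileMessageSet.searchFor, but over an in-memory buffer:
      // skip LogOverhead + messageSize bytes per entry until offset >= targetOffset.
      def scanFor(buf: ByteBuffer, targetOffset: Long): Option[(Long, Int)] = {
        var position = 0
        while (position + LogOverhead < buf.limit) {
          val offset = buf.getLong(position)
          if (offset >= targetOffset)
            return Some((offset, position))
          val messageSize = buf.getInt(position + 8)
          position += LogOverhead + messageSize
        }
        None
      }

      def main(args: Array[String]): Unit = {
        val buf = buildEntries(Seq(5L -> Array[Byte](1, 2), 6L -> Array[Byte](3), 8L -> Array[Byte](4, 5, 6)))
        println(scanFor(buf, 7)) // Some((8,27)): the first entry at or after offset 7
      }
    }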
    
    class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Logging {
    
      private def create(offsetAssigner: OffsetAssigner, compressionCodec: CompressionCodec,
                         wrapperMessageTimestamp: Option[Long], timestampType: TimestampType,
                         messages: Message*): ByteBuffer = {
        if (messages.isEmpty)
          MessageSet.Empty.buffer
        else if (compressionCodec == NoCompressionCodec) {
          val buffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages))
          // offsetAssigner.nextAbsoluteOffset() assigns an offset to each message as it is written into the buffer
          for (message <- messages) writeMessage(buffer, message, offsetAssigner.nextAbsoluteOffset())
          buffer.rewind()
          buffer
        } else {
          var offset = -1L
          val messageWriter = new MessageWriter(math.min(math.max(MessageSet.messageSetSize(messages) / 2, 1024), 1 << 16))
          messageWriter.write(codec = compressionCodec, timestamp = magicAndTimestamp.timestamp, timestampType = timestampType, magicValue = magicAndTimestamp.magic) { outputStream =>
            // Create an output stream for the specified compression codec
            val output = new DataOutputStream(CompressionFactory(compressionCodec, magicAndTimestamp.magic, outputStream))
            try {
              // Write each inner message into the compressed stream
              for (message <- messages) {
                offset = offsetAssigner.nextAbsoluteOffset()
                // For magic values above V0 (i.e. V1), a relative offset is written
                if (magicAndTimestamp.magic > Message.MagicValue_V0)
                  output.writeLong(offsetAssigner.toInnerOffset(offset))
                else
                  // otherwise the absolute offset is written
                  output.writeLong(offset)
                output.writeInt(message.size)
                output.write(message.buffer.array, message.buffer.arrayOffset, message.buffer.limit)
              }
            } finally {
              output.close()
            }
          }
          val buffer = ByteBuffer.allocate(messageWriter.size + MessageSet.LogOverhead)
          // Write the whole wrapper message in the standard message format; its offset is that of the last inner message
          writeMessage(buffer, messageWriter, offset)
          buffer.rewind()
          buffer
        }
      }
    
      def writeFullyTo(channel: GatheringByteChannel): Int = {
        buffer.mark()
        var written = 0
        while (written < sizeInBytes)
          written += channel.write(buffer)
        buffer.reset()
        written
      }
    }
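
writeFullyTo loops because a single channel.write call may write only part of the buffer, and it uses mark/reset so the buffer can be written again later. Here is a self-contained sketch of the same pattern against a plain FileChannel; the file path and object name are made up for illustration.

    import java.io.RandomAccessFile
    import java.nio.ByteBuffer
    import java.nio.channels.GatheringByteChannel

    object WriteFullySketch {
      // Same pattern as ByteBufferMessageSet.writeFullyTo: loop until the whole
      // buffer is written, then restore the buffer's position.
      def writeFully(buffer: ByteBuffer, channel: GatheringByteChannel): Int = {
        buffer.mark()
        var written = 0
        while (buffer.hasRemaining)
          written += channel.write(buffer)
        buffer.reset() // position back to the mark, so the buffer can be reused
        written
      }

      def main(args: Array[String]): Unit = {
        val channel = new RandomAccessFile("/tmp/write-fully-sketch.log", "rw").getChannel
        try {
          val buffer = ByteBuffer.wrap("hello kafka".getBytes("UTF-8"))
          println(writeFully(buffer, channel)) // 11
          println(writeFully(buffer, channel)) // 11 again: mark/reset made the buffer reusable
        } finally channel.close()
      }
    }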
    
    public static class RecordsIterator extends AbstractIterator<LogEntry> {
        // buffer holding the message bytes
        private final ByteBuffer buffer;
        // stream used to read messages from the buffer
        private final DataInputStream stream;
        // compression type
        private final CompressionType type;
        // whether to do shallow iteration only, i.e. not descend into the inner messages of a compressed wrapper
        private final boolean shallow;
        // iterator over the inner messages of a compressed wrapper message
        private RecordsIterator innerIter;
    
        // LogEntry: offset + record
        private final ArrayDeque<LogEntry> logEntries;
        // used when iterating a compressed wrapper: the absolute offset of the first inner message
        private final long absoluteBaseOffset;
    
        // public constructor, creates the outer iterator
        public RecordsIterator(ByteBuffer buffer, boolean shallow) {
            this.type = CompressionType.NONE;
            this.buffer = buffer;
            this.shallow = shallow;
            this.stream = new DataInputStream(new ByteBufferInputStream(buffer));
            this.logEntries = null;
            this.absoluteBaseOffset = -1;
        }
    
        private RecordsIterator(LogEntry entry) {
            this.type = entry.record().compressionType();
            this.buffer = entry.record().value();
            this.shallow = true;
            this.stream = Compressor.wrapForInput(new ByteBufferInputStream(this.buffer), type, entry.record().magic());
            long wrapperRecordOffset = entry.offset(); // offset of the wrapper (outer) message
            if (entry.record().magic() > Record.MAGIC_VALUE_V0) {
                this.logEntries = new ArrayDeque<>();
                long wrapperRecordTimestamp = entry.record().timestamp();
                // Decompress the inner messages and add them to the logEntries queue
                while (true) {
                    try {
                        LogEntry logEntry = getNextEntryFromStream();
                        Record recordWithTimestamp = new Record(logEntry.record().buffer(),
                                                                wrapperRecordTimestamp,
                                                                entry.record().timestampType());
                        logEntries.add(new LogEntry(logEntry.offset(), recordWithTimestamp));
                    } catch (EOFException e) {
                        break;
                    } catch (IOException e) {
                        throw new KafkaException(e);
                    }
                }
                this.absoluteBaseOffset = wrapperRecordOffset - logEntries.getLast().offset();
            } else {
                this.logEntries = null;
                this.absoluteBaseOffset = -1;
            }
    
        }
    
        @Override
        protected LogEntry makeNext() {
            if (innerDone()) {
                try {
                    LogEntry entry = getNextEntry();
                    if (entry == null)
                        return allDone();
    
                    if (absoluteBaseOffset >= 0) {
                        long absoluteOffset = absoluteBaseOffset + entry.offset();
                        entry = new LogEntry(absoluteOffset, entry.record());
                    }
    
                    CompressionType compression = entry.record().compressionType();
                    if (compression == CompressionType.NONE || shallow) {
                        return entry;
                    } else {
                        innerIter = new RecordsIterator(entry);
                        return innerIter.next();
                    }
                } catch (EOFException e) {
                    return allDone();
                } catch (IOException e) {
                    throw new KafkaException(e);
                }
            } else {
                return innerIter.next();
            }
        }
    }
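
The inner iterator above rebuilds absolute offsets from the relative offsets stored inside a compressed wrapper: absoluteBaseOffset is the wrapper's offset minus the last inner (relative) offset, and makeNext then adds each inner offset back onto that base. A tiny sketch of just that arithmetic, with made-up numbers:

    // Standalone sketch of the offset arithmetic used by the inner RecordsIterator above.
    object InnerOffsetSketch {
      def main(args: Array[String]): Unit = {
        // Suppose a compressed wrapper message is stored at offset 107 and contains
        // three inner messages whose stored (relative) offsets are 0, 1, 2.
        val wrapperOffset = 107L
        val relativeOffsets = Seq(0L, 1L, 2L)

        // absoluteBaseOffset = wrapper offset - relative offset of the last inner message
        val absoluteBaseOffset = wrapperOffset - relativeOffsets.last // 105

        // makeNext then rebases every inner entry onto the absolute offset space
        val absoluteOffsets = relativeOffsets.map(absoluteBaseOffset + _)
        println(absoluteOffsets) // List(105, 106, 107): the wrapper offset equals the last inner offset
      }
    }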
    
    // Abstract iterator base class extended by RecordsIterator, the iterator of MemoryRecords
    public abstract class AbstractIterator<T> implements Iterator<T> {
    
        private static enum State {
            READY,
            NOT_READY, // the next item has not been computed yet; maybeComputeNext must be called first
            DONE, FAILED
        };
    
        private State state = State.NOT_READY;
        private T next;
    
        @Override
        public boolean hasNext() {
            switch (state) {
                case FAILED:
                    throw new IllegalStateException("Iterator is in failed state");
                case DONE:
                    return false;
                case READY:
                    return true;
                default:
                    return maybeComputeNext();
            }
        }
    
        @Override
        public T next() {
            if (!hasNext())
                throw new NoSuchElementException();
            state = State.NOT_READY;
            if (next == null)
                throw new IllegalStateException("Expected item but none found.");
            return next;
        }
    
        protected abstract T makeNext();
    
        private boolean maybeComputeNext() {
            state = State.FAILED;
            next = makeNext();
            if (state == State.DONE) {
                return false;
            } else {
                state = State.READY;
                return true;
            }
        }
    
    }
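
The makeNext/hasNext state machine guarantees that each element is computed at most once and that an exception leaves the iterator in the FAILED state; the allDone() helper (not shown in the excerpt) marks the iterator DONE. Below is a small Scala re-implementation of the same pattern, just to show that a subclass only has to supply makeNext:

    // A minimal Scala re-implementation of the same makeNext/allDone state machine (illustrative only).
    abstract class MakeNextIterator[T >: Null] extends Iterator[T] {
      private object State extends Enumeration { val Ready, NotReady, Done, Failed = Value }
      private var state = State.NotReady
      private var nextItem: T = null

      protected def makeNext(): T
      protected def allDone(): T = { state = State.Done; null }

      override def hasNext: Boolean = state match {
        case State.Failed => throw new IllegalStateException("Iterator is in failed state")
        case State.Done   => false
        case State.Ready  => true
        case _ =>
          state = State.Failed // stays Failed if makeNext throws
          nextItem = makeNext()
          if (state == State.Done) false else { state = State.Ready; true }
      }

      override def next(): T = {
        if (!hasNext) throw new NoSuchElementException
        state = State.NotReady
        nextItem
      }
    }

    object MakeNextIteratorDemo extends App {
      val data = Iterator(1, 2, 3)
      val iter = new MakeNextIterator[Integer] {
        override protected def makeNext(): Integer =
          if (data.hasNext) Int.box(data.next()) else allDone()
      }
      println(iter.toList) // List(1, 2, 3)
    }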
    
    // _file: the index file on disk
    // baseOffset: the offset of the first message in the corresponding log file
    class OffsetIndex(@volatile private[this] var _file: File, val baseOffset: Long, val maxIndexSize: Int = -1) extends Logging {
    
      // lock protecting operations on mmap
      private val lock = new ReentrantLock
    
      // MappedByteBuffer used to read and write the index file
      @volatile
      private[this] var mmap: MappedByteBuffer = {
        // createNewFile returns true if the index file did not exist and was created, false otherwise
        val newlyCreated = _file.createNewFile()
        val raf = new RandomAccessFile(_file, "rw")
        try {
          if (newlyCreated) {
            // Resize the newly created file to the largest multiple of 8 that does not exceed maxIndexSize,
            // e.g. roundToExactMultiple(67, 8) = 64
            raf.setLength(roundToExactMultiple(maxIndexSize, 8))
          }
    
          val len = raf.length()
          // memory-map the index file
          val idx = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, len)
    
          if (newlyCreated)
            idx.position(0)
          else
            // For an existing index file, move position to the end of the valid entries so existing data is not overwritten
            idx.position(roundToExactMultiple(idx.limit, 8))
          idx
        } finally {
          CoreUtils.swallow(raf.close())
        }
      }
    
      @volatile
      private[this] var _entries = mmap.position / 8 // number of index entries currently in the index file
      @volatile
      private[this] var _maxEntries = mmap.limit / 8 // maximum number of index entries the index file can hold
      @volatile
      private[this] var _lastOffset = readLastEntry.offset // offset of the last index entry
    
      // Find the largest offset that is less than or equal to targetOffset and return its offset/position pair
      def lookup(targetOffset: Long): OffsetPosition = {
          val idx = mmap.duplicate // duplicate the mapped buffer; the copy shares the index data but has its own position
          val slot = indexSlotFor(idx, targetOffset) // binary search over the index entries
          if(slot == -1)
            OffsetPosition(baseOffset, 0)
          else
            // Wrap the offset and the physical position into an OffsetPosition and return it.
            // The physical position is the absolute byte position of the indexed message within the log file;
            // opening the log file and seeking to this position is enough to read the message.
            OffsetPosition(baseOffset + relativeOffset(idx, slot), physical(idx, slot))
      }
    }
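
The /8 arithmetic above implies that every index entry occupies 8 bytes: a 4-byte offset relative to baseOffset followed by a 4-byte physical position. The sketch below runs a binary search over such a layout; indexSlotFor's real implementation is not part of the excerpt, so treat this as an illustrative equivalent only.

    import java.nio.ByteBuffer

    // Illustrative lookup over an 8-bytes-per-entry index layout:
    // each entry is a 4-byte relative offset followed by a 4-byte physical position.
    object OffsetIndexSketch {
      def relativeOffset(idx: ByteBuffer, slot: Int): Int = idx.getInt(slot * 8)
      def physical(idx: ByteBuffer, slot: Int): Int = idx.getInt(slot * 8 + 4)

      // Binary search for the largest slot whose relative offset is <= target, -1 if none.
      def indexSlotFor(idx: ByteBuffer, entries: Int, targetRelOffset: Int): Int = {
        if (entries == 0 || relativeOffset(idx, 0) > targetRelOffset) return -1
        var lo = 0
        var hi = entries - 1
        while (lo < hi) {
          val mid = (lo + hi + 1) / 2
          if (relativeOffset(idx, mid) <= targetRelOffset) lo = mid else hi = mid - 1
        }
        lo
      }

      def main(args: Array[String]): Unit = {
        // Index with entries (relativeOffset, position): (0, 0), (3, 210), (7, 521)
        val idx = ByteBuffer.allocate(3 * 8)
        Seq((0, 0), (3, 210), (7, 521)).foreach { case (o, p) => idx.putInt(o).putInt(p) }

        val slot = indexSlotFor(idx, entries = 3, targetRelOffset = 5)
        println((relativeOffset(idx, slot), physical(idx, slot))) // (3,210): start reading the log at byte 210
      }
    }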
    
    @nonthreadsafe
    class LogSegment(val log: FileMessageSet, // FileMessageSet of the segment's log file
                     val index: OffsetIndex, // OffsetIndex of the segment's index file
                     val baseOffset: Long, // offset of the first message in this LogSegment
                     val indexIntervalBytes: Int, // minimum number of bytes between two index entries
                     val rollJitterMs: Long,
                     time: Time) extends Logging {
    
      // Creation time of this LogSegment; reset by truncateTo when the log file is truncated to empty
      var created = time.milliseconds
      // Bytes of messages appended to the log file since the last index entry was added; used to decide when to add the next entry
      private var bytesSinceLastIndexEntry = 0
    
      @nonthreadsafe
      def append(offset: Long, messages: ByteBufferMessageSet) {
        if (messages.sizeInBytes > 0) {
          // Check whether enough bytes have accumulated since the last index entry
          if(bytesSinceLastIndexEntry > indexIntervalBytes) {
            // Add an index entry and reset the counter.
            // Since writes are append-only, the physical position is simply the current size of the log file.
            index.append(offset, log.sizeInBytes())
            this.bytesSinceLastIndexEntry = 0
          }
          log.append(messages)
          this.bytesSinceLastIndexEntry += messages.sizeInBytes
        }
      }
    
      // Read messages in the specified offset range
      @threadsafe
      def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int, maxPosition: Long = size): FetchDataInfo = {
        val logSize = log.sizeInBytes
        val startPosition = translateOffset(startOffset)
    
        val offsetMetadata = new LogOffsetMetadata(startOffset, this.baseOffset, startPosition.position)
    
        // Compute how many bytes to read, depending on whether maxOffset is set
        val length = maxOffset match {
          case None =>
            min((maxPosition - startPosition.position).toInt, maxSize)
          case Some(offset) =>
            val mapping = translateOffset(offset, startPosition.position)
            val endPosition =
              if(mapping == null)
                logSize
              else
                mapping.position
            min(min(maxPosition, endPosition) - startPosition.position, maxSize).toInt
        }
    
        FetchDataInfo(offsetMetadata, log.read(startPosition.position, length))
      }
    }
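
Because indexIntervalBytes only triggers an index entry once enough message bytes have accumulated, the index is sparse: it does not hold one entry per message. A small simulation of the append-side decision, with arbitrarily chosen numbers:

    import scala.collection.mutable.ListBuffer

    // Sketch of the sparse-index decision in LogSegment.append: with indexIntervalBytes = 4096,
    // an index entry is added roughly every 4 KB of appended messages.
    object SparseIndexSketch {
      def main(args: Array[String]): Unit = {
        val indexIntervalBytes = 4096
        var bytesSinceLastIndexEntry = 0
        var logSizeInBytes = 0
        val indexEntries = ListBuffer.empty[(Long, Int)] // (offset, physical position)

        // Append 10 batches of 1 KB each, at offsets 0, 10, 20, ...
        for (batch <- 0 until 10) {
          val offset = batch * 10L
          val batchBytes = 1024
          if (bytesSinceLastIndexEntry > indexIntervalBytes) {
            indexEntries += ((offset, logSizeInBytes)) // position = current log size, since writes are append-only
            bytesSinceLastIndexEntry = 0
          }
          logSizeInBytes += batchBytes
          bytesSinceLastIndexEntry += batchBytes
        }
        println(indexEntries.toList) // List((50,5120)): only one index entry after ~5 KB of messages
      }
    }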
    
    // An ordered sequence of LogSegment objects that together form one logical log
    class Log(val dir: File, // directory holding the log and index files of every LogSegment
              @volatile var config: LogConfig, // configuration of this Log
              // Starting offset for recovery: messages before recoveryPoint have been flushed to disk;
              // messages after it may not have been, so they are at risk of loss.
              @volatile var recoveryPoint: Long = 0L,
              scheduler: Scheduler, // thread pool for asynchronous scheduled tasks, such as asynchronous flushing and file deletion
              time: Time = SystemTime) extends Logging with KafkaMetricsGroup {
      // Multiple handler threads may append to the same Log concurrently, so appends are synchronized on this lock
      private val lock = new Object
      // LogSegments are kept in a skip list keyed by their baseOffset
      private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
    
      // Source of the offsets assigned to messages (metadata of the next offset)
      @volatile var nextOffsetMetadata = new LogOffsetMetadata(activeSegment.nextOffset(), activeSegment.baseOffset, activeSegment.size.toInt)
    
      // Only the last LogSegment (the active segment) accepts writes
      def activeSegment = segments.lastEntry.getValue
    
      // Append messages sent by the producer
      def append(messages: ByteBufferMessageSet, assignOffsets: Boolean = true): LogAppendInfo = {
        val appendInfo = analyzeAndValidateMessageSet(messages)
        var validMessages = trimInvalidBytes(messages, appendInfo)
    
          lock synchronized {
            if (assignOffsets) {
              val offset = new LongRef(nextOffsetMetadata.messageOffset)
              appendInfo.firstOffset = offset.value
              val now = time.milliseconds
              val (validatedMessages, messageSizesMaybeChanged) =
                validMessages.validateMessagesAndAssignOffsets(offset, now, appendInfo.sourceCodec, appendInfo.targetCodec,
                                                               config.compact, config.messageFormatVersion.messageFormatVersion,
                                                               config.messageTimestampType, config.messageTimestampDifferenceMaxMs)
              validMessages = validatedMessages
              // record the offset of the last message
              appendInfo.lastOffset = offset.value - 1
              if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)
                appendInfo.timestamp = now
            }
            // Get the active segment; if it does not have enough room, roll a new segment file
            val segment = maybeRoll(validMessages.sizeInBytes)
            segment.append(appendInfo.firstOffset, validMessages)
            updateLogEndOffset(appendInfo.lastOffset + 1)
    
            if (unflushedMessages >= config.flushInterval)
              flush()
    
            appendInfo
          }
      }
    
      def flush(offset: Long) : Unit = {
        if (offset <= this.recoveryPoint)
          // everything up to this offset has already been flushed
          return
        for(segment <- logSegments(this.recoveryPoint, offset))
          segment.flush()
        lock synchronized {
          if(offset > this.recoveryPoint) {
            // advance recoveryPoint after the flush
            this.recoveryPoint = offset
            lastflushedTime.set(time.milliseconds)
          }
        }
      }
    
      def read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None): FetchDataInfo = {
        val currentNextOffsetMetadata = nextOffsetMetadata
        val next = currentNextOffsetMetadata.messageOffset
        if(startOffset == next)
          // startOffset is at the log end offset, return an empty message set
          return FetchDataInfo(currentNextOffsetMetadata, MessageSet.Empty)
    
        var entry = segments.floorEntry(startOffset)
        while(entry != null) {
          val maxPosition = {
            if (entry == segments.lastEntry) {
              // The last segment may still be appended to concurrently, so bound the read by the exposed position
              val exposedPos = nextOffsetMetadata.relativePositionInSegment.toLong
              if (entry != segments.lastEntry)
                entry.getValue.size
              else
                exposedPos
            } else {
              entry.getValue.size
            }
          }
          val fetchInfo = entry.getValue.read(startOffset, maxOffset, maxLength, maxPosition)
          if(fetchInfo == null) {
            entry = segments.higherEntry(entry.getKey)
          } else {
            return fetchInfo
          }
        }
    
        FetchDataInfo(nextOffsetMetadata, MessageSet.Empty)
      }
    
      private def asyncDeleteSegment(segment: LogSegment) {
        // Rename the log file and index file with the .deleted suffix
        segment.changeFileSuffixes("", Log.DeletedFileSuffix)
        def deleteSeg() {
          segment.delete()
        }
        // Delete the files asynchronously after fileDeleteDelayMs
        scheduler.schedule("delete-file", deleteSeg, delay = config.fileDeleteDelayMs)
      }
    }
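
Keeping the segments in a ConcurrentSkipListMap keyed by baseOffset is what makes Log.read cheap: floorEntry(startOffset) returns the segment whose base offset is the largest one not exceeding startOffset. A minimal sketch with string placeholders instead of real LogSegment objects:

    import java.util.concurrent.ConcurrentSkipListMap

    // Sketch of how Log.read locates a segment via floorEntry on the baseOffset-keyed skip list.
    object SegmentLookupSketch {
      def main(args: Array[String]): Unit = {
        val segments = new ConcurrentSkipListMap[java.lang.Long, String]()
        segments.put(0L, "segment-0")      // covers offsets [0, 100)
        segments.put(100L, "segment-100")  // covers offsets [100, 230)
        segments.put(230L, "segment-230")  // active segment, offsets >= 230

        println(segments.floorEntry(157L).getValue) // segment-100
        println(segments.floorEntry(230L).getValue) // segment-230
        println(segments.lastEntry.getValue)        // segment-230: the only segment that accepts writes
      }
    }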
    
    @threadsafe
    class KafkaScheduler(val threads: Int, val threadNamePrefix: String = "kafka-scheduler-",
                         daemon: Boolean = true) extends Scheduler with Logging {
      private var executor: ScheduledThreadPoolExecutor = null
      private val schedulerThreadId = new AtomicInteger(0)
    
      override def startup() {
        this synchronized {
          if(isStarted)
            throw new IllegalStateException("This scheduler has already been started!")
          executor = new ScheduledThreadPoolExecutor(threads)
          executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false)
          executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false)
          executor.setThreadFactory(new ThreadFactory() {
                  def newThread(runnable: Runnable): Thread =
                    Utils.newThread(threadNamePrefix + schedulerThreadId.getAndIncrement(), runnable, daemon)
                })
        }
      }
    
      def schedule(name: String, fun: ()=>Unit, delay: Long, period: Long, unit: TimeUnit) = {
        this synchronized {
          ensureRunning
          val runnable = CoreUtils.runnable {
              fun()
          }
          if(period >= 0)
            executor.scheduleAtFixedRate(runnable, delay, period, unit)
          else
            executor.schedule(runnable, delay, unit)
        }
      }
    }
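
The period >= 0 check above is the whole difference between a periodic task and a one-shot delayed task. A stripped-down sketch of the same dispatch on a plain ScheduledThreadPoolExecutor; the schedule helper here mirrors the signature but is not Kafka's:

    import java.util.concurrent.{ScheduledThreadPoolExecutor, TimeUnit}

    // Sketch of KafkaScheduler.schedule's convention: non-negative period => periodic task,
    // negative period => one-shot delayed task.
    object SchedulerSketch {
      def main(args: Array[String]): Unit = {
        val executor = new ScheduledThreadPoolExecutor(1)

        def schedule(name: String, fun: () => Unit, delay: Long, period: Long, unit: TimeUnit): Unit = {
          val runnable = new Runnable { def run(): Unit = fun() }
          if (period >= 0)
            executor.scheduleAtFixedRate(runnable, delay, period, unit)
          else
            executor.schedule(runnable, delay, unit)
        }

        schedule("periodic", () => println("flush dirty logs"), 0, 200, TimeUnit.MILLISECONDS)
        schedule("one-shot", () => println("delete old segment"), 100, -1, TimeUnit.MILLISECONDS)

        Thread.sleep(600)
        executor.shutdown()
      }
    }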
    
    @threadsafe
    // Multiple log directories can be configured in server.properties, and each directory holds multiple Logs;
    // when creating a new Log, LogManager picks the directory with the fewest Logs.
    class LogManager(val logDirs: Array[File],
                     val topicConfigs: Map[String, LogConfig], val defaultConfig: LogConfig, val cleanerConfig: CleanerConfig,
                     // number of threads used to load logs
                     ioThreads: Int,
                     val flushCheckMs: Long, val flushCheckpointMs: Long, val retentionCheckMs: Long, scheduler: Scheduler,
                     val brokerState: BrokerState, private val time: Time) extends Logging {
    
      // lock held while creating or deleting a Log
      private val logCreationOrDeletionLock = new Object
      // backed by a ConcurrentHashMap
      private val logs = new Pool[TopicAndPartition, Log]()
    
      // file lock for each log directory
      private val dirLocks = lockLogDirs(logDirs)
      // Mapping from each log directory to its RecoveryPointCheckpoint file;
      // that file records the recoveryPoint of every Log under the directory
      private val recoveryPointCheckpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, RecoveryPointCheckpointFile)))).toMap
      loadLogs()
    
      def startup() {
        if(scheduler != null) {
          // scheduled task that deletes old log segments
          scheduler.schedule("kafka-log-retention",
                             cleanupLogs,
                             delay = InitialTaskDelayMs,
                             period = retentionCheckMs,
                             TimeUnit.MILLISECONDS)
          // scheduled task that flushes dirty logs to disk
          scheduler.schedule("kafka-log-flusher",
                             flushDirtyLogs,
                             delay = InitialTaskDelayMs,
                             period = flushCheckMs,
                             TimeUnit.MILLISECONDS)
          // scheduled task that writes recovery points to the RecoveryPointCheckpoint files
          scheduler.schedule("kafka-recovery-point-checkpoint",
                             checkpointRecoveryPointOffsets,
                             delay = InitialTaskDelayMs,
                             period = flushCheckpointMs,
                             TimeUnit.MILLISECONDS)
        }
        if(cleanerConfig.enableCleaner)
          cleaner.startup()
      }
    
      def cleanupLogs() {
        var total = 0
        val startMs = time.milliseconds
        for(log <- allLogs; if !log.config.compact) {
          // delete segments based on retention time or total log size
          total += cleanupExpiredSegments(log) + cleanupSegmentsToMaintainSize(log)
        }
      }
    }
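
The comment on LogManager mentions that a new Log is placed in the data directory that currently holds the fewest Logs. A toy sketch of that selection rule; the directory paths and the counting input are invented for illustration:

    import java.io.File

    // Sketch of the "pick the directory with the fewest Logs" rule described above.
    object NextLogDirSketch {
      def main(args: Array[String]): Unit = {
        val logDirs = Seq(new File("/data/kafka-logs-1"), new File("/data/kafka-logs-2"))
        // parent directory of each existing Log (e.g. derived from logs.values.map(_.dir.getParent))
        val existingLogParents = Seq("/data/kafka-logs-1", "/data/kafka-logs-1", "/data/kafka-logs-2")

        val countsByDir = logDirs.map { dir =>
          dir -> existingLogParents.count(_ == dir.getPath)
        }
        val chosen = countsByDir.minBy(_._2)._1
        println(chosen) // /data/kafka-logs-2, since it currently holds fewer Logs
      }
    }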
    
    class LogCleaner(val config: CleanerConfig,
                     val logDirs: Array[File],
                     val logs: Pool[TopicAndPartition, Log],
                     time: Time = SystemTime) extends Logging with KafkaMetricsGroup {
      private[log] val cleanerManager = new LogCleanerManager(logDirs, logs)
      // cleaner threads
      private val cleaners = (0 until config.numThreads).map(new CleanerThread(_))
    }
    
    private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[TopicAndPartition, Log]) extends Logging with KafkaMetricsGroup {
      // log directory -> offset checkpoint file
      private val checkpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, offsetCheckpointFile)))).toMap
      // cleaning state of each TopicAndPartition currently being cleaned
      private val inProgress = mutable.HashMap[TopicAndPartition, LogCleaningState]()
      // protects checkpoints and inProgress
      private val lock = new ReentrantLock
      // condition used to wait for the cleaning state to change from LogCleaningAborted to LogCleaningPaused
      private val pausedCleaningCond = lock.newCondition()
    
      // Update the cleaner checkpoint of the given log directory
      def updateCheckpoints(dataDir: File, update: Option[(TopicAndPartition,Long)]) {
        inLock(lock) {
          val checkpoint = checkpoints(dataDir)
          val existing = checkpoint.read().filterKeys(logs.keys) ++ update
          checkpoint.write(existing)
        }
      }
    
      def grabFilthiestLog(): Option[LogToClean] = {
        inLock(lock) {
          // cleaner checkpoints of all log directories
          val lastClean = allCleanerCheckpoints()
          val dirtyLogs = logs.filter {
            // keep only compacted logs, i.e. filter out logs whose cleanup.policy is delete
            case (topicAndPartition, log) => log.config.compact
          }.filterNot {
            // filter out logs that are already being cleaned
            case (topicAndPartition, log) => inProgress.contains(topicAndPartition)
          }.map {
            case (topicAndPartition, log) =>
              val logStartOffset = log.logSegments.head.baseOffset
              val firstDirtyOffset = {
                // where cleaning starts: the checkpointed offset, or the log's first message if the checkpoint is stale
                val offset = lastClean.getOrElse(topicAndPartition, logStartOffset)
                if (offset < logStartOffset) {
                  logStartOffset
                } else {
                  offset
                }
              }
              LogToClean(topicAndPartition, log, firstDirtyOffset)
          }.filter(ltc => ltc.totalBytes > 0) // skip any empty logs
    
          this.dirtiestLogCleanableRatio = if (!dirtyLogs.isEmpty) dirtyLogs.max.cleanableRatio else 0
          // and must meet the minimum threshold for dirty byte ratio
          val cleanableLogs = dirtyLogs.filter(ltc => ltc.cleanableRatio > ltc.log.config.minCleanableRatio)
          if(cleanableLogs.isEmpty) {
            None
          } else {
            val filthiest = cleanableLogs.max
            inProgress.put(filthiest.topicPartition, LogCleaningInProgress)
            Some(filthiest)
          }
        }
      }
    }
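
grabFilthiestLog picks the log with the highest cleanableRatio among those above min.cleanable.ratio. LogToClean's definition is not shown above; the sketch below assumes the commonly described ratio of dirty bytes to total bytes, with made-up numbers:

    // Sketch of the grabFilthiestLog selection, assuming cleanableRatio = dirty bytes / total bytes.
    object FilthiestLogSketch {
      case class LogToCleanSketch(topicPartition: String, cleanBytes: Long, dirtyBytes: Long) {
        def totalBytes: Long = cleanBytes + dirtyBytes
        def cleanableRatio: Double = dirtyBytes.toDouble / totalBytes
      }

      def main(args: Array[String]): Unit = {
        val minCleanableRatio = 0.5
        val candidates = Seq(
          LogToCleanSketch("__consumer_offsets-0", cleanBytes = 800, dirtyBytes = 200), // ratio 0.2, skipped
          LogToCleanSketch("__consumer_offsets-7", cleanBytes = 300, dirtyBytes = 700), // ratio 0.7
          LogToCleanSketch("__consumer_offsets-3", cleanBytes = 400, dirtyBytes = 600)  // ratio 0.6
        ).filter(_.totalBytes > 0)

        val cleanable = candidates.filter(_.cleanableRatio > minCleanableRatio)
        val filthiest = cleanable.maxBy(_.cleanableRatio)
        println(filthiest.topicPartition) // __consumer_offsets-7: the dirtiest log is compacted first
      }
    }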
    
