美文网首页
MessageSet.scala

MessageSet.scala

作者: 上海马超23 | 来源:发表于2018-11-17 22:59 被阅读0次
/**
 * Base abstraction for a set of messages: it can be iterated as
 * MessageAndOffset entries and written out to a channel.
 */
abstract class MessageSet extends Iterable[MessageAndOffset] {
  // Write messages to the given channel.
  // NOTE(review): the exact meaning of `offset`, `maxSize` and the returned Int is not
  // shown in this file — presumably a start position, a byte cap and the number of
  // bytes written; confirm against the concrete implementations.
  def writeTo(channel: GatheringByteChannel, offset: Long, maxSize: Int): Int
  // Iterate over the messages of this MessageSet in order.
  def iterator: Iterator[MessageAndOffset]
}

/**
 * A message set backed by a log file on disk, or by a slice of such a file.
 *
 * @param file    the log file this set refers to
 * @param channel the FileChannel used to read and write the file
 * @param start   start position of this set inside the file
 * @param end     end position of this set inside the file
 * @param isSlice true when this set is a slice of a log file, false when it is the whole file
 */
class FileMessageSet private[kafka](@volatile var file: File,
                                    private[log] val channel: FileChannel,
                                    private[log] val start: Int,
                                    private[log] val end: Int,
                                    isSlice: Boolean) extends MessageSet with Logging {
  // Size in bytes of the log file or slice.
  // Kept atomic because (per the original author's note) several handler threads may
  // append to the same partition concurrently.
  private val _size =
    if(isSlice)
      new AtomicInteger(end - start)
    else
      new AtomicInteger(math.min(channel.size.toInt, end) - start)

  // For a whole file, move the channel position to the end of the existing data during
  // initialisation so that writes after a restart do not overwrite earlier content.
  if (!isSlice)
    channel.position(math.min(channel.size.toInt, end))

  /**
   * Open a channel on `file`.
   *
   * @param file              the file to open
   * @param mutable           true opens the file read-write, false opens it read-only
   * @param fileAlreadyExists when true the file is never resized, even if `preallocate` is set
   * @param initFileSize      length to pre-allocate when `preallocate` applies
   * @param preallocate       pre-size a new file to `initFileSize`; the original author notes this
   *                          is meant to keep the file contiguous on disk and reduce seek cost
   * @return the opened channel
   */
  def openChannel(file: File, mutable: Boolean, fileAlreadyExists: Boolean = false, initFileSize: Int = 0, preallocate: Boolean = false): FileChannel = {
    if (!mutable)
      new FileInputStream(file).getChannel()
    else {
      val randomAccessFile = new RandomAccessFile(file, "rw")
      if (!fileAlreadyExists && preallocate) {
        // Do not leak the open file handle if the resize fails.
        try randomAccessFile.setLength(initFileSize)
        catch {
          case e: Throwable =>
            randomAccessFile.close()
            throw e
        }
      }
      randomAccessFile.getChannel()
    }
  }

  /** Append a batch of messages to the log file and grow the tracked size by the bytes written. */
  def append(messages: ByteBufferMessageSet): Unit = {
    val written = messages.writeFullyTo(channel)
    _size.getAndAdd(written)
  }

  /**
   * Scan forward from `startingPosition`, entry by entry, for the first message whose
   * offset is greater than or equal to `targetOffset`.
   *
   * @param targetOffset     the offset to look for
   * @param startingPosition the file position at which scanning starts
   * @return the offset and file position of the first matching message, or null when the
   *         scan reaches the end of the set without a match
   * @throws IllegalStateException if an entry header cannot be read completely or carries
   *                               a negative message size
   */
  def searchFor(targetOffset: Long, startingPosition: Int): OffsetPosition = {
    var position = startingPosition
    // LogOverhead covers the offset and size header fields only, not the message payload.
    val buffer = ByteBuffer.allocate(MessageSet.LogOverhead)
    val size = sizeInBytes() // current size of this FileMessageSet
    while(position + MessageSet.LogOverhead < size) {
      buffer.rewind()
      channel.read(buffer, position)
      // A positional read may return fewer bytes than requested; never decode a partial header.
      if(buffer.hasRemaining)
        throw new IllegalStateException("Failed to read complete buffer for targetOffset %d startPosition %d in %s"
                                        .format(targetOffset, startingPosition, file.getAbsolutePath))
      buffer.rewind()
      val offset = buffer.getLong()
      if(offset >= targetOffset)
        return OffsetPosition(offset, position)
      val messageSize = buffer.getInt()
      // A negative size would move the scan backwards (or spin forever) on corrupt data.
      if(messageSize < 0)
        throw new IllegalStateException("Invalid message size: " + messageSize)
      // Advance to the header of the next message.
      position += MessageSet.LogOverhead + messageSize
    }
    null
  }
}

/**
 * A message set held in an in-memory ByteBuffer.
 *
 * @param buffer the buffer containing the serialized messages
 */
class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Logging {

  /**
   * Serialize `messages` into a new ByteBuffer.
   *
   * - no messages: returns MessageSet.Empty.buffer
   * - NoCompressionCodec: every message is written with an offset taken from `offsetAssigner`
   * - otherwise: the messages are written through a compressing stream and wrapped into a
   *   single outer message
   *
   * NOTE(review): `magicAndTimestamp` is used below but is not defined anywhere in this
   * excerpt; it is presumably derived from `wrapperMessageTimestamp` and the messages —
   * confirm against the complete source.
   */
  private def create(offsetAssigner: OffsetAssigner, compressionCodec: CompressionCodec,
                     wrapperMessageTimestamp: Option[Long], timestampType: TimestampType,
                     messages: Message*): ByteBuffer = {
    if (messages.isEmpty)
      MessageSet.Empty.buffer
    else if (compressionCodec == NoCompressionCodec) {
      val buffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages))
      // offsetAssigner.nextAbsoluteOffset() assigns an offset to the message, which is then written into the buffer
      for (message <- messages) writeMessage(buffer, message, offsetAssigner.nextAbsoluteOffset())
      buffer.rewind()
      buffer
    } else {
      // Tracks the most recently assigned offset; after the loop it is the last inner message's offset.
      var offset = -1L
      // Constructor argument is half the message-set size, clamped to [1024, 65536]
      // (presumably an initial buffer size — confirm against MessageWriter).
      val messageWriter = new MessageWriter(math.min(math.max(MessageSet.messageSetSize(messages) / 2, 1024), 1 << 16))
      messageWriter.write(codec = compressionCodec, timestamp = magicAndTimestamp.timestamp, timestampType = timestampType, magicValue = magicAndTimestamp.magic) { outputStream =>
        // Create an output stream for the requested compression type
        val output = new DataOutputStream(CompressionFactory(compressionCodec, magicAndTimestamp.magic, outputStream))
        try {
          // Write every inner (compressed) message in turn
          for (message <- messages) {
            offset = offsetAssigner.nextAbsoluteOffset()
            // For a magic value above V0 the inner offset (relative offset) is written
            if (magicAndTimestamp.magic > Message.MagicValue_V0)
              output.writeLong(offsetAssigner.toInnerOffset(offset))
            else
              // otherwise the absolute offset is written
              output.writeLong(offset)
            output.writeInt(message.size)
            output.write(message.buffer.array, message.buffer.arrayOffset, message.buffer.limit)
          }
        } finally {
          output.close()
        }
      }
      val buffer = ByteBuffer.allocate(messageWriter.size + MessageSet.LogOverhead)
      // Write the whole outer message in message format; its offset is the offset of the last inner message
      writeMessage(buffer, messageWriter, offset)
      buffer.rewind()
      buffer
    }
  }

  /**
   * Write the buffer to `channel`, looping until `sizeInBytes` bytes have been written.
   * The buffer position is marked before writing and reset afterwards.
   *
   * @return the number of bytes written
   */
  def writeFullyTo(channel: GatheringByteChannel): Int = {
    buffer.mark()
    var written = 0
    while (written < sizeInBytes)
      written += channel.write(buffer)
    buffer.reset()
    written
  }
}

/**
 * Iterator over the log entries stored in a buffer. In deep mode (shallow == false) a
 * compressed entry is expanded through a nested RecordsIterator over its inner records.
 *
 * innerDone(), getNextEntry(), getNextEntryFromStream() and allDone() are used here but
 * defined outside this excerpt.
 */
public static class RecordsIterator extends AbstractIterator<LogEntry> {
    // Message content
    private final ByteBuffer buffer;
    // Stream the messages are read from
    private final DataInputStream stream;
    // Compression type
    private final CompressionType type;
    // When true, compressed entries are returned as-is instead of being expanded (see makeNext)
    private final boolean shallow;
    // Iterator over the inner records of the compressed entry currently being expanded
    private RecordsIterator innerIter;

    // LogEntry: offset + record. Only populated by the inner (private) constructor.
    private final ArrayDeque<LogEntry> logEntries;
    // Used when iterating a compressed message: wrapper offset minus the offset stored in the
    // last inner entry; -1 when not applicable
    private final long absoluteBaseOffset;

    /** Public constructor: creates the outer iterator over {@code buffer}. */
    public RecordsIterator(ByteBuffer buffer, boolean shallow) {
        this.type = CompressionType.NONE;
        this.buffer = buffer;
        this.shallow = shallow;
        this.stream = new DataInputStream(new ByteBufferInputStream(buffer));
        this.logEntries = null;
        this.absoluteBaseOffset = -1;
    }

    /**
     * Private constructor: creates the inner iterator over the records wrapped inside the
     * compressed {@code entry}.
     *
     * @throws KafkaException if reading the inner records fails, or if the wrapper has a magic
     *                        value above V0 but contains no inner records
     */
    private RecordsIterator(LogEntry entry) {
        this.type = entry.record().compressionType();
        this.buffer = entry.record().value();
        this.shallow = true;
        this.stream = Compressor.wrapForInput(new ByteBufferInputStream(this.buffer), type, entry.record().magic());
        long wrapperRecordOffset = entry.offset(); // offset of the outer (wrapper) message
        if (entry.record().magic() > Record.MAGIC_VALUE_V0) {
            this.logEntries = new ArrayDeque<>();
            long wrapperRecordTimestamp = entry.record().timestamp();
            // Decompress every inner message and collect it in logEntries
            while (true) {
                try {
                    LogEntry logEntry = getNextEntryFromStream();
                    Record recordWithTimestamp = new Record(logEntry.record().buffer(),
                                                            wrapperRecordTimestamp,
                                                            entry.record().timestampType());
                    logEntries.add(new LogEntry(logEntry.offset(), recordWithTimestamp));
                } catch (EOFException e) {
                    break;
                } catch (IOException e) {
                    throw new KafkaException(e);
                }
            }
            // getLast() on an empty deque would throw a bare NoSuchElementException, which looks
            // like ordinary end-of-iteration; report the corrupt wrapper explicitly instead.
            if (logEntries.isEmpty())
                throw new KafkaException("Found invalid compressed record set with no inner records");
            this.absoluteBaseOffset = wrapperRecordOffset - logEntries.getLast().offset();
        } else {
            this.logEntries = null;
            this.absoluteBaseOffset = -1;
        }

    }

    /**
     * Produce the next entry: continue with the inner iterator while it still has entries,
     * otherwise read the next entry of this iterator and, in deep mode, descend into it when
     * it is compressed.
     */
    @Override
    protected LogEntry makeNext() {
        if (innerDone()) {
            try {
                LogEntry entry = getNextEntry();
                if (entry == null)
                    return allDone();

                // Inner iterator: turn the stored offset into an absolute offset
                if (absoluteBaseOffset >= 0) {
                    long absoluteOffset = absoluteBaseOffset + entry.offset();
                    entry = new LogEntry(absoluteOffset, entry.record());
                }

                CompressionType compression = entry.record().compressionType();
                if (compression == CompressionType.NONE || shallow) {
                    return entry;
                } else {
                    innerIter = new RecordsIterator(entry);
                    return innerIter.next();
                }
            } catch (EOFException e) {
                return allDone();
            } catch (IOException e) {
                throw new KafkaException(e);
            }
        } else {
            return innerIter.next();
        }
    }
}

// MemoryRecords的迭代器RecordsIterator抽象类
public abstract class AbstractIterator<T> implements Iterator<T> {

    private static enum State {
        READY,
        NOT_READY, // 迭代器未准备好迭代下一项,需要调用maybeComputeNext
        DONE, FAILED
    };

    private State state = State.NOT_READY;
    private T next;

    @Override
    public boolean hasNext() {
        switch (state) {
            case FAILED:
                throw new IllegalStateException("Iterator is in failed state");
            case DONE:
                return false;
            case READY:
                return true;
            default:
                return maybeComputeNext();
        }
    }

    @Override
    public T next() {
        if (!hasNext())
            throw new NoSuchElementException();
        state = State.NOT_READY;
        if (next == null)
            throw new IllegalStateException("Expected item but none found.");
        return next;
    }

    protected abstract T makeNext();

    private Boolean maybeComputeNext() {
        state = State.FAILED;
        next = makeNext();
        if (state == State.DONE) {
            return false;
        } else {
            state = State.READY;
            return true;
        }
    }

}

/**
 * An offset index backed by a memory-mapped index file. The entry counts below are derived by
 * dividing buffer positions by 8, i.e. every index entry occupies 8 bytes.
 *
 * @param _file        the index file on disk
 * @param baseOffset   offset of the first message in the corresponding log file
 * @param maxIndexSize size in bytes used to pre-allocate a newly created index file;
 *                     must be at least 8 (room for one entry) when the file does not exist yet
 */
class OffsetIndex(@volatile private[this] var _file: File, val baseOffset: Long, val maxIndexSize: Int = -1) extends Logging {

  // Guards operations on mmap.
  private val lock = new ReentrantLock

  // The MappedByteBuffer used to operate on the index file.
  @volatile
  private[this] var mmap: MappedByteBuffer = {
    // createNewFile returns true when the index file did not exist and was created, false otherwise.
    val newlyCreated = _file.createNewFile()
    val raf = new RandomAccessFile(_file, "rw")
    try {
      if (newlyCreated) {
        // A size below 8 cannot hold a single entry; fail fast instead of silently
        // creating an index with no usable capacity.
        if (maxIndexSize < 8)
          throw new IllegalArgumentException("Invalid max index size: " + maxIndexSize)
        // Pre-size the new file. Per the original author's note the result is the largest
        // multiple of 8 not exceeding maxIndexSize (e.g. 67 -> 64); roundToExactMultiple
        // itself is not shown in this excerpt.
        raf.setLength(roundToExactMultiple(maxIndexSize, 8))
      }

      val len = raf.length()
      // Memory-map the whole file.
      val idx = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, len)

      if (newlyCreated)
        idx.position(0)
      else
        // Existing index file: move the position to the end so existing data is not overwritten.
        idx.position(roundToExactMultiple(idx.limit, 8))
      idx
    } finally {
      CoreUtils.swallow(raf.close())
    }
  }

  @volatile
  private[this] var _entries = mmap.position / 8 // number of entries currently in the index file
  @volatile
  private[this] var _maxEntries = mmap.limit / 8 // maximum number of entries the index file can hold
  @volatile
  private[this] var _lastOffset = readLastEntry.offset // offset of the last index entry

  /**
   * Look up the index entry to start reading from for `targetOffset`.
   *
   * Per the original author's note, indexSlotFor performs a binary search for the largest
   * indexed offset not exceeding `targetOffset` (helper not shown in this excerpt).
   *
   * @param targetOffset the offset being looked up
   * @return the offset and physical file position of the entry found, or
   *         (baseOffset, 0) when indexSlotFor reports no slot (-1)
   */
  def lookup(targetOffset: Long): OffsetPosition = {
    val idx = mmap.duplicate // work on a duplicate view of the mapped buffer
    val slot = indexSlotFor(idx, targetOffset)
    if(slot == -1)
      OffsetPosition(baseOffset, 0)
    else
      // Pair the absolute offset with the physical position stored in the slot. The position
      // is the location of the indexed message in the log file, so a reader can seek
      // straight to it.
      OffsetPosition(baseOffset + relativeOffset(idx, slot), physical(idx, slot))
  }
}

/**
 * One segment of a log: a log file paired with its offset index.
 *
 * @param log                the FileMessageSet for the segment's log file
 * @param index              the OffsetIndex for the segment's index file
 * @param baseOffset         offset of the first message in this segment
 * @param indexIntervalBytes minimum number of bytes between two index entries
 * @param rollJitterMs       not used in this excerpt
 * @param time               clock used to record the creation time
 */
@nonthreadsafe
class LogSegment(val log: FileMessageSet,
                 val index: OffsetIndex,
                 val baseOffset: Long,
                 val indexIntervalBytes: Int,
                 val rollJitterMs: Long,
                 time: Time) extends Logging {

  // Creation time of this LogSegment (the original author notes truncateTo resets it;
  // that method is not shown here).
  var created = time.milliseconds
  // Bytes appended to the log file since the last index entry was added; decides when the
  // next index entry is written.
  private var bytesSinceLastIndexEntry = 0

  /**
   * Append `messages` to the segment, adding an index entry first when more than
   * `indexIntervalBytes` bytes were appended since the previous entry.
   *
   * @param offset   offset recorded in the index entry, if one is added
   * @param messages the messages to append; an empty set is ignored
   */
  @nonthreadsafe
  def append(offset: Long, messages: ByteBufferMessageSet): Unit = {
    if (messages.sizeInBytes > 0) {
      if(bytesSinceLastIndexEntry > indexIntervalBytes) {
        // The physical position of an appended message is simply the current file size.
        index.append(offset, log.sizeInBytes())
        this.bytesSinceLastIndexEntry = 0
      }
      log.append(messages)
      this.bytesSinceLastIndexEntry += messages.sizeInBytes
    }
  }

  /**
   * Read the messages in the given offset range.
   *
   * @param startOffset first offset to read
   * @param maxOffset   optional upper bound on the offsets read
   * @param maxSize     maximum number of bytes to read
   * @param maxPosition maximum file position that may be read
   * @return the fetched data, or null when `startOffset` cannot be translated to a position
   *         in this segment
   */
  @threadsafe
  def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int, maxPosition: Long = size): FetchDataInfo = {
    val logSize = log.sizeInBytes
    val startPosition = translateOffset(startOffset)

    // translateOffset can return null (see the null check on `mapping` below); dereferencing it
    // would throw a NullPointerException, so report "nothing here" with null instead.
    if (startPosition == null)
      null
    else {
      val offsetMetadata = new LogOffsetMetadata(startOffset, this.baseOffset, startPosition.position)

      // Number of bytes to read, depending on whether a maxOffset was given.
      val length = maxOffset match {
        case None =>
          min((maxPosition - startPosition.position).toInt, maxSize)
        case Some(offset) =>
          val mapping = translateOffset(offset, startPosition.position)
          val endPosition =
            if(mapping == null)
              logSize // maxOffset could not be translated: read up to the saved log size
            else
              mapping.position
          min(min(maxPosition, endPosition) - startPosition.position, maxSize).toInt
      }

      FetchDataInfo(offsetMetadata, log.read(startPosition.position, length))
    }
  }
}

/**
 * A logical log formed by the ordered combination of multiple LogSegment objects.
 *
 * NOTE(review): several members used below (analyzeAndValidateMessageSet, trimInvalidBytes,
 * maybeRoll, updateLogEndOffset, unflushedMessages, the no-argument flush(), logSegments,
 * lastflushedTime) are not defined in this excerpt. Nothing shown here populates `segments`
 * before `nextOffsetMetadata` dereferences `activeSegment`; presumably segments are loaded
 * elsewhere first — confirm.
 */
class Log(val dir: File, // directory holding the log file and index file of every LogSegment
          @volatile var config: LogConfig, // configuration of this Log
          // Offset at which recovery starts: messages before recoveryPoint have been persisted to disk,
          // later ones may not have been and are at risk of being lost
          @volatile var recoveryPoint: Long = 0L,
          scheduler: Scheduler, // scheduler for asynchronous tasks (used below for delayed file deletion)
          time: Time = SystemTime) extends Logging with KafkaMetricsGroup {
  // Several handlers may append to the same Log, so appends are synchronized on this lock
  private val lock = new Object
  // LogSegments are kept in a skip list keyed by a Long (presumably the segment base offset — confirm)
  private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]

  // Metadata of the next offset to assign to a message
  @volatile var nextOffsetMetadata = new LogOffsetMetadata(activeSegment.nextOffset(), activeSegment.baseOffset, activeSegment.size.toInt)

  // Only the last LogSegment is written to
  def activeSegment = segments.lastEntry.getValue

  /**
   * Append the messages sent by a producer.
   *
   * @param messages      the messages to append
   * @param assignOffsets when true, offsets are assigned here starting at nextOffsetMetadata.messageOffset
   * @return information about the append (first/last offset, timestamp, ...)
   */
  def append(messages: ByteBufferMessageSet, assignOffsets: Boolean = true): LogAppendInfo = {
    val appendInfo = analyzeAndValidateMessageSet(messages)
    var validMessages = trimInvalidBytes(messages, appendInfo)

      lock synchronized {
        if (assignOffsets) {
          val offset = new LongRef(nextOffsetMetadata.messageOffset)
          appendInfo.firstOffset = offset.value
          val now = time.milliseconds
          val (validatedMessages, messageSizesMaybeChanged) =
            validMessages.validateMessagesAndAssignOffsets(offset, now, appendInfo.sourceCodec, appendInfo.targetCodec,
                                                           config.compact, config.messageFormatVersion.messageFormatVersion,
                                                           config.messageTimestampType, config.messageTimestampDifferenceMaxMs)
          validMessages = validatedMessages
          // Record the offset of the last message
          appendInfo.lastOffset = offset.value - 1
          if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)
            appendInfo.timestamp = now
        }
        // Get the segment to append to; per the original note, maybeRoll rolls a new file when the
        // active segment does not have enough room
        val segment = maybeRoll(validMessages.sizeInBytes)
        segment.append(appendInfo.firstOffset, validMessages)
        updateLogEndOffset(appendInfo.lastOffset + 1)

        if (unflushedMessages >= config.flushInterval)
          flush()

        appendInfo
      }
  }

  /**
   * Flush the segments between the current recoveryPoint and `offset`, then advance
   * recoveryPoint to `offset`. Does nothing when `offset` is not beyond recoveryPoint.
   */
  def flush(offset: Long) : Unit = {
    if (offset <= this.recoveryPoint)
      // Already persisted
      return
    for(segment <- logSegments(this.recoveryPoint, offset))
      segment.flush()
    lock synchronized {
      if(offset > this.recoveryPoint) {
        // After persisting, recoveryPoint has to be updated
        this.recoveryPoint = offset
        lastflushedTime.set(time.milliseconds)
      }
    }
  }

  /**
   * Read messages starting at `startOffset`.
   *
   * @param startOffset first offset to read
   * @param maxLength   maximum number of bytes to read
   * @param maxOffset   optional upper bound on the offsets read
   * @return the fetched data; an empty message set when `startOffset` is the next offset to
   *         be assigned or when no segment yields data
   */
  def read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None): FetchDataInfo = {
    val currentNextOffsetMetadata = nextOffsetMetadata
    val next = currentNextOffsetMetadata.messageOffset
    if(startOffset == next)
      // Offset at the end of the log: return an empty set
      return FetchDataInfo(currentNextOffsetMetadata, MessageSet.Empty)

    // Start with the entry that has the greatest key less than or equal to startOffset
    var entry = segments.floorEntry(startOffset)
    while(entry != null) {
      val maxPosition = {
        if (entry == segments.lastEntry) {
          // The last segment may be appended to concurrently
          val exposedPos = nextOffsetMetadata.relativePositionInSegment.toLong
          // Re-check: if the last entry changed in the meantime, this segment is no longer the
          // active one and its full size can be used
          if (entry != segments.lastEntry)
            entry.getValue.size
          else
            exposedPos
        } else {
          entry.getValue.size
        }
      }
      val fetchInfo = entry.getValue.read(startOffset, maxOffset, maxLength, maxPosition)
      if(fetchInfo == null) {
        // Nothing in this segment: move on to the next higher one
        entry = segments.higherEntry(entry.getKey)
      } else {
        return fetchInfo
      }
    }

    FetchDataInfo(nextOffsetMetadata, MessageSet.Empty)
  }

  /** Rename the segment's files with the deleted suffix, then schedule their deletion. */
  private def asyncDeleteSegment(segment: LogSegment) {
    // Change the suffix of the log file and index file to Log.DeletedFileSuffix
    segment.changeFileSuffixes("", Log.DeletedFileSuffix)
    def deleteSeg() {
      segment.delete()
    }
    // Delete the files asynchronously after config.fileDeleteDelayMs
    scheduler.schedule("delete-file", deleteSeg, delay = config.fileDeleteDelayMs)
  }
}

/**
 * Scheduler backed by a ScheduledThreadPoolExecutor.
 *
 * `isStarted` and `ensureRunning` are used below but defined outside this excerpt.
 *
 * @param threads          number of threads in the pool
 * @param threadNamePrefix prefix of the thread names; a running counter is appended
 * @param daemon           passed to Utils.newThread for every pool thread
 */
@threadsafe
class KafkaScheduler(val threads: Int, val threadNamePrefix: String = "kafka-scheduler-",
                     daemon: Boolean = true) extends Scheduler with Logging {
  private var executor: ScheduledThreadPoolExecutor = null
  private val schedulerThreadId = new AtomicInteger(0)

  /**
   * Create the executor. Tasks still queued at shutdown, periodic or delayed, are not run.
   *
   * @throws IllegalStateException if the scheduler has already been started
   */
  override def startup(): Unit = {
    this synchronized {
      if(isStarted)
        throw new IllegalStateException("This scheduler has already been started!")
      // Names every pool thread threadNamePrefix + a running counter.
      val namedThreads = new ThreadFactory() {
        def newThread(runnable: Runnable): Thread =
          Utils.newThread(threadNamePrefix + schedulerThreadId.getAndIncrement(), runnable, daemon)
      }
      executor = new ScheduledThreadPoolExecutor(threads)
      executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false)
      executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false)
      executor.setThreadFactory(namedThreads)
    }
  }

  /**
   * Schedule `fun` to run after `delay`; a non-negative `period` makes it repeat at a fixed
   * rate, a negative `period` runs it once.
   *
   * NOTE(review): `fun` is invoked without any exception handling here; with
   * scheduleAtFixedRate an exception thrown by one run suppresses all later runs.
   */
  def schedule(name: String, fun: ()=>Unit, delay: Long, period: Long, unit: TimeUnit) = {
    this synchronized {
      ensureRunning
      val task = CoreUtils.runnable {
          fun()
      }
      val runOnce = period < 0
      if(runOnce)
        executor.schedule(task, delay, unit)
      else
        executor.scheduleAtFixedRate(task, delay, period, unit)
    }
  }
}

/**
 * Manages the Logs spread over the configured log directories.
 *
 * Per the original author's note: several log directories can be configured in
 * server.properties, each holding several Logs, and a new Log is created in the directory
 * that currently holds the fewest (that selection logic is not shown in this excerpt).
 */
@threadsafe
class LogManager(val logDirs: Array[File],
                 val topicConfigs: Map[String, LogConfig], val defaultConfig: LogConfig, val cleanerConfig: CleanerConfig,
                 // number of threads used for loading
                 ioThreads: Int,
                 val flushCheckMs: Long, val flushCheckpointMs: Long, val retentionCheckMs: Long, scheduler: Scheduler,
                 val brokerState: BrokerState, private val time: Time) extends Logging {

  // Held while a Log is created or deleted.
  private val logCreationOrDeletionLock = new Object
  // All Logs by topic-partition (the original author notes Pool is backed by a ConcurrentHashMap).
  private val logs = new Pool[TopicAndPartition, Log]()

  // One lock per log directory.
  private val dirLocks = lockLogDirs(logDirs)
  // Log directory -> its recovery-point checkpoint file, which (per the original note)
  // records the recoveryPoint of every Log in that directory.
  private val recoveryPointCheckpoints =
    logDirs.map { dir => dir -> new OffsetCheckpoint(new File(dir, RecoveryPointCheckpointFile)) }.toMap
  loadLogs()

  /** Register the periodic background tasks and, when enabled, start the log cleaner. */
  def startup(): Unit = {
    if(scheduler != null) {
      // All three tasks share the same initial delay and time unit.
      def periodically(taskName: String, task: () => Unit, everyMs: Long): Unit =
        scheduler.schedule(taskName,
                           task,
                           delay = InitialTaskDelayMs,
                           period = everyMs,
                           TimeUnit.MILLISECONDS)

      // Log retention clean-up
      periodically("kafka-log-retention", cleanupLogs, retentionCheckMs)
      // Flushing dirty logs to disk
      periodically("kafka-log-flusher", flushDirtyLogs, flushCheckMs)
      // Writing the recovery-point checkpoint files
      periodically("kafka-recovery-point-checkpoint", checkpointRecoveryPointOffsets, flushCheckpointMs)
    }
    if(cleanerConfig.enableCleaner)
      cleaner.startup()
  }

  /**
   * Run retention clean-up on every Log that is not compacted: segments are removed based
   * on retention time and on total log size.
   */
  def cleanupLogs(): Unit = {
    var total = 0
    val startMs = time.milliseconds
    allLogs.foreach { log =>
      if (!log.config.compact)
        total += cleanupExpiredSegments(log) + cleanupSegmentsToMaintainSize(log)
    }
  }
}

/**
 * Owns the LogCleanerManager and one CleanerThread per configured cleaner thread.
 *
 * @param config  cleaner configuration; `numThreads` decides how many threads are created
 * @param logDirs the log directories
 * @param logs    all Logs by topic-partition
 * @param time    clock
 */
class LogCleaner(val config: CleanerConfig,
                 val logDirs: Array[File],
                 val logs: Pool[TopicAndPartition, Log],
                 time: Time = SystemTime) extends Logging with KafkaMetricsGroup {
  private[log] val cleanerManager = new LogCleanerManager(logDirs, logs)
  // The cleaner threads, numbered from 0.
  private val cleaners =
    for (threadId <- 0 until config.numThreads) yield new CleanerThread(threadId)
}

/**
 * Tracks cleaning state per topic-partition and picks the next Log to clean.
 *
 * `dirtiestLogCleanableRatio` and `allCleanerCheckpoints()` are used below but defined
 * outside this excerpt.
 */
private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[TopicAndPartition, Log]) extends Logging with KafkaMetricsGroup {
  // Log directory -> its cleaner offset checkpoint file.
  private val checkpoints =
    logDirs.map { dir => dir -> new OffsetCheckpoint(new File(dir, offsetCheckpointFile)) }.toMap
  // Cleaning state of every topic-partition currently being processed.
  private val inProgress = mutable.HashMap[TopicAndPartition, LogCleaningState]()
  // Guards checkpoints and inProgress.
  private val lock = new ReentrantLock
  // Used to wait for the state to move from LogCleaningAborted to LogCleaningPaused.
  private val pausedCleaningCond = lock.newCondition()

  /**
   * Rewrite the cleaner checkpoint of `dataDir`: keep the entries whose topic-partition is
   * still in `logs` and add `update`, if any.
   */
  def updateCheckpoints(dataDir: File, update: Option[(TopicAndPartition,Long)]): Unit = {
    inLock(lock) {
      val checkpointFile = checkpoints(dataDir)
      val surviving = checkpointFile.read().filterKeys(logs.keys)
      checkpointFile.write(surviving ++ update)
    }
  }

  /**
   * Choose the Log to clean next and mark it as in progress.
   *
   * @return the candidate with the greatest ordering value among those whose cleanable ratio
   *         exceeds their configured minimum, or None when there is no such candidate
   */
  def grabFilthiestLog(): Option[LogToClean] = {
    inLock(lock) {
      // Cleaner checkpoints of all directories.
      val lastClean = allCleanerCheckpoints()

      // Only compacted logs are candidates (this drops logs whose cleanup policy is delete).
      val compacted = logs.filter { case (_, log) => log.config.compact }
      // Skip logs that are already being processed.
      val idle = compacted.filterNot { case (topicAndPartition, _) => inProgress.contains(topicAndPartition) }
      val candidates = idle.map { case (topicAndPartition, log) =>
        val logStartOffset = log.logSegments.head.baseOffset
        // Cleaning starts at the checkpoint, but never before the first offset of the log.
        val checkpointed = lastClean.getOrElse(topicAndPartition, logStartOffset)
        LogToClean(topicAndPartition, log, math.max(checkpointed, logStartOffset))
      }
      // Skip any empty logs.
      val dirtyLogs = candidates.filter(_.totalBytes > 0)

      this.dirtiestLogCleanableRatio = if (dirtyLogs.nonEmpty) dirtyLogs.max.cleanableRatio else 0
      // A log must also exceed its minimum dirty ratio to be cleaned.
      val cleanableLogs = dirtyLogs.filter(ltc => ltc.cleanableRatio > ltc.log.config.minCleanableRatio)
      if (cleanableLogs.nonEmpty) {
        val dirtiest = cleanableLogs.max
        inProgress.put(dirtiest.topicPartition, LogCleaningInProgress)
        Some(dirtiest)
      } else {
        None
      }
    }
  }
}

相关文章

网友评论

      本文标题:MessageSet.scala

      本文链接:https://www.haomeiwen.com/subject/xiqwfqtx.html