abstract class MessageSet extends Iterable[MessageAndOffset] {
// Write the messages in this set to the given channel
def writeTo(channel: GatheringByteChannel, offset: Long, maxSize: Int): Int
// Iterator that reads the messages in this MessageSet in order
def iterator: Iterator[MessageAndOffset]
}
// file points to the log file on disk
// channel is the FileChannel used to read and write the file
// start, end are the start and end positions of this slice within the log file
// isSlice indicates whether this FileMessageSet is a slice of the log file or the complete file
class FileMessageSet private[kafka](@volatile var file: File,
private[log] val channel: FileChannel,
private[log] val start: Int,
private[log] val end: Int,
isSlice: Boolean) extends MessageSet with Logging {
// Size of the log file or of the slice.
// Multiple handler threads may append to the same partition concurrently, so this is an AtomicInteger.
private val _size =
if(isSlice)
new AtomicInteger(end - start)
else
new AtomicInteger(math.min(channel.size.toInt, end) - start)
// During initialization move the position pointer to the end of the log file,
// so that writes after a broker restart do not overwrite existing data.
if (!isSlice)
channel.position(math.min(channel.size.toInt, end))
def openChannel(file: File, mutable: Boolean, fileAlreadyExists: Boolean = false, initFileSize: Int = 0, preallocate: Boolean = false): FileChannel = {
if (mutable) { // mutable == true means the file is opened for writing
if (fileAlreadyExists)
new RandomAccessFile(file, "rw").getChannel()
else {
if (preallocate) {
val randomAccessFile = new RandomAccessFile(file, "rw")
// Preallocate the file to the target size.
// This lets the file occupy disk sectors that are as contiguous as possible, reducing
// seek overhead on later writes and reads (see https://zhuanlan.zhihu.com/p/34915311 on file fragmentation).
randomAccessFile.setLength(initFileSize)
randomAccessFile.getChannel()
}
else
new RandomAccessFile(file, "rw").getChannel()
}
}
else
new FileInputStream(file).getChannel()
}
// Append a batch of messages to the log file
def append(messages: ByteBufferMessageSet) {
val written = messages.writeFullyTo(channel)
_size.getAndAdd(written)
}
// Find the first message whose offset is >= targetOffset and return its offset and file position
def searchFor(targetOffset: Long, startingPosition: Int): OffsetPosition = {
var position = startingPosition
// LogOverhead covers only the offset and size fields, not the message payload
val buffer = ByteBuffer.allocate(MessageSet.LogOverhead)
val size = sizeInBytes() // current size of this FileMessageSet
// Walk through the messages one at a time starting from position
while(position + MessageSet.LogOverhead < size) {
buffer.rewind()
channel.read(buffer, position)
buffer.rewind()
val offset = buffer.getLong()
if(offset >= targetOffset)
return OffsetPosition(offset, position)
val messageSize = buffer.getInt()
// Advance the position to the next message
position += MessageSet.LogOverhead + messageSize
}
null
}
}
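To make the scan in searchFor concrete, here is a small self-contained Scala sketch that lays out a few fake entries with the same 12-byte overhead (an 8-byte offset followed by a 4-byte size) in an in-memory ByteBuffer and walks them the way searchFor does; the offsets and payload sizes are made up for illustration, and absolute gets replace the small read buffer used above.

import java.nio.ByteBuffer

object SearchForSketch {
  val LogOverhead = 12 // 8-byte offset + 4-byte size, no message payload

  def main(args: Array[String]): Unit = {
    // Build three fake entries as (offset, payloadSize); the payload bytes stay zeroed.
    val entries = Seq((10L, 100), (11L, 50), (12L, 70))
    val buf = ByteBuffer.allocate(entries.map { case (_, size) => LogOverhead + size }.sum)
    for ((offset, size) <- entries) {
      buf.putLong(offset)
      buf.putInt(size)
      buf.position(buf.position() + size) // skip over the (empty) payload bytes
    }
    val total = buf.position()

    // Same loop shape as FileMessageSet.searchFor: walk the entry headers until the
    // first offset >= target is found, accumulating the byte position as we go.
    def searchFor(targetOffset: Long): Option[(Long, Int)] = {
      var position = 0
      while (position + LogOverhead < total) {
        val offset = buf.getLong(position)
        val messageSize = buf.getInt(position + 8)
        if (offset >= targetOffset) return Some((offset, position))
        position += LogOverhead + messageSize
      }
      None
    }

    println(searchFor(11L)) // Some((11,112)): second entry starts after 12 + 100 bytes
    println(searchFor(99L)) // None: no entry with offset >= 99
  }
}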
class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Logging {
private def create(offsetAssigner: OffsetAssigner, compressionCodec: CompressionCodec,
wrapperMessageTimestamp: Option[Long], timestampType: TimestampType,
messages: Message*): ByteBuffer = {
if (messages.isEmpty)
MessageSet.Empty.buffer
else if (compressionCodec == NoCompressionCodec) {
val buffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages))
// offsetAssigner.nextAbsoluteOffset() assigns each message an offset, which is written into the buffer
for (message <- messages) writeMessage(buffer, message, offsetAssigner.nextAbsoluteOffset())
buffer.rewind()
buffer
} else {
var offset = -1L
val messageWriter = new MessageWriter(math.min(math.max(MessageSet.messageSetSize(messages) / 2, 1024), 1 << 16))
messageWriter.write(codec = compressionCodec, timestamp = magicAndTimestamp.timestamp, timestampType = timestampType, magicValue = magicAndTimestamp.magic) { outputStream =>
// Create an output stream for the configured compression codec
val output = new DataOutputStream(CompressionFactory(compressionCodec, magicAndTimestamp.magic, outputStream))
try {
// Write each inner (to-be-compressed) message in turn
for (message <- messages) {
offset = offsetAssigner.nextAbsoluteOffset()
// For magic values above V0, write the offset relative to the wrapper (relative offset)
if (magicAndTimestamp.magic > Message.MagicValue_V0)
output.writeLong(offsetAssigner.toInnerOffset(offset))
else
// Otherwise (magic V0) write the absolute offset
output.writeLong(offset)
output.writeInt(message.size)
output.write(message.buffer.array, message.buffer.arrayOffset, message.buffer.limit)
}
} finally {
output.close()
}
}
val buffer = ByteBuffer.allocate(messageWriter.size + MessageSet.LogOverhead)
// Write the whole wrapper message in the message format; the wrapper's offset is the offset of the last inner message
writeMessage(buffer, messageWriter, offset)
buffer.rewind()
buffer
}
}
def writeFullyTo(channel: GatheringByteChannel): Int = {
buffer.mark()
var written = 0
while (written < sizeInBytes)
written += channel.write(buffer)
buffer.reset()
written
}
}
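The loop in writeFullyTo exists because a single GatheringByteChannel.write call may write only part of the buffer; mark/reset then restores the buffer so the caller can still read or rewrite it. A minimal sketch of the same pattern against a temporary file (the file name and payload are illustrative only, and remaining() plays the role of sizeInBytes):

import java.io.RandomAccessFile
import java.nio.ByteBuffer
import java.nio.channels.GatheringByteChannel
import java.nio.charset.StandardCharsets
import java.nio.file.Files

object WriteFullySketch {
  // Same shape as writeFullyTo above: keep writing until the whole buffer is drained,
  // then restore the buffer's position with mark/reset.
  def writeFullyTo(buffer: ByteBuffer, channel: GatheringByteChannel): Int = {
    buffer.mark()
    var written = 0
    val total = buffer.remaining()
    while (written < total)
      written += channel.write(buffer)
    buffer.reset()
    written
  }

  def main(args: Array[String]): Unit = {
    val tmp = Files.createTempFile("write-fully-sketch", ".log").toFile
    val channel = new RandomAccessFile(tmp, "rw").getChannel
    val buffer = ByteBuffer.wrap("hello kafka log".getBytes(StandardCharsets.UTF_8))
    val n = writeFullyTo(buffer, channel)
    println(s"wrote $n bytes, buffer position restored to ${buffer.position()}")
    channel.close()
    tmp.delete()
  }
}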
public static class RecordsIterator extends AbstractIterator<LogEntry> {
// Buffer holding the message data
private final ByteBuffer buffer;
// Stream used to read messages from the buffer
private final DataInputStream stream;
// Compression type
private final CompressionType type;
// Whether to iterate shallowly, i.e. without descending into compressed wrapper messages
private final boolean shallow;
// Inner iterator used to iterate over the messages inside a compressed wrapper
private RecordsIterator innerIter;
// LogEntry: offset + record
private final ArrayDeque<LogEntry> logEntries;
// Used when iterating compressed messages: the absolute offset of the first inner message
private final long absoluteBaseOffset;
// Public constructor, creates the outer iterator
public RecordsIterator(ByteBuffer buffer, boolean shallow) {
this.type = CompressionType.NONE;
this.buffer = buffer;
this.shallow = shallow;
this.stream = new DataInputStream(new ByteBufferInputStream(buffer));
this.logEntries = null;
this.absoluteBaseOffset = -1;
}
private RecordsIterator(LogEntry entry) {
this.type = entry.record().compressionType();
this.buffer = entry.record().value();
this.shallow = true;
this.stream = Compressor.wrapForInput(new ByteBufferInputStream(this.buffer), type, entry.record().magic());
long wrapperRecordOffset = entry.offset(); // offset of the wrapper (outer) message
if (entry.record().magic() > Record.MAGIC_VALUE_V0) {
this.logEntries = new ArrayDeque<>();
long wrapperRecordTimestamp = entry.record().timestamp();
// This loop decompresses the inner messages and adds them to the logEntries queue
while (true) {
try {
LogEntry logEntry = getNextEntryFromStream();
Record recordWithTimestamp = new Record(logEntry.record().buffer(),
wrapperRecordTimestamp,
entry.record().timestampType());
logEntries.add(new LogEntry(logEntry.offset(), recordWithTimestamp));
} catch (EOFException e) {
break;
} catch (IOException e) {
throw new KafkaException(e);
}
}
this.absoluteBaseOffset = wrapperRecordOffset - logEntries.getLast().offset();
} else {
this.logEntries = null;
this.absoluteBaseOffset = -1;
}
}
@Override
protected LogEntry makeNext() {
if (innerDone()) {
try {
LogEntry entry = getNextEntry();
if (entry == null)
return allDone();
if (absoluteBaseOffset >= 0) {
long absoluteOffset = absoluteBaseOffset + entry.offset();
entry = new LogEntry(absoluteOffset, entry.record());
}
CompressionType compression = entry.record().compressionType();
if (compression == CompressionType.NONE || shallow) {
return entry;
} else {
innerIter = new RecordsIterator(entry);
return innerIter.next();
}
} catch (EOFException e) {
return allDone();
} catch (IOException e) {
throw new KafkaException(e);
}
} else {
return innerIter.next();
}
}
}
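The offset arithmetic around absoluteBaseOffset is easier to see with plain numbers. The sketch below assumes three inner messages whose assigned absolute offsets are 42 to 44, serialized with relative offsets 0 to 2 and a wrapper offset of 44 (made-up values), and shows how the inner constructor and makeNext recover the absolute offsets:

object InnerOffsetSketch {
  def main(args: Array[String]): Unit = {
    // Write side (ByteBufferMessageSet.create with magic > V0): three inner messages
    // get absolute offsets 42, 43, 44 but are serialized with relative offsets 0, 1, 2;
    // the wrapper message carries the absolute offset of the last inner message (44).
    val innerRelativeOffsets = Seq(0L, 1L, 2L)
    val wrapperOffset = 44L

    // Read side (RecordsIterator inner constructor): recover the absolute base from the
    // wrapper offset and the last decompressed relative offset...
    val absoluteBaseOffset = wrapperOffset - innerRelativeOffsets.last // 44 - 2 = 42

    // ...then makeNext turns each relative offset back into an absolute one.
    val absoluteOffsets = innerRelativeOffsets.map(absoluteBaseOffset + _)
    println(absoluteOffsets) // List(42, 43, 44)
  }
}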
// Abstract base class extended by RecordsIterator, the iterator over MemoryRecords
public abstract class AbstractIterator<T> implements Iterator<T> {
private static enum State {
READY,
NOT_READY, // the next item has not been computed yet; maybeComputeNext must run first
DONE, FAILED
};
private State state = State.NOT_READY;
private T next;
@Override
public boolean hasNext() {
switch (state) {
case FAILED:
throw new IllegalStateException("Iterator is in failed state");
case DONE:
return false;
case READY:
return true;
default:
return maybeComputeNext();
}
}
@Override
public T next() {
if (!hasNext())
throw new NoSuchElementException();
state = State.NOT_READY;
if (next == null)
throw new IllegalStateException("Expected item but none found.");
return next;
}
protected abstract T makeNext();
private Boolean maybeComputeNext() {
state = State.FAILED;
next = makeNext();
if (state == State.DONE) {
return false;
} else {
state = State.READY;
return true;
}
}
}
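The makeNext/allDone contract is easiest to see in a tiny standalone example. The following Scala sketch re-creates the same lazy state machine (it is not the Kafka class itself; makeNext above calls an allDone() helper whose definition is not shown, so the sketch includes an equivalent one) and uses it to iterate a simple counter:

import java.util.NoSuchElementException

// A standalone re-creation of the pattern: hasNext lazily computes the next element
// and caches it behind a small READY / NOT_READY / DONE / FAILED state machine.
abstract class LazyIterator[T] extends Iterator[T] {
  private object State extends Enumeration { val Ready, NotReady, Done, Failed = Value }
  private var state = State.NotReady
  private var nextItem: T = _

  protected def makeNext(): T
  // Equivalent of allDone(): mark the iterator finished; the return value is ignored.
  protected def allDone(): T = { state = State.Done; nextItem }

  override def hasNext: Boolean = state match {
    case State.Failed => throw new IllegalStateException("Iterator is in failed state")
    case State.Done   => false
    case State.Ready  => true
    case _ =>
      state = State.Failed // stays FAILED if makeNext throws
      nextItem = makeNext()
      if (state == State.Done) false else { state = State.Ready; true }
  }

  override def next(): T = {
    if (!hasNext) throw new NoSuchElementException()
    state = State.NotReady
    nextItem
  }
}

object LazyIteratorDemo {
  // Trivial use: count up to a limit, then signal completion via allDone().
  def counting(limit: Int): Iterator[Int] = new LazyIterator[Int] {
    private var i = 0
    protected def makeNext(): Int = if (i < limit) { i += 1; i } else allDone()
  }

  def main(args: Array[String]): Unit =
    println(counting(3).toList) // List(1, 2, 3)
}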
// _file points to the index file on disk
// baseOffset is the offset of the first message in the corresponding log file
class OffsetIndex(@volatile private[this] var _file: File, val baseOffset: Long, val maxIndexSize: Int = -1) extends Logging {
// Operations on mmap must be protected by this lock
private val lock = new ReentrantLock
// MappedByteBuffer used to access the index file
@volatile
private[this] var mmap: MappedByteBuffer = {
// createNewFile returns true if the index file did not exist and was created, false otherwise
val newlyCreated = _file.createNewFile()
val raf = new RandomAccessFile(_file, "rw")
try {
if (newlyCreated) {
// Grow the newly created file to the largest multiple of 8 that does not exceed maxIndexSize,
// e.g. roundToExactMultiple(67, 8) = 64
raf.setLength(roundToExactMultiple(maxIndexSize, 8))
}
val len = raf.length()
// Memory-map the file
val idx = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, len)
if (newlyCreated)
idx.position(0)
else
// For an existing index file, move position to the end of the valid entries so they are not overwritten
idx.position(roundToExactMultiple(idx.limit, 8))
idx
} finally {
CoreUtils.swallow(raf.close())
}
}
@volatile
private[this] var _entries = mmap.position / 8 // number of index entries currently in the file
@volatile
private[this] var _maxEntries = mmap.limit / 8 // maximum number of index entries the file can hold
@volatile
private[this] var _lastOffset = readLastEntry.offset // offset of the last index entry
// Find the largest offset that is less than or equal to targetOffset and return its position
def lookup(targetOffset: Long): OffsetPosition = {
val idx = mmap.duplicate // work on a duplicate of the mapped index buffer
val slot = indexSlotFor(idx, targetOffset) // binary search implementation
if(slot == -1)
OffsetPosition(baseOffset, 0)
else
// Wrap the offset and its physical position into an OffsetPosition and return it.
// The physical position is the absolute position of the indexed message in the log file;
// seeking the log file to that position is enough to read the corresponding message.
OffsetPosition(baseOffset + relativeOffset(idx, slot), physical(idx, slot))
}
}
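Each index entry occupies 8 bytes (a 4-byte offset relative to baseOffset plus a 4-byte physical position), which is why the code divides the mmap position and limit by 8. Below is a self-contained sketch of the binary search that indexSlotFor performs, using an in-memory Vector in place of the mapped buffer; the entry values are illustrative.

object OffsetIndexSketch {
  // One sparse index entry: offset relative to the segment's baseOffset, plus the
  // physical byte position of that message in the log file (4 + 4 = 8 bytes on disk).
  case class IndexEntry(relativeOffset: Int, position: Int)

  // Binary search for the slot holding the largest offset <= targetOffset. Returns -1
  // when every entry is larger than the target, in which case lookup falls back to
  // OffsetPosition(baseOffset, 0).
  def indexSlotFor(entries: IndexedSeq[IndexEntry], baseOffset: Long, targetOffset: Long): Int = {
    if (entries.isEmpty || targetOffset < baseOffset + entries.head.relativeOffset) return -1
    var lo = 0
    var hi = entries.length - 1
    while (lo < hi) {
      val mid = (lo + hi + 1) / 2
      if (baseOffset + entries(mid).relativeOffset <= targetOffset) lo = mid else hi = mid - 1
    }
    lo
  }

  def main(args: Array[String]): Unit = {
    val baseOffset = 100L
    val entries = Vector(IndexEntry(0, 0), IndexEntry(40, 4096), IndexEntry(85, 8192))
    println(indexSlotFor(entries, baseOffset, 120L)) //  0 -> read the log file from position 0
    println(indexSlotFor(entries, baseOffset, 190L)) //  2 -> read from position 8192
    println(indexSlotFor(entries, baseOffset, 99L))  // -1 -> before the first entry
  }
}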
@nonthreadsafe
class LogSegment(val log: FileMessageSet, // FileMessageSet for the segment's log file
val index: OffsetIndex, // OffsetIndex for the segment's index file
val baseOffset: Long, // offset of the first message in this LogSegment
val indexIntervalBytes: Int, // minimum number of bytes between index entries
val rollJitterMs: Long,
time: Time) extends Logging {
// Creation time of this LogSegment; reset when truncateTo empties the log file
var created = time.milliseconds
// Bytes appended to the log file since the last index entry was added; used to decide when to add the next one
private var bytesSinceLastIndexEntry = 0
@nonthreadsafe
def append(offset: Long, messages: ByteBufferMessageSet) {
if (messages.sizeInBytes > 0) {
// Check whether enough bytes have accumulated to warrant a new index entry
if(bytesSinceLastIndexEntry > indexIntervalBytes) {
// Add an index entry and reset the counter.
// For append-only writes the physical position is trivial: it is simply the current size of the log file.
index.append(offset, log.sizeInBytes())
this.bytesSinceLastIndexEntry = 0
}
log.append(messages)
this.bytesSinceLastIndexEntry += messages.sizeInBytes
}
}
// Fetch the messages in the given offset range
@threadsafe
def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int, maxPosition: Long = size): FetchDataInfo = {
val logSize = log.sizeInBytes
val startPosition = translateOffset(startOffset)
val offsetMetadata = new LogOffsetMetadata(startOffset, this.baseOffset, startPosition.position)
// Compute the number of bytes to read, depending on whether maxOffset is given
val length = maxOffset match {
case None =>
min((maxPosition - startPosition.position).toInt, maxSize)
case Some(offset) =>
val mapping = translateOffset(offset, startPosition.position)
val endPosition =
if(mapping == null)
logSize
else
mapping.position
min(min(maxPosition, endPosition) - startPosition.position, maxSize).toInt
}
FetchDataInfo(offsetMetadata, log.read(startPosition.position, length))
}
}
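The index stays sparse because append only adds an entry once at least indexIntervalBytes have accumulated since the previous one. A small simulation of that bookkeeping, with made-up batch sizes:

import scala.collection.mutable.ListBuffer

object SparseIndexSketch {
  def main(args: Array[String]): Unit = {
    val indexIntervalBytes = 4096
    var bytesSinceLastIndexEntry = 0
    var logSizeInBytes = 0
    val indexEntries = ListBuffer.empty[(Long, Int)] // (offset of the batch, physical position)

    // Simulate LogSegment.append for a stream of (firstOffset, batchSizeInBytes) batches:
    // an index entry is added only after at least indexIntervalBytes have accumulated
    // since the previous entry, which is what keeps the offset index sparse.
    val batches = Seq((0L, 1000), (10L, 2000), (20L, 1500), (30L, 3000), (40L, 2000), (50L, 100))
    for ((offset, size) <- batches) {
      if (bytesSinceLastIndexEntry > indexIntervalBytes) {
        indexEntries += ((offset, logSizeInBytes)) // position is just the current log file size
        bytesSinceLastIndexEntry = 0
      }
      logSizeInBytes += size
      bytesSinceLastIndexEntry += size
    }
    println(indexEntries.toList) // List((30,4500), (50,9500)): only two sparse entries
  }
}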
// An ordered sequence of LogSegment objects that together form one logical log
class Log(val dir: File, // directory holding the log and index files of every LogSegment
@volatile var config: LogConfig, // configuration for this Log
// Offset at which recovery starts: messages before recoveryPoint have already been flushed to disk;
// messages after it may not have been, and are at risk of being lost
@volatile var recoveryPoint: Long = 0L,
scheduler: Scheduler, // thread pool for asynchronous scheduled tasks, e.g. asynchronous flushes
time: Time = SystemTime) extends Logging with KafkaMetricsGroup {
// Multiple handler threads may append to the same Log, so appends must be synchronized
private val lock = new Object
// LogSegments are managed in a skip list keyed by their baseOffset
private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
// Source of the offsets assigned to incoming messages
@volatile var nextOffsetMetadata = new LogOffsetMetadata(activeSegment.nextOffset(), activeSegment.baseOffset, activeSegment.size.toInt)
// Only the last LogSegment accepts writes
def activeSegment = segments.lastEntry.getValue
// Append messages sent by producers
def append(messages: ByteBufferMessageSet, assignOffsets: Boolean = true): LogAppendInfo = {
val appendInfo = analyzeAndValidateMessageSet(messages)
var validMessages = trimInvalidBytes(messages, appendInfo)
lock synchronized {
if (assignOffsets) {
val offset = new LongRef(nextOffsetMetadata.messageOffset)
appendInfo.firstOffset = offset.value
val now = time.milliseconds
val (validatedMessages, messageSizesMaybeChanged) =
validMessages.validateMessagesAndAssignOffsets(offset, now, appendInfo.sourceCodec, appendInfo.targetCodec,
config.compact, config.messageFormatVersion.messageFormatVersion,
config.messageTimestampType, config.messageTimestampDifferenceMaxMs)
validMessages = validatedMessages
// Record the offset of the last message
appendInfo.lastOffset = offset.value - 1
if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)
appendInfo.timestamp = now
}
// Get the activeSegment; if the current segment does not have enough room, roll a new one
val segment = maybeRoll(validMessages.sizeInBytes)
segment.append(appendInfo.firstOffset, validMessages)
updateLogEndOffset(appendInfo.lastOffset + 1)
if (unflushedMessages >= config.flushInterval)
flush()
appendInfo
}
}
def flush(offset: Long) : Unit = {
if (offset <= this.recoveryPoint)
// already flushed up to this offset
return
for(segment <- logSegments(this.recoveryPoint, offset))
segment.flush()
lock synchronized {
if(offset > this.recoveryPoint) {
// advance recoveryPoint after flushing
this.recoveryPoint = offset
lastflushedTime.set(time.milliseconds)
}
}
}
def read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None): FetchDataInfo = {
val currentNextOffsetMetadata = nextOffsetMetadata
val next = currentNextOffsetMetadata.messageOffset
if(startOffset == next)
// startOffset is the log end offset, return an empty result
return FetchDataInfo(currentNextOffsetMetadata, MessageSet.Empty)
var entry = segments.floorEntry(startOffset)
while(entry != null) {
val maxPosition = {
if (entry == segments.lastEntry) {
// The last segment may be appended to concurrently, so bound the read by the exposed position
val exposedPos = nextOffsetMetadata.relativePositionInSegment.toLong
if (entry != segments.lastEntry)
entry.getValue.size
else
exposedPos
} else {
entry.getValue.size
}
}
val fetchInfo = entry.getValue.read(startOffset, maxOffset, maxLength, maxPosition)
if(fetchInfo == null) {
entry = segments.higherEntry(entry.getKey)
} else {
return fetchInfo
}
}
FetchDataInfo(nextOffsetMetadata, MessageSet.Empty)
}
private def asyncDeleteSegment(segment: LogSegment) {
// Rename the log file and index file with a .deleted suffix
segment.changeFileSuffixes("", Log.DeletedFileSuffix)
def deleteSeg() {
segment.delete()
}
// Delete the files asynchronously after a delay
scheduler.schedule("delete-file", deleteSeg, delay = config.fileDeleteDelayMs)
}
}
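Because segments is a ConcurrentSkipListMap keyed by each segment's baseOffset, read can locate the right segment with floorEntry(startOffset), the entry with the greatest base offset not exceeding startOffset. A sketch with strings standing in for LogSegment objects (the base offsets are arbitrary):

import java.util.concurrent.ConcurrentSkipListMap

object SegmentLookupSketch {
  def main(args: Array[String]): Unit = {
    // Stand-ins for LogSegment objects, keyed by baseOffset exactly as in Log.segments.
    val segments = new ConcurrentSkipListMap[java.lang.Long, String]()
    segments.put(0L, "segment-0")
    segments.put(250L, "segment-250")
    segments.put(600L, "segment-600")

    // floorEntry returns the entry with the greatest key <= the argument, which is
    // how Log.read picks the segment that may contain startOffset.
    println(segments.floorEntry(10L).getValue)  // segment-0
    println(segments.floorEntry(600L).getValue) // segment-600
    println(segments.floorEntry(999L).getValue) // segment-600 (the active segment)
    println(segments.lastEntry.getValue)        // segment-600, i.e. activeSegment
  }
}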
@threadsafe
class KafkaScheduler(val threads: Int, val threadNamePrefix: String = "kafka-scheduler-",
daemon: Boolean = true) extends Scheduler with Logging {
private var executor: ScheduledThreadPoolExecutor = null
private val schedulerThreadId = new AtomicInteger(0)
override def startup() {
this synchronized {
if(isStarted)
throw new IllegalStateException("This scheduler has already been started!")
executor = new ScheduledThreadPoolExecutor(threads)
executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false)
executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false)
executor.setThreadFactory(new ThreadFactory() {
def newThread(runnable: Runnable): Thread =
Utils.newThread(threadNamePrefix + schedulerThreadId.getAndIncrement(), runnable, daemon)
})
}
}
def schedule(name: String, fun: ()=>Unit, delay: Long, period: Long, unit: TimeUnit) = {
this synchronized {
ensureRunning
val runnable = CoreUtils.runnable {
fun()
}
if(period >= 0)
executor.scheduleAtFixedRate(runnable, delay, period, unit)
else
executor.schedule(runnable, delay, unit)
}
}
}
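The sign of period is what separates periodic tasks from one-shot tasks: a non-negative period goes to scheduleAtFixedRate, anything else to a single delayed schedule call. A minimal sketch of the same distinction using ScheduledThreadPoolExecutor directly (the delays and messages are arbitrary):

import java.util.concurrent.{ScheduledThreadPoolExecutor, TimeUnit}

object SchedulerSketch {
  def main(args: Array[String]): Unit = {
    val executor = new ScheduledThreadPoolExecutor(1)
    // period >= 0: a recurring task, like kafka-log-retention or kafka-log-flusher.
    executor.scheduleAtFixedRate(() => println("periodic tick"), 0L, 100L, TimeUnit.MILLISECONDS)
    // A negative period in KafkaScheduler.schedule falls through to a one-shot task,
    // which is how asyncDeleteSegment delays the actual file deletion.
    executor.schedule(new Runnable { def run(): Unit = println("one-shot delete-file") },
      150L, TimeUnit.MILLISECONDS)
    Thread.sleep(400)
    executor.shutdown()
  }
}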
@threadsafe
// server.properties may configure multiple log directories, each holding multiple Logs; when creating a new Log, LogManager picks the directory with the fewest Logs
class LogManager(val logDirs: Array[File],
val topicConfigs: Map[String, LogConfig], val defaultConfig: LogConfig, val cleanerConfig: CleanerConfig,
// number of threads used to load logs
ioThreads: Int,
val flushCheckMs: Long, val flushCheckpointMs: Long, val retentionCheckMs: Long, scheduler: Scheduler,
val brokerState: BrokerState, private val time: Time) extends Logging {
// Lock held while creating or deleting a Log
private val logCreationOrDeletionLock = new Object
// backed by a ConcurrentHashMap
private val logs = new Pool[TopicAndPartition, Log]()
// a file lock for each log directory
private val dirLocks = lockLogDirs(logDirs)
// Mapping from each log directory to its RecoveryPointCheckpoint file, which records
// the recoveryPoint of every Log in that directory
private val recoveryPointCheckpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, RecoveryPointCheckpointFile)))).toMap
loadLogs()
def startup() {
if(scheduler != null) {
// scheduled task for log retention/cleanup
scheduler.schedule("kafka-log-retention",
cleanupLogs,
delay = InitialTaskDelayMs,
period = retentionCheckMs,
TimeUnit.MILLISECONDS)
// scheduled task that flushes dirty logs to disk
scheduler.schedule("kafka-log-flusher",
flushDirtyLogs,
delay = InitialTaskDelayMs,
period = flushCheckMs,
TimeUnit.MILLISECONDS)
// scheduled task that writes recovery points to the RecoveryPointCheckpoint file
scheduler.schedule("kafka-recovery-point-checkpoint",
checkpointRecoveryPointOffsets,
delay = InitialTaskDelayMs,
period = flushCheckpointMs,
TimeUnit.MILLISECONDS)
}
if(cleanerConfig.enableCleaner)
cleaner.startup()
}
def cleanupLogs() {
var total = 0
val startMs = time.milliseconds
for(log <- allLogs; if !log.config.compact) {
// Delete segments based on retention time or total log size
total += cleanupExpiredSegments(log) + cleanupSegmentsToMaintainSize(log)
}
}
}
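The comment above LogManager mentions that a new Log is placed in the data directory that currently holds the fewest Logs. A simplified sketch of that selection policy; the directory paths and counts are hypothetical and the helper stands in for LogManager's actual implementation:

import java.io.File

object LeastLoadedDirSketch {
  // Pick the log.dirs entry with the fewest existing Logs (partition directories).
  def pickLogDir(logDirs: Seq[File], logCountByDir: Map[String, Int]): File =
    logDirs.minBy(dir => logCountByDir.getOrElse(dir.getPath, 0))

  def main(args: Array[String]): Unit = {
    val dirs = Seq(new File("/data/kafka-logs-1"), new File("/data/kafka-logs-2"))
    val counts = Map("/data/kafka-logs-1" -> 12, "/data/kafka-logs-2" -> 9)
    println(pickLogDir(dirs, counts)) // /data/kafka-logs-2
  }
}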
class LogCleaner(val config: CleanerConfig,
val logDirs: Array[File],
val logs: Pool[TopicAndPartition, Log],
time: Time = SystemTime) extends Logging with KafkaMetricsGroup {
private[log] val cleanerManager = new LogCleanerManager(logDirs, logs)
// cleaner threads
private val cleaners = (0 until config.numThreads).map(new CleanerThread(_))
}
private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[TopicAndPartition, Log]) extends Logging with KafkaMetricsGroup {
// log directory -> offset checkpoint file
private val checkpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, offsetCheckpointFile)))).toMap
// Cleaning state of the topic partitions currently being cleaned
private val inProgress = mutable.HashMap[TopicAndPartition, LogCleaningState]()
// Protects checkpoints and inProgress
private val lock = new ReentrantLock
// Condition used to wait for a log's cleaning state to go from LogCleaningAborted to LogCleaningPaused
private val pausedCleaningCond = lock.newCondition()
// Update the cleaner checkpoint file of the given directory
def updateCheckpoints(dataDir: File, update: Option[(TopicAndPartition,Long)]) {
inLock(lock) {
val checkpoint = checkpoints(dataDir)
val existing = checkpoint.read().filterKeys(logs.keys) ++ update
checkpoint.write(existing)
}
}
def grabFilthiestLog(): Option[LogToClean] = {
inLock(lock) {
// cleaner checkpoints from all directories
val lastClean = allCleanerCheckpoints()
val dirtyLogs = logs.filter {
// skip logs whose cleanup.policy is delete (keep only compacted logs)
case (topicAndPartition, log) => log.config.compact
}.filterNot {
// skip logs that are already being cleaned (inProgress)
case (topicAndPartition, log) => inProgress.contains(topicAndPartition)
}.map {
case (topicAndPartition, log) =>
val logStartOffset = log.logSegments.head.baseOffset
val firstDirtyOffset = {
// Where cleaning starts: either the checkpointed offset or the first message of the log
val offset = lastClean.getOrElse(topicAndPartition, logStartOffset)
if (offset < logStartOffset) {
logStartOffset
} else {
offset
}
}
LogToClean(topicAndPartition, log, firstDirtyOffset)
}.filter(ltc => ltc.totalBytes > 0) // skip any empty logs
this.dirtiestLogCleanableRatio = if (!dirtyLogs.isEmpty) dirtyLogs.max.cleanableRatio else 0
// and must meet the minimum threshold for dirty byte ratio
val cleanableLogs = dirtyLogs.filter(ltc => ltc.cleanableRatio > ltc.log.config.minCleanableRatio)
if(cleanableLogs.isEmpty) {
None
} else {
val filthiest = cleanableLogs.max
inProgress.put(filthiest.topicPartition, LogCleaningInProgress)
Some(filthiest)
}
}
}
}
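grabFilthiestLog keeps only candidates whose cleanableRatio exceeds the log's minCleanableRatio and then takes the maximum. The sketch below assumes cleanableRatio is the fraction of dirty bytes over total bytes (an assumption about LogToClean, used here only for illustration) and uses made-up byte counts:

object FilthiestLogSketch {
  // A simplified stand-in for LogToClean, with cleanableRatio assumed to be
  // dirty bytes divided by total bytes.
  case class LogToCleanSketch(topicPartition: String, cleanBytes: Long, dirtyBytes: Long) {
    def totalBytes: Long = cleanBytes + dirtyBytes
    def cleanableRatio: Double = dirtyBytes.toDouble / totalBytes
  }

  def main(args: Array[String]): Unit = {
    val minCleanableRatio = 0.5 // per-log min.cleanable.dirty.ratio
    val candidates = Seq(
      LogToCleanSketch("__consumer_offsets-0", cleanBytes = 800, dirtyBytes = 200), // 0.2
      LogToCleanSketch("__consumer_offsets-7", cleanBytes = 300, dirtyBytes = 700), // 0.7
      LogToCleanSketch("__consumer_offsets-3", cleanBytes = 100, dirtyBytes = 900)) // 0.9

    val cleanable = candidates.filter(_.totalBytes > 0).filter(_.cleanableRatio > minCleanableRatio)
    // Pick the dirtiest candidate, as grabFilthiestLog does with cleanableLogs.max.
    val filthiest = cleanable.maxBy(_.cleanableRatio)
    println(filthiest.topicPartition) // __consumer_offsets-3
  }
}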