美文网首页
Kafka删除日志源码分析

Kafka删除日志源码分析

作者: LancerLin_LX | 来源:发表于2018-02-02 11:47 被阅读0次

问题描述

1.问题定位

最近需要利用flume采集神策的历史数据,数据量比较大,每天大概有2000万条数据,大概要采集一个月的。然后发现数据还没来得及消费就被删除了,

2.问题处理

查看kafka配置后,发现两个关于删除日志策略的参数都设置了。
log.retention.hourslog.retention.bytes
1. 查看kafka0.9.0.0源码,LogManager类

   def cleanupLogs() {
    debug("Beginning log cleanup...")
    var total = 0
    val startMs = time.milliseconds
    for(log <- allLogs; if !log.config.compact) {
      debug("Garbage collecting '" + log.name + "'")
      total += cleanupExpiredSegments(log) + cleanupSegmentsToMaintainSize(log)
    }
    debug("Log cleanup completed. " + total + " files deleted in " +
                  (time.milliseconds - startMs) / 1000 + " seconds")
  }

2. 查看total += cleanupExpiredSegments(log) + cleanupSegmentsToMaintainSize(log)两个方法,删除逻辑是或
首先查看cleanupExpiredSegments(log) 方法,

 private def cleanupExpiredSegments(log: Log): Int = {
    if (log.config.retentionMs < 0)
      return 0
    val startMs = time.milliseconds
    log.deleteOldSegments(startMs - _.lastModified > log.config.retentionMs)
  }```
 startMs 这个参数的源码查看在这里
```scala
private def getLogRetentionTimeMillis: Long = {
    val millisInMinute = 60L * 1000L
    val millisInHour = 60L * millisInMinute

    val millis: java.lang.Long =
      Option(getLong(KafkaConfig.LogRetentionTimeMillisProp)).getOrElse(
        Option(getInt(KafkaConfig.LogRetentionTimeMinutesProp)) match {
          case Some(mins) => millisInMinute * mins
          case None => getInt(KafkaConfig.LogRetentionTimeHoursProp) * millisInHour
        })

    if (millis < 0) return -1
    millis   //总之就是 log.retention.hours log.retention.minutes log.retention.ms配置文件这三个来决定的,把他们转为毫秒
  }

所以cleanupExpiredSegments(log) 方法会根据日志超时时间来删除

查看cleanupSegmentsToMaintainSize(log)方法

private def cleanupSegmentsToMaintainSize(log: Log): Int = {
    if(log.config.retentionSize < 0 || log.size < log.config.retentionSize)
      return 0
    var diff = log.size - log.config.retentionSize
    def shouldDelete(segment: LogSegment) = {
      if(diff - segment.size >= 0) {
        diff -= segment.size
        true
      } else {
        false
      }
    }
    log.deleteOldSegments(shouldDelete)
  }

根据日志大小来决定,这个参数是log.retention.bytes
总结:kafka删除日志的逻辑是通过时间和日志大小来删除的,配置参数log.retention.hourslog.retention.bytes,如果只要按时间删除,则不设置log.retention.bytes,默认是-1,今天的问题就在于两个参数都设置了,所以出现日志过大就被删除了。

3.最后

Kafka日志是多久删除一次的呢?
LogManager类中的startup

def startup() {
    /* Schedule the cleanup task to delete old logs */
    if(scheduler != null) {
      info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs))
      scheduler.schedule("kafka-log-retention", 
                         cleanupLogs, 
                         delay = InitialTaskDelayMs, 
                         period = retentionCheckMs, 
                         TimeUnit.MILLISECONDS)
      info("Starting log flusher with a default period of %d ms.".format(flushCheckMs))
      scheduler.schedule("kafka-log-flusher", 
                         flushDirtyLogs, 
                         delay = InitialTaskDelayMs, 
                         period = flushCheckMs, 
                         TimeUnit.MILLISECONDS)
      scheduler.schedule("kafka-recovery-point-checkpoint",
                         checkpointRecoveryPointOffsets,
                         delay = InitialTaskDelayMs,
                         period = flushCheckpointMs,
                         TimeUnit.MILLISECONDS)
    }
    if(cleanerConfig.enableCleaner)
      cleaner.startup()
  }```
寻找` retentionCheckMs,`是由这个参数决定的,具体过程不详细秒速,`log.retention.check.interval.ms`,默认是300秒,就是5分钟执行清理逻辑一次。



相关文章

  • Kafka删除日志源码分析

    问题描述 1.问题定位 最近需要利用flume采集神策的历史数据,数据量比较大,每天大概有2000万条数据,大概要...

  • Kafka源码分析-Content Table

    Kafka源码分析-网络层-1 Kafka源码分析-网络层-2 Kafka源码分析-网络层-3 Kafka源码分析...

  • KafkaProducer

    Kafka源码阅读(一):Kafka Producer整体架构概述及源码分析 zqhxuyuan Kafka源码分...

  • 日志段:保存消息文件的对象是怎么实现的?

    日志段及其相关代码是kafka服务器源码中最为重要的组件代码之一。kafka日志结构概览kafka日志在磁盘上的组...

  • Kafka处理请求的全流程解析

    大家好,我是 yes。 这是我的第三篇Kafka源码分析文章,前两篇讲了日志段的读写和二分算法在kafka索引上的...

  • Kafka的日志清理-LogCleaner

    这里说的日志,是指Kafka保存写入消息的文件; Kafka日志清除策略包括中间:基于时间和大小的删除策略;Com...

  • Kafka初识

    问题一 写出增加Kafka的Partition命令 问题二 列出配置Kafka删除日志的配置参数 问题三 Kafk...

  • 02、Kafka日志Log源码分析

    日志是日志段的容器,里面定义了很多管理日志段的操作。 既然日志要管理日志段对象,那么首先得加载所有日志段对象到内存...

  • KafkaProducer Sender 线程详解(含详细的执行

    温馨提示:本文基于 Kafka 2.2.1 版本。 上文 《源码分析 Kafka 消息发送流程》 已经详细介绍了 ...

  • kafka MetaData

    阅读以下两个即可 Kafka源码阅读(二):Producer Metadata概述及源码分析 KafkaProdu...

网友评论

      本文标题:Kafka删除日志源码分析

      本文链接:https://www.haomeiwen.com/subject/jlndhxtx.html