Kafka Log Deletion: A Source Code Analysis


Author: LancerLin_LX | Published 2018-02-02 11:47

Problem Description

1. Locating the Problem

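Recently I needed to use Flume to collect historical data from Sensors Data (神策). The volume was large, about 20 million records per day, and I had to collect roughly a month of it. Then I found that data was being deleted before it could be consumed.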

2. Resolving the Problem

After checking the Kafka configuration, I found that both parameters controlling the log deletion policy were set: `log.retention.hours` and `log.retention.bytes`.
1. Look at the `LogManager` class in the Kafka 0.9.0.0 source:

```scala
def cleanupLogs() {
  debug("Beginning log cleanup...")
  var total = 0
  val startMs = time.milliseconds
  for(log <- allLogs; if !log.config.compact) {
    debug("Garbage collecting '" + log.name + "'")
    total += cleanupExpiredSegments(log) + cleanupSegmentsToMaintainSize(log)
  }
  debug("Log cleanup completed. " + total + " files deleted in " +
                (time.milliseconds - startMs) / 1000 + " seconds")
}
```
    

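2. The line `total += cleanupExpiredSegments(log) + cleanupSegmentsToMaintainSize(log)` calls two methods, and the deletion logic is an OR: the time-based pass runs first, then the size-based pass runs on whatever is left, so a segment is deleted if either check selects it. (Logs with compaction enabled, `log.config.compact`, are skipped entirely.)
First, the `cleanupExpiredSegments(log)` method: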

```scala
private def cleanupExpiredSegments(log: Log): Int = {
  if (log.config.retentionMs < 0)
    return 0
  val startMs = time.milliseconds
  log.deleteOldSegments(startMs - _.lastModified > log.config.retentionMs)
}
```
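Here `startMs` is simply the current time. The threshold it is compared against, `log.config.retentionMs`, takes its broker-wide default from `getLogRetentionTimeMillis` in `KafkaConfig`: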
```scala
private def getLogRetentionTimeMillis: Long = {
  val millisInMinute = 60L * 1000L
  val millisInHour = 60L * millisInMinute

  val millis: java.lang.Long =
    Option(getLong(KafkaConfig.LogRetentionTimeMillisProp)).getOrElse(
      Option(getInt(KafkaConfig.LogRetentionTimeMinutesProp)) match {
        case Some(mins) => millisInMinute * mins
        case None => getInt(KafkaConfig.LogRetentionTimeHoursProp) * millisInHour
      })

  if (millis < 0) return -1
  millis  // In short: determined by log.retention.ms, then log.retention.minutes, then log.retention.hours, converted to milliseconds
}
```
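To make the precedence explicit, here is a minimal sketch that reproduces the same rule with the three settings passed in as plain values instead of being read through Kafka's `KafkaConfig` getters. The object `RetentionTime` and the method `resolveRetentionMs` are hypothetical names for illustration: `log.retention.ms` wins if present, then `log.retention.minutes`, then `log.retention.hours`, and any negative result becomes -1 (no time limit).

```scala
object RetentionTime {
  private val MillisInMinute = 60L * 1000L
  private val MillisInHour = 60L * MillisInMinute

  // Hypothetical stand-in for KafkaConfig.getLogRetentionTimeMillis:
  // an explicit ms value overrides minutes, and minutes override hours.
  def resolveRetentionMs(ms: Option[Long], minutes: Option[Int], hours: Int): Long = {
    val millis = ms.getOrElse(
      minutes match {
        case Some(mins) => MillisInMinute * mins
        case None       => hours * MillisInHour
      })
    // Any negative value means "no time-based retention".
    if (millis < 0) -1L else millis
  }
}
```

For example, with only `log.retention.hours=168` set, `resolveRetentionMs(None, None, 168)` gives 604800000 ms, i.e. 7 days.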
    

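So `cleanupExpiredSegments(log)` deletes by retention time: a segment is removed once its last modification is more than `retentionMs` in the past.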
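The time-based pass can be modeled in a few lines. This sketch uses a simplified, hypothetical `Segment` type that carries only a name and a last-modified timestamp. It assumes segments are scanned oldest first and that scanning stops at the first segment that has not expired (`takeWhile`), which is how I read `Log.deleteOldSegments`; the special handling of the active segment is ignored.

```scala
object TimeRetention {
  // Hypothetical, simplified log segment: just a name and its mtime in ms.
  final case class Segment(name: String, lastModified: Long)

  // Same predicate as cleanupExpiredSegments: a segment is expired when
  // now - lastModified > retentionMs. Segments are ordered oldest first and
  // the scan stops at the first segment that is still within retention.
  def expiredSegments(segments: Seq[Segment], nowMs: Long, retentionMs: Long): Seq[Segment] =
    if (retentionMs < 0) Seq.empty
    else segments.takeWhile(s => nowMs - s.lastModified > retentionMs)
}
```

With segments written at hours 0, 10 and 20, a check at hour 30 with a 15-hour retention selects the first two segments only.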

Next, the `cleanupSegmentsToMaintainSize(log)` method:

```scala
private def cleanupSegmentsToMaintainSize(log: Log): Int = {
  if(log.config.retentionSize < 0 || log.size < log.config.retentionSize)
    return 0
  var diff = log.size - log.config.retentionSize
  def shouldDelete(segment: LogSegment) = {
    if(diff - segment.size >= 0) {
      diff -= segment.size
      true
    } else {
      false
    }
  }
  log.deleteOldSegments(shouldDelete)
}
```
    

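This pass is driven by log size, controlled by `log.retention.bytes` (applied to each partition's log). If the log is larger than the limit, it computes the excess (`diff`) and deletes the oldest segments as long as each one still fits within the remaining excess, so the log ends up at or just above the limit.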
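The size arithmetic is easiest to see on small numbers. The sketch below copies the `diff` logic of `cleanupSegmentsToMaintainSize` onto a hypothetical `Segment` type that holds only a size in bytes, again assuming oldest-first order.

```scala
object SizeRetention {
  // Hypothetical, simplified log segment: a name and its size in bytes.
  final case class Segment(name: String, sizeBytes: Long)

  // Same arithmetic as cleanupSegmentsToMaintainSize: compute how far the
  // log is over retentionBytes, then delete oldest segments while each one
  // still fits inside the remaining excess.
  def segmentsToDelete(segments: Seq[Segment], retentionBytes: Long): Seq[Segment] = {
    val logSize = segments.map(_.sizeBytes).sum
    if (retentionBytes < 0 || logSize < retentionBytes) Seq.empty
    else {
      var diff = logSize - retentionBytes
      segments.takeWhile { s =>
        if (diff - s.sizeBytes >= 0) { diff -= s.sizeBytes; true } else false
      }
    }
  }
}
```

With four 100-byte segments and a 250-byte limit the excess is 150 bytes, so only the oldest segment is removed: deleting a second one would go below the limit, so the log stays at 300 bytes until the next check.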
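Summary: Kafka deletes logs based on both time and size, configured by `log.retention.hours` and `log.retention.bytes`. If you only want time-based deletion, leave `log.retention.bytes` unset; its default is -1, which disables the size check. The problem here was that both parameters were set, so once a log grew too large its oldest segments were deleted regardless of age.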
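Putting the two passes together shows why setting both parameters caused the data loss. In this sketch (hypothetical names `CombinedRetention` and `cleanupRound`, simplified segments, oldest first), one cleanup round for a single log runs the time pass first and then the size pass on the remaining segments, in the same order as `cleanupLogs`.

```scala
object CombinedRetention {
  // Hypothetical, simplified log segment.
  final case class Segment(name: String, lastModified: Long, sizeBytes: Long)

  // Time pass, as in cleanupExpiredSegments (oldest first, stop at first kept).
  private def expired(segs: Seq[Segment], now: Long, retentionMs: Long): Seq[Segment] =
    if (retentionMs < 0) Seq.empty
    else segs.takeWhile(s => now - s.lastModified > retentionMs)

  // Size pass, as in cleanupSegmentsToMaintainSize.
  private def oversize(segs: Seq[Segment], retentionBytes: Long): Seq[Segment] = {
    val size = segs.map(_.sizeBytes).sum
    if (retentionBytes < 0 || size < retentionBytes) Seq.empty
    else {
      var diff = size - retentionBytes
      segs.takeWhile { s =>
        if (diff - s.sizeBytes >= 0) { diff -= s.sizeBytes; true } else false
      }
    }
  }

  // One round for a single non-compacted log: time pass, then size pass on
  // what is left. Returns the names of the deleted segments.
  def cleanupRound(segs: Seq[Segment], now: Long, retentionMs: Long, retentionBytes: Long): Seq[String] = {
    val byTime = expired(segs, now, retentionMs)
    val bySize = oversize(segs.drop(byTime.size), retentionBytes)
    (byTime ++ bySize).map(_.name)
  }
}
```

With three 100-byte segments that are all younger than a 7-day retention, `retentionBytes = -1` deletes nothing, while `retentionBytes = 150` still deletes the oldest segment even though it has not expired, which is the situation described above.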

3. Finally

How often does Kafka run log deletion?
Look at `startup` in the `LogManager` class:

```scala
def startup() {
  /* Schedule the cleanup task to delete old logs */
  if(scheduler != null) {
    info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs))
    scheduler.schedule("kafka-log-retention",
                       cleanupLogs,
                       delay = InitialTaskDelayMs,
                       period = retentionCheckMs,
                       TimeUnit.MILLISECONDS)
    info("Starting log flusher with a default period of %d ms.".format(flushCheckMs))
    scheduler.schedule("kafka-log-flusher",
                       flushDirtyLogs,
                       delay = InitialTaskDelayMs,
                       period = flushCheckMs,
                       TimeUnit.MILLISECONDS)
    scheduler.schedule("kafka-recovery-point-checkpoint",
                       checkpointRecoveryPointOffsets,
                       delay = InitialTaskDelayMs,
                       period = flushCheckpointMs,
                       TimeUnit.MILLISECONDS)
  }
  if(cleanerConfig.enableCleaner)
    cleaner.startup()
}
```
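Tracing `retentionCheckMs` (I won't describe every step) shows that it is set by `log.retention.check.interval.ms`, which defaults to 300000 ms (300 seconds), so the cleanup logic runs once every 5 minutes.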
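The scheduling pattern itself (an initial delay, then a fixed period) can be reproduced with the JDK's `ScheduledExecutorService`. This is a sketch using plain `java.util.concurrent` rather than Kafka's own scheduler; `RetentionScheduler` and `completesRounds` are hypothetical names, and the task body only counts runs in place of `cleanupLogs`. With the default setting the period would be 300000 ms; the example uses a short period so it finishes quickly.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

object RetentionScheduler {
  // Hypothetical stand-in for scheduler.schedule("kafka-log-retention", ...):
  // run a task after delayMs, then every periodMs, and report whether
  // `rounds` runs completed within 5 seconds.
  def completesRounds(rounds: Int, delayMs: Long, periodMs: Long): Boolean = {
    val executor = Executors.newSingleThreadScheduledExecutor()
    val done = new CountDownLatch(rounds)
    // Placeholder for cleanupLogs(): just records that a round ran.
    val task: Runnable = () => done.countDown()
    try {
      executor.scheduleAtFixedRate(task, delayMs, periodMs, TimeUnit.MILLISECONDS)
      done.await(5, TimeUnit.SECONDS)
    } finally {
      executor.shutdownNow()
    }
  }
}
```

Calling `completesRounds(3, 0L, 10L)` schedules three quick rounds and returns `true` once they have all run.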
    
    
    
    


Article link: https://www.haomeiwen.com/subject/jlndhxtx.html