美文网首页
kafka时间戳👇

kafka时间戳👇

作者: 五洋捉鳖zz | 来源:发表于2020-08-25 15:23 被阅读0次
    • 在消息中增加一个时间戳字段。目前支持的时间戳类型有两种

      1. CreateTime(Producer创建消息的时间)
      2. LogAppendTime(leader broker将消息写入到log的时间)
    • 引入时间戳的原因

      1. 日志保存(log retention)策略:Kafka目前会定期删除过期日志(log.retention.hours,默认是7天)。判断的依据就是比较日志段文件(log segment file)的最新修改时间(last modification time)。倘若最近一次修改发生于7天前,那么就会视该日志段文件为过期日志,执行清除操作。但如果topic的某个分区曾经发生过分区副本的重分配(replica reassigment),那么就有可能会在一个新的broker上创建日志段文件,并把该文件的最新修改时间设置为最新时间,这样设定的清除策略就无法执行了,尽管该日志段中的数据其实已经满足可以被清除的条件了。
    1. 日志切分(log rolling)策略:与日志保存是一样的道理。当前日志段文件会根据规则对当前日志进行切分——即,创建一个新的日志段文件,并设置其为当前激活(active)日志段。其中有一条规则就是基于时间的(log.roll.hours,默认是7天),即当前日志段文件的最新一次修改发生于7天前的话,就创建一个新的日志段文件,并设置为active日志段。所以,它也有同样的问题,即最近修改时间不是固定的,一旦发生分区副本重分配,该值就会发生变更,导致日志无法执行切分。(注意:log.retention.hours及其家族与log.rolling.hours及其家族不会冲突的,因为Kafka不会清除当前激活日志段文件)
    2. 流式处理(Kafka streaming):流式处理中需要用到消息的时间戳
    • 为指定的topic设置时间戳类型
    bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test --partitions 1 --replication-factor 1 --config message.timestamp.type=LogAppendTime
    
    • 使用console消费时打印时间戳字段
    <省略大家都知道的console消费部分> --property print.timestamp=true
    

    相关文章

      网友评论

          本文标题:kafka时间戳👇

          本文链接:https://www.haomeiwen.com/subject/zykzjktx.html