美文网首页Flink
Flink任务日志写到kafka【最新1.12,1.13】

Flink任务日志写到kafka【最新1.12,1.13】

作者: FishMAN__ | 来源:发表于2021-06-15 19:30 被阅读0次

    这篇文章如果对你有帮助,记得点赞哦!有问题也可以给我评论~

    一、背景

    公司的日志希望能够同一到一个Kibana去做一个同一的展示,那就需要将任务的日志写到kafka。
    Flink1.12开始默认的日志框架就是log4j2,那么配置的方式跟之前log4j的方式有了一些区别,这边也踩了一些坑才解决。

    二、需要解决的痛点

        - 如何区分JobManager和TaskManager的日志
        - 如何将jobName信息添加到每条日志中,为后期的日志聚合提供方便

    三、详细配置介绍

    1、log4j.properties完整配置如下:

    # This affects logging for both user code and Flie
    rootLogger.appenderRef.rolling.ref = RollingFile
    rootLogger.appenderRef.kafka.ref = Kafka
    rootLogger.level = INFO
    
    
    # The following lines keep the log level of common libraries/connectors on
    # log level INFO. The root logger does not override this. You have to manually
    # change the log levels here.
    logger.akka.name = akka
    logger.akka.level = INFO
    logger.kafka.name= org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO
    logger.pupu.name = com.pupu
    logger.pupu.level = DEBUG
    
    # Log all infos in the given rolling file
    appender.rolling.type = RollingFile
    appender.rolling.name = RollingFile
    appender.rolling.append = false
    appender.rolling.fileName = ${sys:log.file}
    appender.rolling.filePattern = ${sys:log.file}.%i
    appender.rolling.layout.type = PatternLayout
    appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.rolling.policies.type = Policies
    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.rolling.policies.size.size = 200MB
    appender.rolling.strategy.type = DefaultRolloverStrategy
    appender.rolling.strategy.max = 10
    
    
    # Suppress the irrelevant (wrong) warnings from the Netty channel handler
    logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF
    
    # kafka appender config
    appender.kafka.type=Kafka
    appender.kafka.name=Kafka
    appender.kafka.topic=flink_job_logs
    appender.kafka.property.type=Property
    appender.kafka.property.name=bootstrap.servers
    appender.kafka.property.value=xxxxxxxxxxxx:9092
    ## kafka的输出的日志pattern
    appender.kafka.layout.type=PatternLayout
    appender.kafka.layout.pattern=%d{yyyy-MM-dd HH:mm:ss,SSS}  ${sys:log.file}  ${sys:flink_per_job_name} %-5p %-60c %x - %m%n
    ## 输出json格式的日志
    #appender.kafka.layout.type=JSONLayout
    #appender.kafka.layout.compact=true
    #appender.kafka.layout.complete=false
    #appender.kafka.layout.includeTimeMillis=true
    #appender.kafka.layout.additionalField1.type=KeyValuePair
    #appender.kafka.layout.additionalField1.key=logdir
    #appender.kafka.layout.additionalField1.value=${sys:log.file}
    #appender.kafka.layout.additionalField2.type=KeyValuePair
    #appender.kafka.layout.additionalField2.key=flink_per_job_name
    #appender.kafka.layout.additionalField2.value=${sys:flink_per_job_name}
    
    

    2、hdfs的/flink/lib包目录下添加相关jar包

    因为log4j2自带 kafka-log4j-appender,所以不需要添加这个包。
    如果你的jar包没有包含kafka-client包,最好添加跟你其他地方一个版本的,我这里添加是kafka-clients-2.6.0.jar

    3、启动任务时需要添加对应的flink_per_job_name到环境变量中

    "env.java.opts":"-Dflink_per_job_name=mytestyjb"
    
    

    这里有两种方式:
        - flink run的方式:指定-yD
        - 通过将配置项放到flink-conf.yaml

    四、问题及解决

    1、log4j2的properties配置方式

         - 官网配置
             [http://logging.apache.org/log4j/2.x/manual/configuration.html#Properties]
             [http://logging.apache.org/log4j/2.x/manual/appenders.html#KafkaAppender]
         - Stack Overflow上的解决
             [https://stackoverflow.com/questions/56252787/configure-log4j-properties-for-kafka-appender-error-when-parsing-property-boots]

    2、Class org.apache.kafka.common.serialization.ByteArraySerializer could not be found.

       flink lib缺少了kafka-clients-2.6.0.jar 包导致,详细参考跳转链接
       之前刚看到这个报错以为是依赖冲突,后来排查半天发现没有依赖冲突,然后各种百度,有的说需要在new producer前面加一个Thread.currentThread().setContextClassLoader(null) ,然而并没有解决,想了解跳转链接

    3、打到kafka的日志如何区分不同的jobname、JM和TM的日志?

    (1)区分不同的jobname的日志

      精髓 :就是想办法把环境变量传到flink运行时输出日志时,能够被log4j2识别到。
      步骤:
         - 想传入被识别到,目前经过实践,通过env.java.opts这个参数可以达到我们的目的。见官网跳转链接
    还有另外几个参数可以尝试一下:env.java.opts.jobmanager、env.java.opts.taskmanager;containerized.master.env.和containerized.taskmanager.env.这个试了不行。
         - log4j想要识别到这个环境变量,他的格式是:${sys: 环境变量名}

    (2)区分JM和TM的日志

         这边真的是踏遍铁鞋无匿处,得来全不费功夫!
         flink 的配置的RollingFile已经用这个参数${sys:log.file}指明了文件名,然后web UI上面能够取到对应的数据,那么把${sys:log.file}这个参数放到我的layout里面,那kafka那边就可以直接区分!真是完美!

    4、打到kafka的layout日志支持json格式

    参考这个跳转链接,上面也有具体的配置

    相关文章

      网友评论

        本文标题:Flink任务日志写到kafka【最新1.12,1.13】

        本文链接:https://www.haomeiwen.com/subject/mfftyltx.html