Dynamically Changing the Log Level of a Running Flink Job


Author: 清蒸三文鱼_ | Published 2021-09-01 14:10

    Background

    In Flink on YARN cluster mode, a job is submitted with the log4j.properties found under FLINK_CONF_DIR, and the log output level cannot be changed dynamically; the official distribution does not support passing a custom log configuration file as a parameter — see the notes on the YarnConfigOptionsInternal class for details.

    If you want a different log level per job, the only built-in option is to re-point FLINK_CONF_DIR via export before launching, which is hardly convenient. The sections below show how to change the level from code instead.
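For completeness, the FLINK_CONF_DIR workaround mentioned above can be sketched as follows. All paths, the stand-in config line, and the job jar are hypothetical, and the actual submit command is left commented out:

```shell
set -e
# Hypothetical per-job conf dir holding a tweaked log4j.properties.
JOB_CONF_DIR=/tmp/flink-conf-debug
mkdir -p "$JOB_CONF_DIR"
# In practice you would copy $FLINK_HOME/conf/* here first; this stand-in
# line just models the rootLogger entry that gets overridden.
printf 'rootLogger.level = INFO\n' > "$JOB_CONF_DIR/log4j.properties"
sed -i 's/^rootLogger.level *=.*/rootLogger.level = DEBUG/' "$JOB_CONF_DIR/log4j.properties"
# Point Flink at the per-job conf dir for this submission only.
export FLINK_CONF_DIR="$JOB_CONF_DIR"
# flink run -m yarn-cluster ./my-job.jar   # hypothetical submit with the override
```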

    (Screenshots: the YarnConfigOptionsInternal built-in options; log4j.properties under CDH; the CDH Flink environment script)

    Code

    The LogDyDemo test class

    import org.apache.logging.log4j.Level;
    import org.apache.logging.log4j.spi.LoggerContext;
    import org.apache.logging.slf4j.Log4jLoggerFactory;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import java.util.Set;
    public class LogDyDemo {
        private static final Logger logger = LoggerFactory.getLogger("tmp_test");
        public static void main(String[] args) {
        // Approach 1: takes effect both locally and when running on Flink
            Log4jLoggerFactory fac = (Log4jLoggerFactory)LoggerFactory.getILoggerFactory();
            Set<LoggerContext> loggerContexts = fac.getLoggerContexts();
            org.apache.logging.log4j.core.LoggerContext  next = (org.apache.logging.log4j.core.LoggerContext)
                    loggerContexts.iterator().next();
            next.getRootLogger().get().setLevel(Level.DEBUG);
            next.updateLoggers();
            
        // Approach 2: verified to work on Flink, but has no effect locally
            org.apache.logging.log4j.core.LoggerContext context = org.apache.logging.log4j.core.LoggerContext.getContext();
            context.getRootLogger().get().setLevel(Level.DEBUG);
            context.updateLoggers();
            
            logger.debug("log_print test=========>");
            logger.info("log_print test=========>");
            logger.warn("log_print test=========>");
            logger.error("log_print test=========>");
        }
    }
    

    The rootLogger level in log4j2.properties is INFO, and the code raises it to DEBUG; either one of the two approaches is enough on its own. If the output contains messages at all four levels, the test passes.
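As an aside not covered in the original article, log4j-core also ships a convenience helper, org.apache.logging.log4j.core.config.Configurator, whose setRootLevel call performs the same root-level change in one step; a minimal sketch:

```java
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.core.config.Configurator;

public class SetRootLevelDemo {
    public static void main(String[] args) {
        // setRootLevel updates the root logger's configured level and
        // refreshes the active LoggerContext, much like the manual
        // getRootLogger().get().setLevel(...) + updateLoggers() pair.
        Configurator.setRootLevel(Level.DEBUG);
        System.out.println(LogManager.getRootLogger().getLevel());
    }
}
```

This only works when log4j-core is the active logging backend, which is the default on Flink.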

    (Screenshot: the test passed)

    The Flink test class

    On the Task side, the level is initialized in a static block. Flink uses log4j2 by default, so only that setup is covered below. Package the jar and run it on the server; when packaging, it is advisable to set the referenced Maven dependencies to <scope>provided</scope> to reduce class conflicts at runtime.

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;
    import org.apache.logging.log4j.Level;
    import org.apache.logging.log4j.spi.LoggerContext;
    import org.apache.logging.slf4j.Log4jLoggerFactory;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import java.util.Set;
    public class FlinkDemo {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            DataSet<String> text = env.fromElements(
                    "Who's there?",
                    "I think I hear them. Stand, ho! Who's there?");
            DataSet<Tuple2<String, Integer>> wordCounts = text.flatMap(new LineSplitter()).groupBy(0).sum(1);
            wordCounts.print();
        }
    
        public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
            public static final Logger logger = LoggerFactory.getLogger(LineSplitter.class);
            // Runs once per TaskManager JVM, when LineSplitter is first loaded on the Task side
            static {
                Log4jLoggerFactory fac = (Log4jLoggerFactory) LoggerFactory.getILoggerFactory();
                Set<LoggerContext> loggerContexts = fac.getLoggerContexts();
                org.apache.logging.log4j.core.LoggerContext next = (org.apache.logging.log4j.core.LoggerContext)
                        loggerContexts.iterator().next();
                next.getRootLogger().get().setLevel(Level.DEBUG);
                next.updateLoggers();
            }
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
                logger.debug("log_print test=========>");
                logger.info("log_print test=========>");
                logger.warn("log_print test=========>");
                logger.error("log_print test=========>");
    
                for (String word : line.split(" ")) {
                    collector.collect(new Tuple2<String, Integer>(word, 1));
                }
            }
        }
    }
    
    (Screenshot: the test passed when run on YARN)

    log4j2.properties

    The log4j configuration below was copied from the flink-dist subproject of the Flink source tree. Flink uses log4j2 by default; when running locally you need to rename the file to log4j2.properties and also adjust appender.main.fileName, otherwise an error is thrown.

    monitorInterval=30
    rootLogger.level = INFO
    rootLogger.appenderRef.file.ref = MainAppender
    
    logger.akka.name = akka
    logger.akka.level = INFO
    logger.kafka.name= org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO
    
    # Log all infos in the given file
    appender.main.name = MainAppender
    appender.main.type = RollingFile
    appender.main.append = true
    appender.main.fileName = logs/demo_run.log
    appender.main.filePattern = logs/demo_run.log.%i
    appender.main.layout.type = PatternLayout
    appender.main.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.main.policies.type = Policies
    appender.main.policies.size.type = SizeBasedTriggeringPolicy
    appender.main.policies.size.size = 100MB
    appender.main.policies.startup.type = OnStartupTriggeringPolicy
    appender.main.strategy.type = DefaultRolloverStrategy
    appender.main.strategy.max = ${env:MAX_LOG_FILE_NUMBER:-10}
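When verifying the level change locally, it can help to also log to stdout. The lines below are an illustrative addition, not part of the copied Flink default; they follow standard log4j2 properties syntax:

```properties
# Illustrative extra appender: mirror log output to the console,
# handy when testing the DEBUG override locally.
rootLogger.appenderRef.console.ref = ConsoleAppender
appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
```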
    

    pom.xml

        <dependencies>
            <dependency>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-core</artifactId>
                <version>2.12.1</version>
            </dependency>
            <dependency>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-slf4j-impl</artifactId>
                <version>2.12.1</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-clients_2.11</artifactId>
                <version>1.13.1</version>
            </dependency>
        </dependencies>
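Following the packaging advice above, a cluster-provided dependency such as flink-clients would be marked provided like this (a sketch; whether log4j also qualifies depends on what the cluster classpath already ships):

```xml
<!-- Sketch: jars already present on the cluster are marked provided so the
     fat jar does not bundle them and cause class conflicts at runtime. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>1.13.1</version>
    <scope>provided</scope>
</dependency>
```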
    


        Original link: https://www.haomeiwen.com/subject/qypxwltx.html