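Background
In Flink on YARN cluster mode, a submitted job uses the log4j.properties file under FLINK_CONF_DIR, so the log output level cannot be changed dynamically. Flink does not officially support passing the log configuration file as a parameter; see the notes on the YarnConfigOptionsInternal class for details.
To give different jobs different log levels, the only built-in option is to re-point FLINK_CONF_DIR with an export command before launching each job. That is not very convenient, so the rest of this post shows how to change the level from code.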
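As a rough sketch of how a per-job level could be chosen without touching FLINK_CONF_DIR, the snippet below reads a level name from the program arguments and falls back to a default. The flag name `--log.level=` and the class `JobLogLevel` are made up for illustration; they are not Flink or log4j options.

```java
import java.util.Locale;

public class JobLogLevel {
    // Hypothetical flag name chosen for this sketch; not a Flink or log4j option.
    private static final String FLAG = "--log.level=";

    /** Returns the upper-cased level name given via --log.level=..., or the fallback if absent or empty. */
    public static String resolve(String[] args, String fallback) {
        for (String arg : args) {
            if (arg.startsWith(FLAG)) {
                String value = arg.substring(FLAG.length()).trim();
                if (!value.isEmpty()) {
                    return value.toUpperCase(Locale.ROOT);
                }
            }
        }
        return fallback;
    }

    public static void main(String[] args) {
        // Prints the level name that would be applied for this run.
        System.out.println(resolve(args, "INFO"));
    }
}
```

The resulting name could then be converted with log4j's `Level.toLevel(name, Level.INFO)` and passed to `setLevel`. How the value reaches the task side is left out of this sketch.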
CODE
LogDyDemo test class
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.spi.LoggerContext;
import org.apache.logging.slf4j.Log4jLoggerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Set;

public class LogDyDemo {
    private static final Logger logger = LoggerFactory.getLogger("tmp_test");

    public static void main(String[] args) {
        // Option 1: works both locally and on Flink.
        // Take the LoggerContext that the SLF4J binding itself uses.
        Log4jLoggerFactory fac = (Log4jLoggerFactory) LoggerFactory.getILoggerFactory();
        Set<LoggerContext> loggerContexts = fac.getLoggerContexts();
        org.apache.logging.log4j.core.LoggerContext next =
                (org.apache.logging.log4j.core.LoggerContext) loggerContexts.iterator().next();
        next.getRootLogger().get().setLevel(Level.DEBUG);
        // Push the changed configuration to the existing loggers.
        next.updateLoggers();

        // Option 2: verified to work on Flink, but has no effect locally.
        org.apache.logging.log4j.core.LoggerContext context =
                org.apache.logging.log4j.core.LoggerContext.getContext();
        context.getRootLogger().get().setLevel(Level.DEBUG);
        context.updateLoggers();

        // One message per level, to check which levels reach the log.
        logger.debug("log_print test=========>");
        logger.info("log_print test=========>");
        logger.warn("log_print test=========>");
        logger.error("log_print test=========>");
    }
}
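The rootLogger level in log4j2.properties is INFO, and the code changes it to DEBUG. Either of the two options is enough on its own. If the log contains the messages at all four levels, the test has passed.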
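The pass criterion above follows from how a level threshold works: a message is written only if it is at least as severe as the configured level. The sketch below models this with a simplified, hypothetical `Lvl` enum (real log4j has more levels and compares integer level values), just to show why DEBUG lines are missing at INFO and present at DEBUG.

```java
public class LevelThresholdDemo {
    // Simplified stand-in for log4j levels, ordered from least to most severe.
    public enum Lvl { DEBUG, INFO, WARN, ERROR }

    /** A message is emitted when it is at least as severe as the configured level. */
    public static boolean isEmitted(Lvl configured, Lvl message) {
        return message.compareTo(configured) >= 0;
    }

    /** Counts how many of the four test messages (one per level) would be written. */
    public static int emittedCount(Lvl configured) {
        int count = 0;
        for (Lvl message : Lvl.values()) {
            if (isEmitted(configured, message)) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        System.out.println("root=INFO  -> " + emittedCount(Lvl.INFO) + " of 4 lines");
        System.out.println("root=DEBUG -> " + emittedCount(Lvl.DEBUG) + " of 4 lines");
    }
}
```

With the root level left at INFO the debug line is dropped, so seeing all four lines is evidence that the programmatic change took effect.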
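Flink test class
On the task side the level is initialized in a static block. Flink uses log4j2 by default, so only that setup is covered here. Package the job and run it on the server; when packaging, it is advisable to set the scope of the referenced Maven dependencies to <scope>provided</scope> to reduce class conflicts at runtime.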
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.spi.LoggerContext;
import org.apache.logging.slf4j.Log4jLoggerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Set;

public class FlinkDemo {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> text = env.fromElements(
                "Who's there?",
                "I think I hear them. Stand, ho! Who's there?");
        DataSet<Tuple2<String, Integer>> wordCounts = text.flatMap(new LineSplitter()).groupBy(0).sum(1);
        wordCounts.print();
    }

    public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        public static final Logger logger = LoggerFactory.getLogger(LineSplitter.class);

        // Runs when the class is initialized on the task side: raise the root level to DEBUG.
        static {
            Log4jLoggerFactory fac = (Log4jLoggerFactory) LoggerFactory.getILoggerFactory();
            Set<LoggerContext> loggerContexts = fac.getLoggerContexts();
            org.apache.logging.log4j.core.LoggerContext next =
                    (org.apache.logging.log4j.core.LoggerContext) loggerContexts.iterator().next();
            next.getRootLogger().get().setLevel(Level.DEBUG);
            next.updateLoggers();
        }

        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
            // One message per level, to check which levels reach the task log.
            logger.debug("log_print test=========>");
            logger.info("log_print test=========>");
            logger.warn("log_print test=========>");
            logger.error("log_print test=========>");
            for (String word : line.split(" ")) {
                collector.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }
}
The test passed when run on YARN.
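The static block in LineSplitter relies on a JVM guarantee: a class's static initializer runs once when the class is first initialized by a given class loader, not once per instance or per record. The sketch below illustrates this with a hypothetical `Worker` class and a counter standing in for the log-level change; none of these names come from Flink.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class StaticInitDemo {
    // Counts how often the static initializer of Worker has run.
    public static final AtomicInteger INIT_RUNS = new AtomicInteger();

    public static class Worker {
        static {
            // Stand-in for the log-level change done in LineSplitter's static block.
            INIT_RUNS.incrementAndGet();
        }

        /** Returns the number of space-separated tokens in the line. */
        public int process(String line) {
            return line.split(" ").length;
        }
    }

    public static void main(String[] args) {
        Worker a = new Worker();
        Worker b = new Worker();
        a.process("Who's there?");
        b.process("Stand, ho!");
        // The initializer ran once, although two instances were created.
        System.out.println("initializer runs: " + INIT_RUNS.get());
    }
}
```

On a cluster one would therefore expect the level change to be applied once in each TaskManager process (and class loader) that loads the function class, rather than on every call to flatMap.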
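log4j2.properties
The log4j configuration below is copied from the dist subproject of the Flink source tree. Flink uses log4j2 by default. To run locally, the file has to be renamed (here to log4j2.properties) and appender.main.fileName has to be changed as well, otherwise an error is reported.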
monitorInterval=30
rootLogger.level = INFO
rootLogger.appenderRef.file.ref = MainAppender
logger.akka.name = akka
logger.akka.level = INFO
logger.kafka.name= org.apache.kafka
logger.kafka.level = INFO
logger.hadoop.name = org.apache.hadoop
logger.hadoop.level = INFO
logger.zookeeper.name = org.apache.zookeeper
logger.zookeeper.level = INFO
# Log all infos in the given file
appender.main.name = MainAppender
appender.main.type = RollingFile
appender.main.append = true
appender.main.fileName = logs/demo_run.log
appender.main.filePattern = logs/demo_run.log.%i
appender.main.layout.type = PatternLayout
appender.main.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
appender.main.policies.type = Policies
appender.main.policies.size.type = SizeBasedTriggeringPolicy
appender.main.policies.size.size = 100MB
appender.main.policies.startup.type = OnStartupTriggeringPolicy
appender.main.strategy.type = DefaultRolloverStrategy
appender.main.strategy.max = ${env:MAX_LOG_FILE_NUMBER:-10}
pom.xml
<dependencies>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-core</artifactId>
        <version>2.12.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-slf4j-impl</artifactId>
        <version>2.12.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>1.13.1</version>
    </dependency>
</dependencies>