maven 配置
<!-- Versions shared by the dependency declarations that reference these properties. -->
<properties>
    <!-- Apache Flink release used for every org.apache.flink artifact. -->
    <flink-version>1.12.2</flink-version>
    <!-- Logback release (logback-core and logback-classic). -->
    <logback.version>1.2.3</logback.version>
    <!-- SLF4J release (used for the log4j-over-slf4j bridge). -->
    <slf4j.version>1.7.25</slf4j.version>
</properties>
<!-- Flink job dependencies, with Logback used as the SLF4J backend instead of log4j. -->
<dependencies>
    <!-- Flink core Java API. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink-version}</version>
        <exclusions>
            <!-- Exclude the log4j artifacts pulled in transitively. -->
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>*</artifactId>
            </exclusion>
            <!-- Exclude the SLF4J-to-log4j binding so it cannot compete with Logback. -->
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

    <!-- Flink DataStream API (Scala 2.12 build). -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>${flink-version}</version>
        <exclusions>
            <!-- Exclude the log4j artifacts pulled in transitively. -->
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>*</artifactId>
            </exclusion>
            <!-- Exclude the SLF4J-to-log4j binding so it cannot compete with Logback. -->
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

    <!-- Flink client classes (Scala 2.12 build). -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>${flink-version}</version>
    </dependency>

    <!-- Flink Kafka connector (Scala 2.12 build). -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.12</artifactId>
        <version>${flink-version}</version>
    </dependency>

    <!-- Required: Logback core module. -->
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-core</artifactId>
        <version>${logback.version}</version>
    </dependency>

    <!-- Required: Logback implementation of the SLF4J API. -->
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-classic</artifactId>
        <version>${logback.version}</version>
    </dependency>

    <!-- Required: bridge that routes log4j 1.x API calls to SLF4J. -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>log4j-over-slf4j</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
</dependencies>
<!-- Build settings: compile the project for Java 8. -->
<build>
    <plugins>
        <!-- Pin the JDK source and target level. -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.6.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
    </plugins>
</build>
logback.xml 文件配置
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Logback configuration: one console appender, DEBUG output for the application's
  own package, and ERROR only for everything else.
-->
<configuration>
    <contextName>logback</contextName>

    <!--
      Custom conversion words used by the console pattern below. The converter
      classes live in Spring Boot's logging package, so spring-boot must be on
      the classpath for these rules to resolve.
    -->
    <conversionRule conversionWord="clr"
                    converterClass="org.springframework.boot.logging.logback.ColorConverter" />
    <conversionRule conversionWord="wex"
                    converterClass="org.springframework.boot.logging.logback.WhitespaceThrowableProxyConverter" />
    <conversionRule conversionWord="wEx"
                    converterClass="org.springframework.boot.logging.logback.ExtendedWhitespaceThrowableProxyConverter" />

    <!-- Output format; an externally defined CONSOLE_LOG_PATTERN takes precedence over this default. -->
    <property name="CONSOLE_LOG_PATTERN"
              value="${CONSOLE_LOG_PATTERN:-%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(${LOG_LEVEL_PATTERN:-%5p}) %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}}"/>

    <!-- Console appender. -->
    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
        <!-- Drop events below DEBUG before they reach the encoder. -->
        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
            <level>debug</level>
        </filter>
        <encoder>
            <pattern>${CONSOLE_LOG_PATTERN}</pattern>
            <charset>UTF-8</charset>
        </encoder>
    </appender>

    <!-- Only the application's own package logs at DEBUG level. -->
    <logger name="com.giant.RtCalc" level="DEBUG" />

    <!--
      Everything else is limited to ERROR, which hides the output of Flink
      itself and of other libraries on the console.
    -->
    <root level="error">
        <appender-ref ref="CONSOLE" />
    </root>
</configuration>
这样配置之后，Flink 启动时 ERROR 级别以下的日志都不会再显示（自己程序所在的包 com.giant.RtCalc 仍以 DEBUG 级别输出），其他日志只有在报错时才能看到。以上配置是输出到控制台（console），我们也可以自定义输出位置，例如 MySQL、Kafka、Redis 等，用于收集 Flink 程序的日志。按以上方式配置后，DataStream 的 print 就无法正常使用，需要自定义 sink 来替代 print 函数。
自定义print
package com.giant.RtCalc.sink;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
/**
 * Sink that writes every record it receives to the SLF4J logger at DEBUG level,
 * prefixed with a configurable label. Intended as a stand-in for
 * {@code DataStream.print()} when output should go through the logging setup.
 *
 * <p>Lombok's {@code @Slf4j} supplies the {@code log} field; {@code @Data}
 * generates the accessors for {@link #str}.
 *
 * @param <IN> type of the records consumed by this sink
 */
@Slf4j
@Data
public class PrintSink<IN> implements SinkFunction<IN> {

    /** Label written in front of every logged record; defaults to "print". */
    private String str = "print";

    /**
     * Creates a sink with a custom label.
     *
     * @param str label written in front of every logged record
     */
    public PrintSink(String str) {
        this.str = str;
    }

    /** Creates a sink that uses the default label. */
    public PrintSink() {
    }

    /**
     * Logs the record as {@code <label>-><record>} at DEBUG level and then hands
     * it to the interface's default {@code invoke}.
     *
     * @param value   record to log
     * @param context sink context, passed through unchanged
     * @throws Exception if the delegated default implementation throws
     */
    @Override
    public void invoke(IN value, Context context) throws Exception {
        // The label is passed as an argument rather than concatenated into the
        // format string, so a label containing "{}" is printed literally
        // instead of being parsed as a placeholder.
        log.debug("{}->{}", str, value);
        SinkFunction.super.invoke(value, context);
    }
}
用 addSink 来代替 DataStream 的 print 功能。
这种方式虽然可以按自己喜欢的格式打印日志，但存在一个问题：把作业提交到 flink-web（集群）运行时会报错。从下面的报错信息可以看出，Flink 安装目录 lib 下自带的 SLF4J 绑定是 Log4j 2（log4j-slf4j-impl），它与作业包中的 Logback 冲突：
LoggerFactory is not a Logback LoggerContext but Logback is on the classpath. Either remove Logback or the competing implementation (class org.apache.logging.slf4j.Log4jLoggerFactory loaded from file:/opt/software/flink-1.12.2/lib/log4j-slf4j-impl-2.12.1.jar). If you are using WebLogic you will need to add 'org.slf4j' to prefer-application-packages in WEB-INF/weblogic.xml: org.apache.logging.slf4j.Log4jLoggerFactory
网友评论