Integrating Flink with Spring Boot

Author: 山间草夫 | Published: 2021-08-07 14:48

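    Flink framework: integrating Flink with Spring Boot

    First, why integrate Flink with Spring Boot at all? Spring Boot makes it easier to pull other frameworks together, and Spring's DI and IoC make beans more convenient to use. Integrating plain Spring directly works just as well.

    How it works

    A plain Spring application is usually started with AnnotationConfigApplicationContext ac = new AnnotationConfigApplicationContext(AppConfig.class); which brings up the Spring container. What about Spring Boot? Anyone who has read its source will know that the entry point is

    SpringApplication.run(args); so all we need to do is start Spring Boot before starting Flink.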
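
    The "start the container before the job" idea can be sketched in plain Java. In a cluster deployment, operator tasks run in TaskManager JVMs that are separate from the JVM running main(), so the start call usually has to be repeated inside each operator and should therefore be safe to call more than once. The sketch below is only an illustration under that assumption: ContainerBootstrap, ensureStarted and startCount are hypothetical names, and the comment marks where the real Spring Boot start would go.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a Spring container bootstrap; not part of Flink or Spring.
final class ContainerBootstrap {
    private static boolean started = false;
    private static final AtomicInteger startCount = new AtomicInteger();

    private ContainerBootstrap() {}

    // Starts the container at most once per JVM, however many callers there are.
    static synchronized void ensureStarted(String[] args) {
        if (started) {
            return;
        }
        // In a real job this is where SpringApplication.run(...) would be invoked.
        startCount.incrementAndGet();
        started = true;
    }

    // Number of times the container was actually started in this JVM.
    static int startCount() {
        return startCount.get();
    }
}

class BootstrapDemo {
    public static void main(String[] args) {
        ContainerBootstrap.ensureStarted(args); // e.g. from the job's main()
        ContainerBootstrap.ensureStarted(args); // e.g. from an operator's open()
        System.out.println("container starts: " + ContainerBootstrap.startCount());
    }
}
```

    The synchronized guard matters because several parallel operator instances can share one JVM and may call the start method at the same time.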

    Code

    The example integrates Flink with Spring Boot and Redisson and writes each event's id to Redis. Code repository: https://gitee.com/imomoda/flink-sprint-boot

    • Spring Boot startup utility class

      @SpringBootApplication(scanBasePackages = {"io.github.jeesk.flink"})
      @Import(SpringUtil.class)
      @Slf4j
      @EnableConfigurationProperties({RedissonProperties.class, RedisProperties.class})
      public class SpringBootApplicationUtil {

          // Non-null once the Spring context has been started in this JVM.
          static SpringApplication springBootApplication = null;
          static SpringApplicationBuilder springApplicationBuilder = null;

          // synchronized plus the null check ensures the container starts at most once per JVM.
          public static synchronized void run(String[] args) {
              if (springBootApplication == null) {
                  springApplicationBuilder = new SpringApplicationBuilder(SpringBootApplicationUtil.class);
                  // The profile is hard-coded here; it could be passed in on the command line instead.
                  springApplicationBuilder.profiles("dev");
                  // No web server is needed inside a Flink job.
                  springApplicationBuilder.web(WebApplicationType.NONE);
                  springBootApplication = springApplicationBuilder.build();
                  // run(args) already exposes command-line arguments, system properties and
                  // environment variables, so no hand-built StandardEnvironment is required.
                  springBootApplication.run(args);
              }
          }

      }
      
    • Flink job

      package io.github.jeesk.flink;
      
      import cn.hutool.extra.spring.SpringUtil;
      import io.github.jeesk.flink.config.SpringBootApplicationUtil;
      import org.apache.flink.configuration.Configuration;
      import org.apache.flink.streaming.api.datastream.DataStream;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
      import org.apache.flink.util.Collector;
      import org.apache.flink.walkthrough.common.entity.Alert;
      import org.apache.flink.walkthrough.common.entity.Transaction;
      import org.apache.flink.walkthrough.common.sink.AlertSink;
      import org.apache.flink.walkthrough.common.source.TransactionSource;
      import org.springframework.data.redis.core.StringRedisTemplate;
      
      public class FraudDetectionJob {
          public static void main(String[] args) throws Exception {
      
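              // Store the command-line arguments as global job parameters so that
              // operators running on the TaskManagers can read them back in open().
              Configuration configuration = new Configuration();
              if (args != null) {
                  configuration.setString("args", String.join(" ", args));
              }
              // Start Spring Boot in the client JVM before building the job graph.
              SpringBootApplicationUtil.run(args);
      
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              env.getConfig().setGlobalJobParameters(configuration);
      
              DataStream<Transaction> transactions = env
                      .addSource(new TransactionSource())
                      .name("transactions");
      
              DataStream<Alert> alerts = transactions
                      .keyBy(Transaction::getAccountId)
                      .process(new FraudDetector())
                      .name("fraud-detector");
      
              alerts
                      .addSink(new AlertSink())
                      .name("send-alerts");
      
              env.execute("Fraud Detection");
          }
      
          public static class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {
      
              // transient: the template is looked up in open() and must not be serialized with the function.
              private transient StringRedisTemplate redisTemplate = null;
      
              @Override
              public void open(Configuration parameters) throws Exception {
                  // Initialize the bean.
                  super.open(parameters);
                  // Read the arguments stored by main(); the key must match the one used there ("args").
                  String joinedArgs = getRuntimeContext().getExecutionConfig()
                          .getGlobalJobParameters().toMap().getOrDefault("args", "");
                  // This operator may run in a different JVM than main(), so start Spring Boot here as well.
                  SpringBootApplicationUtil.run(joinedArgs.split(" "));
                  redisTemplate = SpringUtil.getBean(StringRedisTemplate.class);
      
              }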
      
              @Override
              public void processElement(
                      Transaction transaction,
                      Context context,
                      Collector<Alert> collector) throws Exception {
      
                  Alert alert = new Alert();
                  alert.setId(transaction.getAccountId());
                  // Add the id to a Redis set.
                  redisTemplate.opsForSet().add("tmpKey", String.valueOf(alert.getId()));
                  collector.collect(alert);
              }
          }
      }
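
    The job hands its command-line arguments to the operator as one space-joined string and splits that string again in open(). A minimal sketch of the round trip is given here, since the edge cases are easy to miss: in Java, "".split(" ") returns an array holding one empty string rather than an empty array. JobArgs and its methods are hypothetical helper names, not part of Flink or of the repository above.

```java
import java.util.Arrays;

// Hypothetical helper for passing job arguments through a single configuration value.
final class JobArgs {
    private JobArgs() {}

    // Joins the arguments into one string so they fit under a single configuration key.
    static String join(String[] args) {
        return args == null ? "" : String.join(" ", args);
    }

    // Splits the stored string back into arguments, dropping empty tokens.
    static String[] split(String joined) {
        if (joined == null) {
            return new String[0];
        }
        return Arrays.stream(joined.trim().split("\\s+"))
                .filter(token -> !token.isEmpty())
                .toArray(String[]::new);
    }
}

class JobArgsDemo {
    public static void main(String[] args) {
        String stored = JobArgs.join(args);
        System.out.println(Arrays.toString(JobArgs.split(stored)));
    }
}
```

    This scheme assumes that no single argument contains whitespace; an argument with embedded spaces would be broken into several tokens and would need a different encoding.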
      
      
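    • Logback or Log4j for Flink: this demo uses Logback, which requires the following steps

      1. On the server: put the Logback jars into the Flink installation directory (the lib folder): log4j-over-slf4j-1.7.15.jar, logback-classic-1.2.3.jar and logback-core-1.2.3.jar.
      2. Then delete the Log4j-related jars from lib (log4j-1.2.17.jar and slf4j-log4j12-1.7.15.jar). If the role of these jars is unclear, read the article "JAVA 常见日志依赖处理细节" (Common Java logging dependency details).
      3. Exclude the Log4j artifacts in the project's pom file: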
          <dependency>
              <groupId>org.apache.flink</groupId>
              <artifactId>flink-java</artifactId>
              <version>1.13.1</version>
              <!-- exclude log4j -->
              <exclusions>
                  <exclusion>
                      <groupId>log4j</groupId>
                      <artifactId>*</artifactId>
                  </exclusion>
                  <exclusion>
                      <groupId>org.slf4j</groupId>
                      <artifactId>slf4j-log4j12</artifactId>
                  </exclusion>
              </exclusions>
              <!--<scope>provided</scope>-->
          </dependency>
          <dependency>
              <groupId>org.apache.flink</groupId>
              <artifactId>flink-streaming-java_2.12</artifactId>
              <version>1.13.1</version>
              <!-- exclude log4j -->
              <exclusions>
                  <exclusion>
                      <groupId>log4j</groupId>
                      <artifactId>*</artifactId>
                  </exclusion>
                  <exclusion>
                      <groupId>org.slf4j</groupId>
                      <artifactId>slf4j-log4j12</artifactId>
                  </exclusion>
              </exclusions>
              <!--<scope>provided</scope>-->
          </dependency>
      
      
      4. To change Flink's Logback log configuration, edit the following three files in Flink's conf directory:

        logback-console.xml
        logback-session.xml
        logback.xml
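
    After swapping the jars, it can help to check which logging classes are actually visible on the classpath. The sketch below uses only the JDK and is offered as a rough diagnostic, not a definitive test. LoggingClasspathCheck is a hypothetical name, and the two probed class names are assumptions about where the bindings live: org.slf4j.impl.Log4jLoggerFactory for slf4j-log4j12 1.7.x and ch.qos.logback.classic.LoggerContext for logback-classic.

```java
// Hypothetical diagnostic: reports whether selected logging classes are on the classpath.
final class LoggingClasspathCheck {
    private LoggingClasspathCheck() {}

    // Returns true if the named class can be located, without initializing it.
    static boolean isPresent(String className) {
        try {
            Class.forName(className, false, LoggingClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed binding class of slf4j-log4j12; it should be absent after the cleanup.
        System.out.println("slf4j-log4j12 binding present: "
                + isPresent("org.slf4j.impl.Log4jLoggerFactory"));
        // Assumed core class of logback-classic; it should be present.
        System.out.println("logback-classic present: "
                + isPresent("ch.qos.logback.classic.LoggerContext"));
    }
}
```

    Probing org.apache.log4j.Logger would not be a reliable signal here, because log4j-over-slf4j deliberately ships classes under that package name.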
      
