Integrating Flink with Spring Boot

Author: Java程序员YY | Published 2019-08-15 15:49

Using Spring Boot to bootstrap a Flink application lets you assemble the whole project quickly and keep the focus on business logic. Several problems came up during the integration. The biggest one is that classes managed by the Spring container cannot be reached from inside the Flink stream operators, which leads to NullPointerExceptions. The workaround is to initialize the Spring beans inside the stream itself: obtain an ApplicationContext there and use its getBean method to fetch the class instances.

Software versions: Spring Boot 2.1.6, Flink 1.6.1, JDK 1.8.

The main application:

```java
@SpringBootApplication
public class HadesTmsApplication implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(HadesTmsApplication.class);
        application.setBannerMode(Banner.Mode.OFF);
        application.run(args);
    }

    @Override
    public void run(String... args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        FlinkKafkaConsumer010<String> kafkaConsumer =
                new FlinkKafkaConsumer010<>("topic-name", new SimpleStringSchema(), getProperties());
        DataStream<String> dataStream = env.addSource(kafkaConsumer);
        // processing logic omitted here
        dataStream.addSink(new MySink());
        env.execute(); // required to actually submit the job (missing in the original listing)
    }

    private Properties getProperties() {
        // bootstrap_servers, zookeeper_connect and group_id are fields whose
        // declarations are omitted here (e.g. injected from configuration)
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", bootstrap_servers);
        properties.setProperty("zookeeper.connect", zookeeper_connect);
        properties.setProperty("group.id", group_id);
        // a consumer needs deserializers, not serializers (and the
        // SimpleStringSchema above already covers deserialization)
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return properties;
    }
}
```

A note: since this is a non-web project, the application implements the CommandLineRunner interface and overrides its run method, which is where the stream-processing logic is written.
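The getProperties() helper in the listing above references fields such as `bootstrap_servers`, `zookeeper_connect` and `group_id` whose declarations are not shown; in a Spring Boot setup they would typically be injected from application.properties. As a framework-free sketch of the same idea, the values can also be loaded with java.util.Properties (the file name and property keys here are assumptions, not from the original article):

```java
import java.io.IOException;
import java.io.Reader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Properties;

public class KafkaConfigDemo {
    // Load connection settings from a properties file into java.util.Properties,
    // mirroring what Spring Boot's @Value injection would otherwise provide.
    static Properties load(Path file) throws IOException {
        Properties p = new Properties();
        try (Reader r = Files.newBufferedReader(file)) {
            p.load(r);
        }
        return p;
    }

    public static void main(String[] args) throws IOException {
        // Write a sample config file (a stand-in for application.properties).
        Path file = Files.createTempFile("kafka", ".properties");
        Files.write(file, Arrays.asList(
                "bootstrap.servers=localhost:9092",
                "group.id=hades-tms"));

        Properties p = load(file);
        System.out.println(p.getProperty("bootstrap.servers")); // prints localhost:9092
        System.out.println(p.getProperty("group.id"));          // prints hades-tms
    }
}
```

Whichever mechanism supplies the values, the point is that getProperties() runs on the driver side, so plain configuration injection works there; the trouble described next only starts inside the operators.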

If MySink needs a class from the Spring container, it cannot get one, because MySink is a plain class instantiated outside the container, and the result is a NullPointerException. Some may think of the ApplicationContextAware interface: implement it to capture the ApplicationContext, i.e.:

```java
@Component
public class ApplicationContextUtil implements ApplicationContextAware, Serializable {

    private static final long serialVersionUID = -6454872090519042646L;

    private static ApplicationContext applicationContext = null;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        if (ApplicationContextUtil.applicationContext == null) {
            ApplicationContextUtil.applicationContext = applicationContext;
        }
    }

    public static ApplicationContext getApplicationContext() {
        return applicationContext;
    }

    // get a bean by name
    public static Object getBean(String name) {
        return getApplicationContext().getBean(name);
    }

    // get a bean by class
    public static <T> T getBean(Class<T> clazz) {
        return getApplicationContext().getBean(clazz);
    }

    // get a bean by name and class
    public static <T> T getBean(String name, Class<T> clazz) {
        return getApplicationContext().getBean(name, clazz);
    }
}
```
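The reason a statically held context like this cannot help is that Flink serializes the sink function on the client and deserializes it inside the task manager JVMs, where the static field was never populated. A minimal, framework-free sketch of the effect (class names are made up for illustration; the transient field stands in for any state, static or otherwise, that Java serialization cannot carry across):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializationDemo {
    // A sink-like class whose "context" is transient: Java serialization
    // skips it, just as a Spring context cannot travel to a Flink task manager.
    static class FakeSink implements Serializable {
        private static final long serialVersionUID = 1L;
        transient Object context = new Object(); // populated on the "client" side only

        // Round-trip through Java serialization, as Flink does when shipping the sink.
        FakeSink copyViaSerialization() throws IOException, ClassNotFoundException {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                oos.writeObject(this);
            }
            try (ObjectInputStream ois = new ObjectInputStream(
                    new ByteArrayInputStream(bos.toByteArray()))) {
                return (FakeSink) ois.readObject();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        FakeSink original = new FakeSink();
        FakeSink shipped = original.copyViaSerialization(); // what the task manager receives
        System.out.println(original.context != null); // prints true
        System.out.println(shipped.context == null);  // prints true -> any use would be an NPE
    }
}
```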

In practice this approach does not work in Flink stream processing either. As shown in my earlier article Flink读写系列之-读mysql并写入mysql, both the read and the write side have an open method intended exactly for initialization work, so that is where the Spring beans can be initialized. The reworked MySink (named SimpleSink below) becomes:

```java
@EnableAutoConfiguration
@MapperScan(basePackages = {"com.xxx.bigdata.xxx.mapper"})
public class SimpleSink extends RichSinkFunction<String> {

    // transient: the mapper is created in open() on the task manager and
    // must not take part in the sink's own serialization
    private transient TeacherInfoMapper teacherInfoMapper;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        SpringApplication application = new SpringApplication(SimpleSink.class);
        application.setBannerMode(Banner.Mode.OFF);
        ApplicationContext context = application.run(new String[]{});
        teacherInfoMapper = context.getBean(TeacherInfoMapper.class);
    }

    @Override
    public void close() throws Exception {
        super.close();
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        List<TeacherInfo> teacherInfoList = teacherInfoMapper.selectByPage(0, 100);
        teacherInfoList.forEach(teacherInfo -> System.out.println("teacherinfo:"
                + teacherInfo.getTeacherId() + "," + teacherInfo.getTimeBit() + "," + teacherInfo.getWeek()));
    }
}
```

Inside invoke, the Mapper beans from the Spring container are now accessible.
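One caveat worth knowing: open() runs once per parallel subtask, so with parallelism n the code above would boot up to n Spring contexts inside a single task manager JVM. A common guard (a sketch of the double-checked locking idiom, not part of the original article; the Object here stands in for the ConfigurableApplicationContext that SpringApplication.run returns) is a lazily initialized, JVM-wide holder:

```java
public class ContextHolder {
    private static volatile Object context; // stand-in for a ConfigurableApplicationContext
    private static int boots = 0;           // counts real initializations, for illustration

    // Double-checked locking: only the first subtask in the JVM pays the startup cost;
    // later subtasks reuse the already-built context.
    static Object get() {
        if (context == null) {
            synchronized (ContextHolder.class) {
                if (context == null) {
                    boots++;
                    context = new Object(); // stand-in for SpringApplication.run(...)
                }
            }
        }
        return context;
    }

    static int bootCount() {
        return boots;
    }

    public static void main(String[] args) {
        // Simulate several subtasks calling open() in the same JVM.
        for (int i = 0; i < 4; i++) {
            get();
        }
        System.out.println(ContextHolder.bootCount()); // prints 1
    }
}
```

With this pattern, open() would call ContextHolder.get() instead of constructing a new SpringApplication each time.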

The pom is as follows:

```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.6.RELEASE</version>
    </parent>

    <groupId>com.xxx.bigdata</groupId>
    <artifactId>flink-project</artifactId>
    <version>1.0.0</version>
    <name>flink-project</name>
    <packaging>jar</packaging>
    <description>My project for Spring Boot</description>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
        <flink.version>1.6.1</flink.version>
        <maven.test.skip>true</maven.test.skip>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>ch.qos.logback</groupId>
                    <artifactId>logback-classic</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>com.cloudera</groupId>
            <artifactId>ImpalaJDBC41</artifactId>
            <version>2.6.4</version>
        </dependency>
        <dependency>
            <groupId>com.zaxxer</groupId>
            <artifactId>HikariCP</artifactId>
            <version>3.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>1.3.1</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>src/main/java</sourceDirectory>
        <resources>
            <resource>
                <directory>src/main/resources</directory>
                <filtering>true</filtering>
                <includes>
                    <include>application.properties</include>
                    <include>application-${package.environment}.properties</include>
                </includes>
            </resource>
        </resources>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <skip>true</skip>
                    <mainClass>com.miaoke.bigdata.tms.HadesTmsApplication</mainClass>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.mybatis.generator</groupId>
                <artifactId>mybatis-generator-maven-plugin</artifactId>
                <version>1.3.5</version>
                <configuration>
                    <configurationFile>${basedir}/src/main/resources/generatorConfig.xml</configurationFile>
                    <overwrite>true</overwrite>
                    <verbose>true</verbose>
                </configuration>
                <dependencies>
                    <dependency>
                        <groupId>com.cloudera</groupId>
                        <artifactId>ImpalaJDBC41</artifactId>
                        <version>2.6.4</version>
                    </dependency>
                </dependencies>
            </plugin>
        </plugins>
    </build>

    <profiles>
        <profile>
            <id>dev</id>
            <properties>
                <package.environment>dev</package.environment>
            </properties>
            <activation>
                <activeByDefault>true</activeByDefault>
            </activation>
        </profile>
        <profile>
            <id>pre</id>
            <properties>
                <package.environment>pre</package.environment>
            </properties>
        </profile>
        <profile>
            <id>pro</id>
            <properties>
                <package.environment>pro</package.environment>
            </properties>
        </profile>
    </profiles>
</project>
```

Packaging uses the default Spring Boot plugin, with skip set to true. Without that setting the repackaged jar gains an extra BOOT-INF directory, which at runtime triggers ClassNotFoundException and assorted other failures (for example with the Kafka streaming classes), and can even force you to invert Flink's classloading order from child-first to parent-first in the Flink configuration file.

