Flink Learning, Part 3: Integrating Spring

By AlanKim | Published 2019-03-13

    Introducing Spring into Flink

    A Flink project may contain multiple jobs, but each job usually boils down to a single main method that holds the main logic. When other functionality is needed, objects are typically created directly with new, and Spring, which we rely on everywhere else in our business code, goes unused. Can Spring be used in Flink? Certainly: there is no restriction, you just have to initialize the Spring container manually.

    Here, Spring is introduced mainly to manage a database connection pool instead of handling connections with raw JDBC, and the database is then used as a Flink data source.

    1. Add the dependencies

    Spring dependencies:

            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-context</artifactId>
                <version>5.1.3.RELEASE</version>
            </dependency>
    
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-tx</artifactId>
                <version>5.1.3.RELEASE</version>
            </dependency>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-jdbc</artifactId>
                <version>5.1.3.RELEASE</version>
            </dependency>
    

    The MySQL driver:

            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>8.0.13</version>
            </dependency>
    

    The Druid database connection pool:

            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>druid</artifactId>
                <version>1.1.11</version>
            </dependency>
    

    MyBatis dependencies:

           <dependency>
                <groupId>org.mybatis</groupId>
                <artifactId>mybatis-spring</artifactId>
                <version>1.3.0</version>
            </dependency>
            <dependency>
                <groupId>org.mybatis</groupId>
                <artifactId>mybatis</artifactId>
                <version>3.4.0</version>
            </dependency>
    

    2. Configure the Maven plugin

    Without this setup, running the main method locally works fine, but once the job is packaged and uploaded to a Flink cluster, it fails with exceptions about missing Spring classes. The configuration is as follows:

    <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
                <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-shade-plugin</artifactId>
                    <version>3.0.0</version>
                    <executions>
                        <!-- Run shade goal on package phase -->
                        <execution>
                            <phase>package</phase>
                            <goals>
                                <goal>shade</goal>
                            </goals>
                            <configuration>
                                <artifactSet>
                                    <excludes>
                                        <exclude>org.apache.flink:force-shading</exclude>
                                        <exclude>com.google.code.findbugs:jsr305</exclude>
                                        <exclude>org.slf4j:*</exclude>
                                        <exclude>log4j:*</exclude>
                                    </excludes>
                                </artifactSet>
                                <filters>
                                    <filter>
                                        <!-- Do not copy the signatures in the META-INF folder.
                                        Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                        <artifact>*:*</artifact>
                                        <excludes>
                                            <exclude>META-INF/*.SF</exclude>
                                            <exclude>META-INF/*.DSA</exclude>
                                            <exclude>META-INF/*.RSA</exclude>
                                        </excludes>
                                    </filter>
                                </filters>
                                <transformers>
                                    <transformer
                                            implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                        <mainClass>myflink.job.KafkaDatasouceForFlinkJob</mainClass>
                                    </transformer>
    
                                        <transformer
                                                implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
                                        <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                            <resource>META-INF/spring.handlers</resource>
                                        </transformer>
                                        <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                            <resource>META-INF/spring.schemas</resource>
                                        </transformer>
                                </transformers>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
    

    Points to note here:

    1. maven-shade-plugin: when the project is created from the Flink Quickstart Job archetype, this plugin configuration is added to pom.xml automatically.

    2. mainClass: specifies the entry point of the whole jar. It is not critical, though: after uploading to Flink, you can specify the entry class again when submitting the job.

    3. AppendingTransformer: these two transformer entries are the key. They append the META-INF/spring.handlers and META-INF/spring.schemas files from every Spring jar into single merged files in the fat jar; without them only one jar's copy survives shading, and Spring fails to resolve its XML namespaces at runtime.

    Spring configuration

    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:context="http://www.springframework.org/schema/context"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
            http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd"
           default-lazy-init="false">
    
        <context:annotation-config/>
        <context:component-scan base-package="myflink"/>
        <context:component-scan base-package="myflink.ds"/>
        <context:component-scan base-package="myflink.repository"/>
    
        <bean id="dataSource" class="com.alibaba.druid.pool.DruidDataSource" init-method="init" destroy-method="close">
            <property name="url" value="jdbc:mysql://localhost:3306/log" />
            <property name="username" value="root" />
            <property name="password" value="root" />
    
            <property name="filters" value="stat" />
    
            <property name="maxActive" value="20" />
            <property name="initialSize" value="1" />
            <property name="maxWait" value="60000" />
            <property name="minIdle" value="1" />
    
            <property name="timeBetweenEvictionRunsMillis" value="60000" />
            <property name="minEvictableIdleTimeMillis" value="300000" />
    
            <property name="testWhileIdle" value="true" />
            <property name="testOnBorrow" value="false" />
            <property name="testOnReturn" value="false" />
    
            <property name="poolPreparedStatements" value="true" />
            <property name="maxOpenPreparedStatements" value="20" />
    
            <property name="asyncInit" value="true" />
        </bean>
    
        <bean class="org.mybatis.spring.mapper.MapperScannerConfigurer">
            <property name="basePackage" value="myflink.repository"/>
            <property name="sqlSessionFactoryBeanName" value="myBatisSqlSessionFactory"/>
            <property name="annotationClass" value="org.springframework.stereotype.Repository"/>
        </bean>
    
        <bean id="myBatisSqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
            <property name="dataSource" ref="dataSource"/>
            <property name="mapperLocations" value="classpath*:mapper/*Mapper.xml"/>
        </bean>
    
        <bean id="mySqlTransactionManager"
              class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
            <property name="dataSource" ref="dataSource"/>
        </bean>
    
    </beans>
    

    As shown above, this is an ordinary Spring configuration. It sets up annotation scanning for the listed packages, the database connection pool, the MyBatis integration, and transaction management.

    Using Spring in the code

    1. First attempt

    Every Flink job encountered so far starts from a main method, so the initial plan was to initialize the applicationContext in main and pass it into the data source as a parameter, or to fetch the dataSource bean from Spring directly and hand it to addSource, like this:

    public class MySqlDSPoolForFlinkJob {
    
        public static void main(String[] args) throws Exception {
    
            // initialize the Spring container
            ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath*:applicationContext.xml");
    
            // fetch the bean
            MysqlDSWithSpringForFlink mysqlDSWithSpringForFlink = (MysqlDSWithSpringForFlink) applicationContext.getBean("mysqlDsWithSpring");
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            env.addSource(mysqlDSWithSpringForFlink).addSink(new PrintSinkFunction<>());
    
            env.execute("mysql Datasource with pool and spring");
        }
    }
    

    But when running it, the job always fails with the following error:

    2019-01-14 21:43:54.729 [main] INFO  com.alibaba.druid.pool.DruidDataSource - {dataSource-1} inited
    Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: The implementation of the RichSourceFunction is not serializable. The object probably contains or references non serializable fields.
    

    Even though the Spring container was initialized, the job still fails: before submitting, Flink serializes the source function with Java serialization so it can ship it to the cluster, and the ApplicationContext, along with the beans obtained from it, is not serializable.

    So in main, the data source still has to be created with plain new rather than fetched from Spring.
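    Under the hood, Flink checks this by actually serializing the user function with Java serialization before the job is submitted. A minimal JDK-only sketch of that check (class and field names here are made up for illustration):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializableCheckDemo {

    // Mimics a source holding a Spring bean: java.lang.Object is not Serializable.
    static class BadSource implements Serializable {
        Object context = new Object();
    }

    // Marking the field transient (and re-creating it in open()) passes the check.
    static class GoodSource implements Serializable {
        transient Object context = new Object();
    }

    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o); // same kind of round-trip Flink attempts
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isSerializable(new BadSource()));  // false
        System.out.println(isSerializable(new GoodSource())); // true
    }
}
```

    This is why the article's eventual fix initializes Spring inside the function on the worker side instead of carrying the context along.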

    2. Using Spring inside the business logic

    Since beans cannot be passed in from the main method, main becomes a thin wrapper: the concrete datasource and sink logic moves one level down, and the Spring initialization happens inside the DataSource implementation itself.

    The modified main method looks like this:

    public class MySqlDSPoolForFlinkJob {
    
        public static void main(String[] args) throws Exception {
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
        // add the Flink data source
            env.addSource(new MysqlDSWithSpringForFlink()).addSink(new PrintSinkFunction<>());
    
            env.execute("mysql Datasource with pool and spring");
        }
    }
    

    The key is the implementation of MysqlDSWithSpringForFlink.

    3. The Flink dataSource

    A Flink data source needs to implement the methods of the abstract class RichSourceFunction<T>. Here is the implementation:

    package myflink.ds;
    
    import lombok.AllArgsConstructor;
    import lombok.NoArgsConstructor;
    import lombok.extern.slf4j.Slf4j;
    import myflink.manager.UrlInfoManager;
    import myflink.model.UrlInfo;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
    import org.springframework.beans.BeansException;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.ApplicationContextAware;
    import org.springframework.context.support.ClassPathXmlApplicationContext;
    
    import java.util.List;
    
    @Slf4j
    @AllArgsConstructor
    @NoArgsConstructor
    public class MysqlDSWithSpringForFlink extends RichSourceFunction<UrlInfo> implements ApplicationContextAware {
    
        private UrlInfoManager urlInfoManager;
    
        private ApplicationContext applicationContext;
    
        @Override
        public void open(Configuration parameters) throws Exception {
            log.info("------open connection,applicationContext=" + applicationContext);
            super.open(parameters);
            if(applicationContext == null){
                init();
            }
    
        }
    
        private void init(){
        // initialize Spring here
            applicationContext = new ClassPathXmlApplicationContext("classpath*:applicationContext.xml");
            urlInfoManager = (UrlInfoManager) applicationContext.getBean("urlInfoManager");
        }
    
        @Override
        public void run(SourceContext<UrlInfo> sourceContext) throws Exception {
            log.info("------query ");
    
            if(urlInfoManager == null){
                init();
            }
    
            List<UrlInfo> urlInfoList = urlInfoManager.queryAll();
        // SourceContext.collect is not thread-safe, so emit records sequentially instead of via parallelStream
        urlInfoList.forEach(sourceContext::collect);
        }
    
        @Override
        public void cancel() {
            log.info("------cancel ");
        }
    
        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            this.applicationContext = applicationContext;
        }
    }
    

    Notes:

    a. The ApplicationContextAware interface makes the applicationContext obtainable, which helps avoid initializing Spring more than once in a multi-threaded setting.

    b. The open method is where the Spring container initialization is triggered.

    c. urlInfoManager is simply managed by Spring; its concrete implementation follows below.
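    One caveat worth noting (not covered in the original article): each parallel subtask of the source gets its own deserialized copy of the function, so each open() call above builds its own Spring container. A common mitigation is a static, lazily initialized holder so that subtasks in the same TaskManager JVM share one context. A JDK-only sketch, with Object standing in for the ApplicationContext and a hypothetical class name:

```java
public class ContextHolder {

    // In the real job this field would be an ApplicationContext.
    private static volatile Object context;

    public static Object get() {
        Object local = context;
        if (local == null) {
            synchronized (ContextHolder.class) {
                local = context;
                if (local == null) {
                    // In the real job: new ClassPathXmlApplicationContext("classpath*:applicationContext.xml")
                    local = new Object();
                    context = local;
                }
            }
        }
        return local;
    }

    public static void main(String[] args) {
        // Repeated calls return the one shared instance
        System.out.println(ContextHolder.get() == ContextHolder.get()); // true
    }
}
```

    The double-checked locking keeps the synchronized block off the hot path once the context exists.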

    4. About RichSourceFunction

    The inheritance relationship of RichSourceFunction is as follows:

    public abstract class RichSourceFunction<OUT> extends AbstractRichFunction implements SourceFunction<OUT>{}
    

    Note AbstractRichFunction: it is the parent class of both RichSourceFunction and RichSinkFunction, which means custom DataSources and Sinks in Flink all derive from it.

    @Public
    public abstract class AbstractRichFunction implements RichFunction, Serializable {
        ......
    
        public void open(Configuration parameters) throws Exception {
        }
    
        public void close() throws Exception {
        }
    }
    

    It mainly provides the open and close methods, used to open and close resources such as database connections.

    As for SourceFunction, here is its definition:

    package org.apache.flink.streaming.api.functions.source;
    
    import java.io.Serializable;
    import org.apache.flink.annotation.Public;
    import org.apache.flink.annotation.PublicEvolving;
    import org.apache.flink.api.common.functions.Function;
    import org.apache.flink.streaming.api.watermark.Watermark;
    
    @Public
    public interface SourceFunction<T> extends Function, Serializable {
        void run(SourceFunction.SourceContext<T> var1) throws Exception;
    
        void cancel();
    
        @Public
        public interface SourceContext<T> {
            void collect(T var1);
    
            @PublicEvolving
            void collectWithTimestamp(T var1, long var2);
    
            @PublicEvolving
            void emitWatermark(Watermark var1);
    
            @PublicEvolving
            void markAsTemporarilyIdle();
    
            Object getCheckpointLock();
    
            void close();
        }
    }
    

    The method that matters here is run, which holds the core logic of the data source.
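    The cancel() in the source above only writes a log line; for a long-running source, the usual contract is that run() loops on a volatile flag which cancel() clears. A plain-Java sketch of that pattern (no Flink dependency; collect is simulated with a list):

```java
import java.util.ArrayList;
import java.util.List;

public class SourceLoopDemo {

    private volatile boolean running = true;
    final List<Integer> out = new ArrayList<>();

    void run() {
        int i = 0;
        // Bounded here so the demo terminates; a real source would loop until cancel()
        while (running && i < 5) {
            out.add(i++); // stands in for sourceContext.collect(record)
        }
    }

    void cancel() {
        running = false; // makes run() exit its loop
    }

    public static void main(String[] args) {
        SourceLoopDemo source = new SourceLoopDemo();
        source.run();
        System.out.println(source.out); // [0, 1, 2, 3, 4]
    }
}
```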

    5. UrlInfo and the MyBatis-related implementation

    The UrlInfo POJO:

    package myflink.model;
    
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.EqualsAndHashCode;
    import lombok.NoArgsConstructor;
    
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    @EqualsAndHashCode(callSuper = false)
    public class UrlInfo {
        private int id;
    
        private String url;
    
        private String hash;
    }
    
    

    UrlInfoRepository:

    package myflink.repository;
    
    import myflink.model.UrlInfo;
    import org.springframework.stereotype.Repository;
    
    import java.util.List;
    
    @Repository
    public interface UrlInfoRepository {
    
        UrlInfo selectByPrimaryKey(Integer id);
    
        UrlInfo selectByUrl(String url);
    
        int insert(UrlInfo urlInfo);
    
        List<UrlInfo> queryAll();
    }
    
    

    UrlInfoMapper.xml:

    <?xml version="1.0" encoding="UTF-8" ?>
    <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
    <mapper namespace="myflink.repository.UrlInfoRepository">
        <resultMap id="BaseResultMap" type="myflink.model.UrlInfo">
            <id column="id" property="id" jdbcType="INTEGER"/>
            <result column="url" property="url" jdbcType="VARCHAR"/>
            <result column="hash" property="hash" jdbcType="VARCHAR"/>
        </resultMap>
    
        <sql id="Base_Column_List">
            `id`,
            `url`,
            `hash`
        </sql>
    
        <select id="selectByPrimaryKey" resultMap="BaseResultMap" parameterType="java.lang.Integer">
            select
            <include refid="Base_Column_List"/>
            from url_info
            where id = #{id,jdbcType=INTEGER}
        </select>
    
        <select id="selectByUrl" resultMap="BaseResultMap" parameterType="java.lang.String">
            select
            <include refid="Base_Column_List"/>
            from url_info
            where url = #{url,jdbcType=VARCHAR}
        </select>
    
        <select id="queryAll" resultMap="BaseResultMap">
            select
            <include refid="Base_Column_List"/>
            from url_info
        </select>
    
        <insert id="insert" parameterType="myflink.model.UrlInfo">
            insert into url_info
            <trim prefix="(" suffix=")" suffixOverrides=",">
                <if test="url != null">
                    `url`,
                </if>
                <if test="hash != null">
                    `hash`
                </if>
            </trim>
            <trim prefix="values (" suffix=")" suffixOverrides=",">
                <if test="url != null">
                    #{url,jdbcType=VARCHAR},
                </if>
                <if test="hash != null">
                    #{hash,jdbcType=VARCHAR}
                </if>
            </trim>
        </insert>
    </mapper>
    

    UrlInfoManager:

    package myflink.manager;
    
    import myflink.model.UrlInfo;
    
    import java.util.List;
    
    public interface UrlInfoManager {
    
        int insert(UrlInfo urlInfo);
    
        List<UrlInfo> queryAll();
    }
    
    

    UrlInfoManagerImpl:

    package myflink.manager.impl;
    
    import myflink.manager.UrlInfoManager;
    import myflink.model.UrlInfo;
    import myflink.repository.UrlInfoRepository;
    import org.apache.commons.codec.digest.DigestUtils;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    import org.springframework.transaction.annotation.Transactional;
    
    import java.util.List;
    
    @Transactional
    @Component("urlInfoManager")
    public class UrlInfoManagerImpl implements UrlInfoManager {
    
        @Autowired
        private UrlInfoRepository urlInfoRepository;
    
        @Override
        public int insert(UrlInfo urlInfo) {
    
            urlInfo.setHash(DigestUtils.md5Hex(urlInfo.getUrl()));
    
            UrlInfo info = urlInfoRepository.selectByUrl(urlInfo.getUrl());
            if(null != info)
            {
                return 0;
            }
    
            return urlInfoRepository.insert(urlInfo);
        }
    
        @Override
        public List<UrlInfo> queryAll() {
            return urlInfoRepository.queryAll();
        }
    }
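    Note that DigestUtils.md5Hex used above comes from commons-codec, which therefore also has to be on the classpath. For reference, a JDK-only equivalent looks like this:

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class Md5Hex {

    // JDK-only equivalent of commons-codec's DigestUtils.md5Hex(String)
    static String md5Hex(String s) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5").digest(s.getBytes(StandardCharsets.UTF_8));
            StringBuilder sb = new StringBuilder(digest.length * 2);
            for (byte b : digest) {
                sb.append(String.format("%02x", b)); // two lowercase hex chars per byte
            }
            return sb.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is guaranteed to be available in the JDK", e);
        }
    }

    public static void main(String[] args) {
        // Well-known digest of the empty string
        System.out.println(md5Hex("")); // d41d8cd98f00b204e9800998ecf8427e
    }
}
```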
    
    
