Learning Flink, Part 3: Integrating Spring

Author: AlanKim | Published 2019-03-13

Using Spring in Flink

A Flink project may contain several jobs, but each job is usually just a main method, with the main logic living inside it; anything else it needs is simply instantiated with new, and Spring, which the rest of our business code relies on, is not used at all. Can Spring be used in Flink? Of course: there is no restriction, you just have to initialize the Spring container by hand.

Here Spring is brought in to manage a database connection pool, instead of handling connections through raw JDBC, and the database is then wired up as a Flink data source.

1. Add the dependencies

The Spring dependencies:

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>5.1.3.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-tx</artifactId>
            <version>5.1.3.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jdbc</artifactId>
            <version>5.1.3.RELEASE</version>
        </dependency>

The MySQL driver:

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.13</version>
        </dependency>

The Druid connection pool:

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.1.11</version>
        </dependency>

The MyBatis dependencies:

        <dependency>
            <groupId>org.mybatis</groupId>
            <artifactId>mybatis-spring</artifactId>
            <version>1.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.mybatis</groupId>
            <artifactId>mybatis</artifactId>
            <version>3.4.0</version>
        </dependency>

2. Configure the Maven shade plugin

If this is not configured, running the main method locally works fine, but once the jar is packaged and uploaded to a Flink cluster, you get exceptions about missing Spring classes. The configuration:

            <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
            <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>myflink.job.KafkaDatasouceForFlinkJob</mainClass>
                                </transformer>

                                    <transformer
                                            implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
                                    <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                        <resource>META-INF/spring.handlers</resource>
                                    </transformer>
                                    <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                        <resource>META-INF/spring.schemas</resource>
                                    </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

Points to note:

1. maven-shade-plugin: if the project is created from the Flink Quickstart Job archetype, this plugin configuration is added to pom.xml automatically.

2. mainClass: specifies the entry point of the whole jar. It matters little here, because the entry class can be specified again when submitting the job to Flink.

3. AppendingTransformer: the two transformers at the end are the crucial part; they append the Spring configuration files (META-INF/spring.handlers and META-INF/spring.schemas) from every Spring dependency into the fat jar instead of letting one jar's copy overwrite another's. A quick way to verify the merge is sketched below.
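
Before deploying, it is worth confirming that the merge actually happened. A minimal sketch (the class name SpringHandlersCheck is hypothetical, not part of the project): run it with the shaded jar on the classpath and it should print a single entry pointing into the fat jar, whose content combines the namespace-handler mappings of spring-beans, spring-context, spring-tx, and so on.

import java.net.URL;
import java.util.Enumeration;

public class SpringHandlersCheck {

    public static void main(String[] args) throws Exception {
        // Without the AppendingTransformer, shading keeps only one jar's copy
        // of this file and the other Spring namespace handlers are silently lost.
        Enumeration<URL> urls = SpringHandlersCheck.class.getClassLoader()
                .getResources("META-INF/spring.handlers");
        while (urls.hasMoreElements()) {
            System.out.println(urls.nextElement());
        }
    }
}

Alternatively, unzip the shaded jar and inspect META-INF/spring.handlers directly.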

3. Spring configuration

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd"
       default-lazy-init="false">

    <context:annotation-config/>
    <context:component-scan base-package="myflink"/>
    <context:component-scan base-package="myflink.ds"/>
    <context:component-scan base-package="myflink.repository"/>

    <bean id="dataSource" class="com.alibaba.druid.pool.DruidDataSource" init-method="init" destroy-method="close">
        <property name="url" value="jdbc:mysql://localhost:3306/log" />
        <property name="username" value="root" />
        <property name="password" value="root" />

        <property name="filters" value="stat" />

        <property name="maxActive" value="20" />
        <property name="initialSize" value="1" />
        <property name="maxWait" value="60000" />
        <property name="minIdle" value="1" />

        <property name="timeBetweenEvictionRunsMillis" value="60000" />
        <property name="minEvictableIdleTimeMillis" value="300000" />

        <property name="testWhileIdle" value="true" />
        <property name="testOnBorrow" value="false" />
        <property name="testOnReturn" value="false" />

        <property name="poolPreparedStatements" value="true" />
        <property name="maxOpenPreparedStatements" value="20" />

        <property name="asyncInit" value="true" />
    </bean>

    <bean class="org.mybatis.spring.mapper.MapperScannerConfigurer">
        <property name="basePackage" value="myflink.repository"/>
        <property name="sqlSessionFactoryBeanName" value="myBatisSqlSessionFactory"/>
        <property name="annotationClass" value="org.springframework.stereotype.Repository"/>
    </bean>

    <bean id="myBatisSqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
        <property name="dataSource" ref="dataSource"/>
        <property name="mapperLocations" value="classpath*:mapper/*Mapper.xml"/>
    </bean>

    <bean id="mySqlTransactionManager"
          class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
        <property name="dataSource" ref="dataSource"/>
    </bean>

</beans>

As shown above, this is an ordinary Spring configuration: annotation scanning for the listed packages, the Druid database connection pool, the MyBatis setup, and transaction management.
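
The configuration can be smoke-tested on its own before involving Flink. A minimal sketch, assuming the XML above is on the classpath as applicationContext.xml (the class name is hypothetical):

import javax.sql.DataSource;
import java.sql.Connection;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class SpringConfigSmokeTest {

    public static void main(String[] args) throws Exception {
        ClassPathXmlApplicationContext context =
                new ClassPathXmlApplicationContext("classpath*:applicationContext.xml");

        // borrow one connection from the Druid pool to prove the wiring works
        DataSource dataSource = (DataSource) context.getBean("dataSource");
        try (Connection connection = dataSource.getConnection()) {
            System.out.println("connected to " + connection.getMetaData().getURL());
        }

        context.close();
    }
}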

4. Wiring it up in code

1. First attempt

Every Flink job encountered so far starts from a main method, so the first plan was to initialize the applicationContext inside main and either pass it into the data source as a parameter, or fetch the dataSource bean from Spring directly and hand it to addSource, like this:

public class MySqlDSPoolForFlinkJob {

    public static void main(String[] args) throws Exception {

        // initialize the Spring container
        ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath*:applicationContext.xml");

        // fetch the bean from the container
        MysqlDSWithSpringForFlink mysqlDSWithSpringForFlink = (MysqlDSWithSpringForFlink) applicationContext.getBean("mysqlDsWithSpring");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.addSource(mysqlDSWithSpringForFlink).addSink(new PrintSinkFunction<>());

        env.execute("mysql Datasource with pool and spring");
    }
}

But running this always fails with the following error:

2019-01-14 21:43:54.729 [main] INFO  com.alibaba.druid.pool.DruidDataSource - {dataSource-1} inited
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: The implementation of the RichSourceFunction is not serializable. The object probably contains or references non serializable fields.

The job graph is built by serializing each function with Java serialization, and a RichSourceFunction that holds references to the ApplicationContext or to Spring beans is not serializable; so even though the Spring container was initialized in main, the lower layer still cannot receive the applicationContext.

So in main the only option is still to obtain the source object with plain new.
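
The underlying rule is worth spelling out: everything handed to addSource/addSink is serialized and shipped to the task managers, so any non-serializable resource (the Spring context, the Druid pool, a MyBatis mapper) must live in a transient field and be created in open(), which runs on the worker after deserialization. A minimal sketch of that pattern (SpringAwareSource is hypothetical, not the article's final code):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public abstract class SpringAwareSource<T> extends RichSourceFunction<T> {

    // transient: excluded from the serialized closure shipped to the cluster
    private transient ClassPathXmlApplicationContext applicationContext;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // open() runs per task instance on the worker, after deserialization,
        // so the Spring container is created where it is actually used
        applicationContext = new ClassPathXmlApplicationContext("classpath*:applicationContext.xml");
    }

    @Override
    public void close() throws Exception {
        if (applicationContext != null) {
            applicationContext.close();
        }
        super.close();
    }

    protected ClassPathXmlApplicationContext getContext() {
        return applicationContext;
    }
}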

2. Using Spring inside the business logic

Since beans cannot be passed in from the main method, let main be nothing more than a thin wrapper: the concrete datasource, sink, and related operations all move down one layer, and the Spring initialization happens inside the concrete DataSource implementation.

The revised main method looks like this:

public class MySqlDSPoolForFlinkJob {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // add the Flink data source
        env.addSource(new MysqlDSWithSpringForFlink()).addSink(new PrintSinkFunction<>());

        env.execute("mysql Datasource with pool and spring");
    }
}

The key is the implementation of MysqlDSWithSpringForFlink.

3. The Flink dataSource

A Flink data source implements the methods of the abstract class RichSourceFunction<T>. The implementation used here:

package myflink.ds;

import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import myflink.manager.UrlInfoManager;
import myflink.model.UrlInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import java.util.List;

@Slf4j
@AllArgsConstructor
@NoArgsConstructor
public class MysqlDSWithSpringForFlink extends RichSourceFunction<UrlInfo> implements ApplicationContextAware {

    private UrlInfoManager urlInfoManager;

    private ApplicationContext applicationContext;

    @Override
    public void open(Configuration parameters) throws Exception {
        log.info("------open connection,applicationContext=" + applicationContext);
        super.open(parameters);
        if(applicationContext == null){
            init();
        }

    }

    private void init(){
        // initialize the Spring container here
        applicationContext = new ClassPathXmlApplicationContext("classpath*:applicationContext.xml");
        urlInfoManager = (UrlInfoManager) applicationContext.getBean("urlInfoManager");
    }

    @Override
    public void run(SourceContext<UrlInfo> sourceContext) throws Exception {
        log.info("------query ");

        if(urlInfoManager == null){
            init();
        }

        List<UrlInfo> urlInfoList = urlInfoManager.queryAll();
        // emit sequentially: SourceContext.collect is not thread-safe,
        // so a parallel stream must not be used to call it
        urlInfoList.forEach(sourceContext::collect);
    }

    @Override
    public void cancel() {
        log.info("------cancel ");
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }
}

Notes:

a. The ApplicationContextAware interface lets the class receive an applicationContext that already exists, avoiding repeated Spring initialization when multiple threads are involved (Spring only calls setApplicationContext if the function itself is registered as a bean; otherwise the null check in open() does the work).

b. The method that initializes the Spring container is called from open (and again defensively from run if the manager is still null).

c. urlInfoManager itself is managed by Spring; its implementation appears below.

4. About RichSourceFunction

The inheritance chain of RichSourceFunction:

public abstract class RichSourceFunction<OUT> extends AbstractRichFunction implements SourceFunction<OUT> {}

Note AbstractRichFunction: it is the parent class of both RichSourceFunction and RichSinkFunction, which means every custom DataSource and Sink in Flink ultimately derives from it.

@Public
public abstract class AbstractRichFunction implements RichFunction, Serializable {
    ......

    public void open(Configuration parameters) throws Exception {
    }

    public void close() throws Exception {
    }
}

What it contributes here are the open and close methods, used to set up and tear down data source connections. A custom sink uses exactly the same lifecycle; a sketch follows.
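
Since RichSinkFunction shares this lifecycle, a Spring-backed sink follows the same shape as the source. A hedged sketch (UrlInfoSink is hypothetical, not part of the article), reusing the bootstrap-in-open pattern to write records back through the MyBatis layer:

import myflink.manager.UrlInfoManager;
import myflink.model.UrlInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class UrlInfoSink extends RichSinkFunction<UrlInfo> {

    private transient ClassPathXmlApplicationContext applicationContext;
    private transient UrlInfoManager urlInfoManager;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        applicationContext = new ClassPathXmlApplicationContext("classpath*:applicationContext.xml");
        urlInfoManager = (UrlInfoManager) applicationContext.getBean("urlInfoManager");
    }

    @Override
    public void invoke(UrlInfo value) throws Exception {
        // insert() already skips URLs that are present, so replays are harmless
        urlInfoManager.insert(value);
    }

    @Override
    public void close() throws Exception {
        if (applicationContext != null) {
            applicationContext.close();
        }
        super.close();
    }
}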

As for SourceFunction, here is its definition (shown decompiled, hence the var1/var2 parameter names):

package org.apache.flink.streaming.api.functions.source;

import java.io.Serializable;
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.streaming.api.watermark.Watermark;

@Public
public interface SourceFunction<T> extends Function, Serializable {
    void run(SourceFunction.SourceContext<T> var1) throws Exception;

    void cancel();

    @Public
    public interface SourceContext<T> {
        void collect(T var1);

        @PublicEvolving
        void collectWithTimestamp(T var1, long var2);

        @PublicEvolving
        void emitWatermark(Watermark var1);

        @PublicEvolving
        void markAsTemporarilyIdle();

        Object getCheckpointLock();

        void close();
    }
}

What matters here is the run method, which holds the main data-source logic. The source above returns from run once the query results are emitted, which suits a one-shot read; a continuously running source instead loops until cancel flips a flag, as sketched below.
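
A hedged sketch of that canonical run/cancel pattern (PollingUrlInfoSource is hypothetical), combined with the Spring bootstrap used above:

import java.util.List;
import myflink.manager.UrlInfoManager;
import myflink.model.UrlInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class PollingUrlInfoSource extends RichSourceFunction<UrlInfo> {

    private transient ClassPathXmlApplicationContext applicationContext;
    private transient UrlInfoManager urlInfoManager;
    private volatile boolean running = true;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        applicationContext = new ClassPathXmlApplicationContext("classpath*:applicationContext.xml");
        urlInfoManager = (UrlInfoManager) applicationContext.getBean("urlInfoManager");
    }

    @Override
    public void run(SourceContext<UrlInfo> sourceContext) throws Exception {
        while (running) {
            List<UrlInfo> urlInfoList = urlInfoManager.queryAll();
            // emit under the checkpoint lock, as the SourceContext contract asks
            synchronized (sourceContext.getCheckpointLock()) {
                for (UrlInfo urlInfo : urlInfoList) {
                    sourceContext.collect(urlInfo);
                }
            }
            Thread.sleep(5000L); // re-query the table every 5 seconds
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}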

5. UrlInfo and the MyBatis layer

The UrlInfo pojo:

package myflink.model;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode(callSuper = false)
public class UrlInfo {
    private int id;

    private String url;

    private String hash;
}

UrlInfoRepository:

package myflink.repository;

import myflink.model.UrlInfo;
import org.springframework.stereotype.Repository;

import java.util.List;

@Repository
public interface UrlInfoRepository {

    UrlInfo selectByPrimaryKey(Integer id);

    UrlInfo selectByUrl(String url);

    int insert(UrlInfo urlInfo);

    List<UrlInfo> queryAll();
}

UrlInfoMapper.xml:

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="myflink.repository.UrlInfoRepository">
    <resultMap id="BaseResultMap" type="myflink.model.UrlInfo">
        <id column="id" property="id" jdbcType="INTEGER"/>
        <result column="url" property="url" jdbcType="VARCHAR"/>
        <result column="hash" property="hash" jdbcType="VARCHAR"/>
    </resultMap>

    <sql id="Base_Column_List">
        `id`,
        `url`,
        `hash`
    </sql>

    <select id="selectByPrimaryKey" resultMap="BaseResultMap" parameterType="java.lang.Integer">
        select
        <include refid="Base_Column_List"/>
        from url_info
        where id = #{id,jdbcType=INTEGER}
    </select>

    <select id="selectByUrl" resultMap="BaseResultMap" parameterType="java.lang.String">
        select
        <include refid="Base_Column_List"/>
        from url_info
        where url = #{url,jdbcType=VARCHAR}
    </select>

    <select id="queryAll" resultMap="BaseResultMap">
        select
        <include refid="Base_Column_List"/>
        from url_info
    </select>

    <insert id="insert" parameterType="myflink.model.UrlInfo">
        insert into url_info
        <trim prefix="(" suffix=")" suffixOverrides=",">
            <if test="url != null">
                `url`,
            </if>
            <if test="hash != null">
                `hash`
            </if>
        </trim>
        <trim prefix="values (" suffix=")" suffixOverrides=",">
            <if test="url != null">
                #{url,jdbcType=VARCHAR},
            </if>
            <if test="hash != null">
                #{hash,jdbcType=VARCHAR}
            </if>
        </trim>
    </insert>
</mapper>

UrlInfoManager:

package myflink.manager;

import myflink.model.UrlInfo;

import java.util.List;

public interface UrlInfoManager {

    int insert(UrlInfo urlInfo);

    List<UrlInfo> queryAll();
}

UrlInfoManagerImpl:

package myflink.manager.impl;

import myflink.manager.UrlInfoManager;
import myflink.model.UrlInfo;
import myflink.repository.UrlInfoRepository;
import org.apache.commons.codec.digest.DigestUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import java.util.List;

@Transactional
@Component("urlInfoManager")
public class UrlInfoManagerImpl implements UrlInfoManager {

    @Autowired
    private UrlInfoRepository urlInfoRepository;

    @Override
    public int insert(UrlInfo urlInfo) {

        urlInfo.setHash(DigestUtils.md5Hex(urlInfo.getUrl()));

        UrlInfo info = urlInfoRepository.selectByUrl(urlInfo.getUrl());
        if(null != info)
        {
            return 0;
        }

        return urlInfoRepository.insert(urlInfo);
    }

    @Override
    public List<UrlInfo> queryAll() {
        return urlInfoRepository.queryAll();
    }
}
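
To close the loop, the MyBatis layer can be exercised on its own, without Flink. A minimal sketch (the class name is hypothetical; it assumes the url_info table exists in the log database configured above):

import java.util.List;
import myflink.manager.UrlInfoManager;
import myflink.model.UrlInfo;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class UrlInfoManagerSmokeTest {

    public static void main(String[] args) {
        ClassPathXmlApplicationContext context =
                new ClassPathXmlApplicationContext("classpath*:applicationContext.xml");
        UrlInfoManager urlInfoManager = (UrlInfoManager) context.getBean("urlInfoManager");

        UrlInfo urlInfo = new UrlInfo();
        urlInfo.setUrl("https://example.com");
        urlInfoManager.insert(urlInfo); // the hash is filled in inside insert()

        List<UrlInfo> all = urlInfoManager.queryAll();
        all.forEach(info -> System.out.println(info.getUrl() + " -> " + info.getHash()));

        context.close();
    }
}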
