Using Spring in Flink
A Flink project may contain several jobs, and each job is usually just a main method that holds the main logic. Whenever another component is needed, it is simply instantiated with new; the Spring container we rely on everywhere else in business code never comes into play. Can Spring be used in Flink at all? Certainly, nothing prevents it; you just have to initialize the Spring container manually.
Here Spring is introduced to manage a database connection pool, instead of handling connections through raw JDBC, and the database is then wired in as a Flink data source.
1. Add the dependencies
Spring dependencies:
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context</artifactId>
    <version>5.1.3.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-tx</artifactId>
    <version>5.1.3.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jdbc</artifactId>
    <version>5.1.3.RELEASE</version>
</dependency>
The MySQL driver:
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.13</version>
</dependency>
The Druid database connection pool:
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>druid</artifactId>
    <version>1.1.11</version>
</dependency>
MyBatis dependencies:
<dependency>
    <groupId>org.mybatis</groupId>
    <artifactId>mybatis-spring</artifactId>
    <version>1.3.0</version>
</dependency>
<dependency>
    <groupId>org.mybatis</groupId>
    <artifactId>mybatis</artifactId>
    <version>3.4.0</version>
</dependency>
2. Configure the Maven shade plugin
Without this configuration, running the main method locally works fine, but once the job is packaged and uploaded to a Flink cluster it fails with exceptions about missing Spring classes. The configuration:
<!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
<!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>3.0.0</version>
    <executions>
        <!-- Run shade goal on package phase -->
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <artifactSet>
                    <excludes>
                        <exclude>org.apache.flink:force-shading</exclude>
                        <exclude>com.google.code.findbugs:jsr305</exclude>
                        <exclude>org.slf4j:*</exclude>
                        <exclude>log4j:*</exclude>
                    </excludes>
                </artifactSet>
                <filters>
                    <filter>
                        <!-- Do not copy the signatures in the META-INF folder.
                             Otherwise, this might cause SecurityExceptions when using the JAR. -->
                        <artifact>*:*</artifact>
                        <excludes>
                            <exclude>META-INF/*.SF</exclude>
                            <exclude>META-INF/*.DSA</exclude>
                            <exclude>META-INF/*.RSA</exclude>
                        </excludes>
                    </filter>
                </filters>
                <transformers>
                    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                        <mainClass>myflink.job.KafkaDatasouceForFlinkJob</mainClass>
                    </transformer>
                    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                    <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                        <resource>META-INF/spring.handlers</resource>
                    </transformer>
                    <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                        <resource>META-INF/spring.schemas</resource>
                    </transformer>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>
Points to note:
1. maven-shade-plugin: when the project is created from the Flink Quickstart Job archetype, this plugin configuration is added to pom.xml automatically.
2. mainClass: the entry point recorded in the jar manifest. It matters little here, since the entry class can be specified again when submitting the job after uploading it to Flink.
3. The two AppendingTransformer entries are the key part: every Spring jar ships its own META-INF/spring.handlers and META-INF/spring.schemas, and without these transformers shading keeps only one copy, which breaks Spring's XML namespace handling at runtime. The transformers append the files from all modules into the fat jar. A quick way to verify the merge is shown below.
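One quick way to check that the merge actually happened is to look up META-INF/spring.handlers on the classpath of the shaded jar: with the AppendingTransformer in place it should resolve to a single merged resource. A minimal sketch, where SpringHandlersCheck is a made-up class name:

import java.net.URL;
import java.util.Enumeration;

public class SpringHandlersCheck {
    public static void main(String[] args) throws Exception {
        // run with the shaded jar on the classpath; each matching resource is printed
        Enumeration<URL> urls = SpringHandlersCheck.class.getClassLoader()
                .getResources("META-INF/spring.handlers");
        while (urls.hasMoreElements()) {
            System.out.println(urls.nextElement());
        }
    }
}

Against the unshaded classpath this prints one URL per Spring jar; against a correctly shaded jar it prints a single entry.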
Spring configuration
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
       http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd"
       default-lazy-init="false">

    <context:annotation-config/>
    <context:component-scan base-package="myflink"/>
    <context:component-scan base-package="myflink.ds"/>
    <context:component-scan base-package="myflink.repository"/>

    <bean id="dataSource" class="com.alibaba.druid.pool.DruidDataSource" init-method="init" destroy-method="close">
        <property name="url" value="jdbc:mysql://localhost:3306/log"/>
        <property name="username" value="root"/>
        <property name="password" value="root"/>
        <property name="filters" value="stat"/>
        <property name="maxActive" value="20"/>
        <property name="initialSize" value="1"/>
        <property name="maxWait" value="60000"/>
        <property name="minIdle" value="1"/>
        <property name="timeBetweenEvictionRunsMillis" value="60000"/>
        <property name="minEvictableIdleTimeMillis" value="300000"/>
        <property name="testWhileIdle" value="true"/>
        <property name="testOnBorrow" value="false"/>
        <property name="testOnReturn" value="false"/>
        <property name="poolPreparedStatements" value="true"/>
        <property name="maxOpenPreparedStatements" value="20"/>
        <property name="asyncInit" value="true"/>
    </bean>

    <bean class="org.mybatis.spring.mapper.MapperScannerConfigurer">
        <property name="basePackage" value="myflink.repository"/>
        <property name="sqlSessionFactoryBeanName" value="myBatisSqlSessionFactory"/>
        <property name="annotationClass" value="org.springframework.stereotype.Repository"/>
    </bean>

    <bean id="myBatisSqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
        <property name="dataSource" ref="dataSource"/>
        <property name="mapperLocations" value="classpath*:mapper/*Mapper.xml"/>
    </bean>

    <bean id="mySqlTransactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
        <property name="dataSource" ref="dataSource"/>
    </bean>
</beans>
As shown above, this is an ordinary Spring configuration: the packages to scan for annotations, the database connection pool, the MyBatis setup, and the transaction manager.
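Before wiring it into Flink, the configuration can be smoke-tested on its own. A minimal sketch, assuming the file is on the classpath as applicationContext.xml; ContextSmokeTest is a made-up class name, while the dataSource bean id comes from the XML above:

import java.sql.Connection;
import javax.sql.DataSource;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class ContextSmokeTest {
    public static void main(String[] args) throws Exception {
        // bootstrap the container exactly as the Flink code will later do
        ApplicationContext ctx = new ClassPathXmlApplicationContext("classpath*:applicationContext.xml");
        // the Druid pool should initialize and hand out a live connection
        DataSource dataSource = (DataSource) ctx.getBean("dataSource");
        try (Connection connection = dataSource.getConnection()) {
            System.out.println("connected: " + !connection.isClosed());
        }
    }
}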
Handling it in the code
1. First attempt
Every Flink job I have come across starts from a main method, so the original plan was to initialize the applicationContext in main and either pass it into the data source as a parameter, or fetch the data source itself as a bean from Spring and hand it to addSource, like this:
public class MySqlDSPoolForFlinkJob {

    public static void main(String[] args) throws Exception {
        // initialize the Spring container
        ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath*:applicationContext.xml");
        // fetch the bean
        MysqlDSWithSpringForFlink mysqlDSWithSpringForFlink = (MysqlDSWithSpringForFlink) applicationContext.getBean("mysqlDsWithSpring");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(mysqlDSWithSpringForFlink).addSink(new PrintSinkFunction<>());
        env.execute("mysql Datasource with pool and spring");
    }
}
But when it runs, it always fails with the following error:
2019-01-14 21:43:54.729 [main] INFO com.alibaba.druid.pool.DruidDataSource - {dataSource-1} inited
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: The implementation of the RichSourceFunction is not serializable. The object probably contains or references non serializable fields.
The reason is that Flink serializes every user function (sources and sinks included) and ships it to the task managers. A source obtained from the Spring container references beans, and ultimately the ApplicationContext itself, none of which implement Serializable, so the job is rejected before it even starts.
So in main we cannot hand Spring-managed beans to addSource; the source still has to be created with new.
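To make the constraint concrete, here is a deliberately broken sketch; BadSource is a made-up name. Any non-serializable field that is already assigned when addSource is called gets serialized together with the function, which is exactly what fails:

import myflink.manager.UrlInfoManager;
import myflink.model.UrlInfo;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

public class BadSource extends RichSourceFunction<UrlInfo> {

    // not Serializable: triggers the InvalidProgramException at job-graph time
    private final UrlInfoManager urlInfoManager;

    public BadSource(UrlInfoManager urlInfoManager) {
        this.urlInfoManager = urlInfoManager;
    }

    @Override
    public void run(SourceContext<UrlInfo> sourceContext) {
        // never reached: the job is rejected before deployment
    }

    @Override
    public void cancel() {
    }
}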
2. Use Spring inside the concrete business logic
Since beans cannot be passed in from main, main becomes nothing more than a thin wrapper: the concrete data source, sink, and related operations move one level down, and the Spring initialization happens inside the DataSource implementation itself.
The reworked main method:
public class MySqlDSPoolForFlinkJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // add the Flink data source
        env.addSource(new MysqlDSWithSpringForFlink()).addSink(new PrintSinkFunction<>());
        env.execute("mysql Datasource with pool and spring");
    }
}
The interesting part is the implementation of MysqlDSWithSpringForFlink.
3. The data source in Flink
A data source in Flink extends the RichSourceFunction<T> abstract class and implements its methods. The implementation here:
package myflink.ds;

import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import myflink.manager.UrlInfoManager;
import myflink.model.UrlInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import java.util.List;

@Slf4j
@AllArgsConstructor
@NoArgsConstructor
public class MysqlDSWithSpringForFlink extends RichSourceFunction<UrlInfo> implements ApplicationContextAware {

    private UrlInfoManager urlInfoManager;
    private ApplicationContext applicationContext;

    @Override
    public void open(Configuration parameters) throws Exception {
        log.info("------open connection,applicationContext=" + applicationContext);
        super.open(parameters);
        if (applicationContext == null) {
            init();
        }
    }

    private void init() {
        // initialize Spring here, on the task manager that runs this source
        applicationContext = new ClassPathXmlApplicationContext("classpath*:applicationContext.xml");
        urlInfoManager = (UrlInfoManager) applicationContext.getBean("urlInfoManager");
    }

    @Override
    public void run(SourceContext<UrlInfo> sourceContext) throws Exception {
        log.info("------query ");
        if (urlInfoManager == null) {
            init();
        }
        List<UrlInfo> urlInfoList = urlInfoManager.queryAll();
        // SourceContext.collect is not thread-safe, so emit sequentially
        // rather than from a parallel stream
        urlInfoList.forEach(sourceContext::collect);
    }

    @Override
    public void cancel() {
        log.info("------cancel ");
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }
}
Notes:
a. The ApplicationContextAware interface allows an existing applicationContext to be handed in, which avoids initializing Spring repeatedly when several instances run in the same JVM (a refinement of this is sketched after these notes).
b. The Spring container is initialized in the open method, which Flink calls once per parallel instance before run.
c. urlInfoManager is managed entirely by Spring; its implementation follows below.
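Since several parallel source or sink instances may share one task manager JVM, a further refinement of note a is to keep the context in a static holder so that it is built at most once per JVM. A minimal sketch; SpringContextHolder is a hypothetical helper, not part of the original project:

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

// One ApplicationContext per JVM, created lazily with double-checked locking.
public final class SpringContextHolder {

    private static volatile ApplicationContext context;

    private SpringContextHolder() {
    }

    public static ApplicationContext get() {
        if (context == null) {
            synchronized (SpringContextHolder.class) {
                if (context == null) {
                    context = new ClassPathXmlApplicationContext("classpath*:applicationContext.xml");
                }
            }
        }
        return context;
    }
}

With this in place, init() in the source would reduce to applicationContext = SpringContextHolder.get().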
4. About RichSourceFunction
The inheritance of RichSourceFunction looks like this:
public abstract class RichSourceFunction<OUT> extends AbstractRichFunction implements SourceFunction<OUT>{}
Note AbstractRichFunction: this class is the parent of both RichSourceFunction and RichSinkFunction, which means every custom DataSource and Sink in Flink ultimately derives from it.
@Public
public abstract class AbstractRichFunction implements RichFunction, Serializable {

    ......

    public void open(Configuration parameters) throws Exception {
    }

    public void close() throws Exception {
    }
}
It mainly contributes the open and close methods, which are used to initialize and to release data source connections.
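The same open-then-init pattern carries over to a sink. A hedged sketch, assuming the Flink 1.x RichSinkFunction API; MysqlSinkWithSpring is a hypothetical class, not part of the original project:

package myflink.sink;

import myflink.manager.UrlInfoManager;
import myflink.model.UrlInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class MysqlSinkWithSpring extends RichSinkFunction<UrlInfo> {

    // transient: never shipped with the job graph, assigned in open()
    private transient UrlInfoManager urlInfoManager;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // initialize Spring on the task manager, exactly as the source does
        ApplicationContext ctx = new ClassPathXmlApplicationContext("classpath*:applicationContext.xml");
        urlInfoManager = (UrlInfoManager) ctx.getBean("urlInfoManager");
    }

    @Override
    public void invoke(UrlInfo value, Context context) throws Exception {
        urlInfoManager.insert(value);
    }
}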
As for SourceFunction, here is the interface:
package org.apache.flink.streaming.api.functions.source;

import java.io.Serializable;
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.streaming.api.watermark.Watermark;

@Public
public interface SourceFunction<T> extends Function, Serializable {

    void run(SourceFunction.SourceContext<T> var1) throws Exception;

    void cancel();

    @Public
    public interface SourceContext<T> {

        void collect(T var1);

        @PublicEvolving
        void collectWithTimestamp(T var1, long var2);

        @PublicEvolving
        void emitWatermark(Watermark var1);

        @PublicEvolving
        void markAsTemporarilyIdle();

        Object getCheckpointLock();

        void close();
    }
}
The method used here is mainly run, which holds the core logic of the data source.
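One detail worth noting from the interface above is getCheckpointLock. The source in this post does not use checkpointing, but a source that does would emit records while holding that lock. A hedged sketch of how the run method shown earlier could be written under that convention:

@Override
public void run(SourceContext<UrlInfo> sourceContext) throws Exception {
    List<UrlInfo> urlInfoList = urlInfoManager.queryAll();
    for (UrlInfo urlInfo : urlInfoList) {
        // hold the checkpoint lock while emitting, so a checkpoint never
        // observes a half-emitted element
        synchronized (sourceContext.getCheckpointLock()) {
            sourceContext.collect(urlInfo);
        }
    }
}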
5. UrlInfo and the MyBatis-related implementation
The UrlInfo POJO:
package myflink.model;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode(callSuper = false)
public class UrlInfo {
    private int id;
    private String url;
    private String hash;
}
UrlInfoRepository:
package myflink.repository;

import myflink.model.UrlInfo;
import org.springframework.stereotype.Repository;

import java.util.List;

@Repository
public interface UrlInfoRepository {

    UrlInfo selectByPrimaryKey(Integer id);

    UrlInfo selectByUrl(String url);

    int insert(UrlInfo urlInfo);

    List<UrlInfo> queryAll();
}
UrlInfoMapper.xml:
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="myflink.repository.UrlInfoRepository">

    <resultMap id="BaseResultMap" type="myflink.model.UrlInfo">
        <id column="id" property="id" jdbcType="INTEGER"/>
        <result column="url" property="url" jdbcType="VARCHAR"/>
        <result column="hash" property="hash" jdbcType="VARCHAR"/>
    </resultMap>

    <sql id="Base_Column_List">
        `id`,
        `url`,
        `hash`
    </sql>

    <select id="selectByPrimaryKey" resultMap="BaseResultMap" parameterType="java.lang.Integer">
        select
        <include refid="Base_Column_List"/>
        from url_info
        where id = #{id,jdbcType=INTEGER}
    </select>

    <select id="selectByUrl" resultMap="BaseResultMap" parameterType="java.lang.String">
        select
        <include refid="Base_Column_List"/>
        from url_info
        where url = #{url,jdbcType=VARCHAR}
    </select>

    <select id="queryAll" resultMap="BaseResultMap">
        select
        <include refid="Base_Column_List"/>
        from url_info
    </select>

    <insert id="insert" parameterType="myflink.model.UrlInfo">
        insert into url_info
        <trim prefix="(" suffix=")" suffixOverrides=",">
            <if test="url != null">
                `url`,
            </if>
            <if test="hash != null">
                `hash`
            </if>
        </trim>
        <trim prefix="values (" suffix=")" suffixOverrides=",">
            <if test="url != null">
                #{url,jdbcType=VARCHAR},
            </if>
            <if test="hash != null">
                #{hash,jdbcType=VARCHAR}
            </if>
        </trim>
    </insert>
</mapper>
UrlInfoManager:
package myflink.manager;

import myflink.model.UrlInfo;

import java.util.List;

public interface UrlInfoManager {

    int insert(UrlInfo urlInfo);

    List<UrlInfo> queryAll();
}
UrlInfoManagerImpl:
package myflink.manager.impl;

import myflink.manager.UrlInfoManager;
import myflink.model.UrlInfo;
import myflink.repository.UrlInfoRepository;
import org.apache.commons.codec.digest.DigestUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import java.util.List;

@Transactional
@Component("urlInfoManager")
public class UrlInfoManagerImpl implements UrlInfoManager {

    @Autowired
    private UrlInfoRepository urlInfoRepository;

    @Override
    public int insert(UrlInfo urlInfo) {
        urlInfo.setHash(DigestUtils.md5Hex(urlInfo.getUrl()));
        UrlInfo info = urlInfoRepository.selectByUrl(urlInfo.getUrl());
        if (null != info) {
            return 0;
        }
        return urlInfoRepository.insert(urlInfo);
    }

    @Override
    public List<UrlInfo> queryAll() {
        return urlInfoRepository.queryAll();
    }
}
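Putting it all together outside Flink, the manager can be exercised from a plain main method. A minimal sketch; LocalSmokeTest is a made-up class name and the URL is an arbitrary sample:

import myflink.manager.UrlInfoManager;
import myflink.model.UrlInfo;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class LocalSmokeTest {
    public static void main(String[] args) {
        ApplicationContext ctx = new ClassPathXmlApplicationContext("classpath*:applicationContext.xml");
        UrlInfoManager manager = (UrlInfoManager) ctx.getBean("urlInfoManager");
        // hash is computed inside insert, so it can be left null here
        manager.insert(new UrlInfo(0, "https://example.com", null));
        manager.queryAll().forEach(System.out::println);
    }
}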