Preface:
Version: 1.13
Common Maven dependencies
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>flink</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<flink.version>1.13.1</flink.version>
<scala.binary.version>2.12</scala.binary.version>
<slf4j.version>1.7.30</slf4j.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-to-slf4j</artifactId>
<version>2.14.0</version>
<scope>provided</scope>
</dependency>
<!-- For reading HDFS files -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.3</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.4</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers combine.children="append">
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer">
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
KafkaSink
- Import the dependency
<!-- Kafka connector -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
- Program
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
/**
 * Read data from a socket.
 * Parameters: host, port, delimiter, max retry attempts
 */
SocketTextStreamFunction socketText = new SocketTextStreamFunction("hadoop102", 9999, "\n", 3);
DataStreamSource<String> source = env.addSource(socketText);
source.print("kafka>>>");
// Write the data to Kafka
// Broker list
String brokerList = "hadoop102:9092,hadoop103:9092,hadoop104:9092";
String topicId = "flink_kafka";
FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>(brokerList, topicId, new SimpleStringSchema());
// Add the sink (invoke must never be called by hand; Flink calls it per record)
source.addSink(kafkaSink);
// Start execution
env.execute();
}
- Input
[admin@hadoop102 ~]$ nc -lk 9999
java
wwww
- Output (consumer)
[admin@hadoop102 bin]$ sh kafka-console-consumer.sh --bootstrap-server hadoop102:9092,hadoop103:9092,hadoop104:9092 --topic flink_kafka
java
wwww
- Official example: kafkaSink
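Beyond the simple (brokerList, topic, schema) constructor used above, the producer can also be configured through Kafka Properties and an explicit delivery semantic. Below is a minimal sketch under the same hypothetical brokers/topic as above; note that EXACTLY_ONCE additionally requires checkpointing to be enabled, while the simpler constructors default to AT_LEAST_ONCE.
import java.nio.charset.StandardCharsets;
import java.util.Properties;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "hadoop102:9092,hadoop103:9092,hadoop104:9092");
// For EXACTLY_ONCE the transaction timeout must not exceed the broker's
// transaction.max.timeout.ms (15 minutes by default).
props.setProperty("transaction.timeout.ms", "600000");
FlinkKafkaProducer<String> exactlyOnceSink = new FlinkKafkaProducer<>(
        "flink_kafka",                                  // default topic
        new KafkaSerializationSchema<String>() {
            @Override
            public ProducerRecord<byte[], byte[]> serialize(String element, Long timestamp) {
                return new ProducerRecord<>("flink_kafka", element.getBytes(StandardCharsets.UTF_8));
            }
        },
        props,
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE);      // or AT_LEAST_ONCE
source.addSink(exactlyOnceSink);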
RedisSink
- Import the dependency
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-redis -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.1.5</version>
</dependency>
- Program
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
/**
 * Read data from a socket.
 * Parameters: host, port, delimiter, max retry attempts
 */
SocketTextStreamFunction socketText = new SocketTextStreamFunction("hadoop102", 9999, "\n", 3);
DataStreamSource<String> source = env.addSource(socketText);
// Redis connection configuration
FlinkJedisPoolConfig jedisPoolConfig = new FlinkJedisPoolConfig.Builder()
.setHost("hadoop102")
.setPort(6379)
.build();
// Redis command mapping
RedisMapper<String> redisMapper = new RedisMapper<String>() {
@Override
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(RedisCommand.HSET,"key_sss");
}
@Override
public String getKeyFromData(String data) {
return data+"_ByKey";
}
@Override
public String getValueFromData(String data) {
return data;
}
};
org.apache.flink.streaming.connectors.redis.RedisSink<String> stringRedisSink =
new org.apache.flink.streaming.connectors.redis.RedisSink<>(jedisPoolConfig, redisMapper);
source.addSink(stringRedisSink);
env.execute();
}
- Input
java
- Output
> hget key_sss java_ByKey
java
- Supported Redis commands
public enum RedisCommand {
/**
* Insert the specified value at the head of the list stored at key.
* If key does not exist, it is created as empty list before performing the push operations.
*/
LPUSH(RedisDataType.LIST),
/**
* Insert the specified value at the tail of the list stored at key.
* If key does not exist, it is created as empty list before performing the push operation.
*/
RPUSH(RedisDataType.LIST),
/**
* Add the specified member to the set stored at key.
* Specified member that is already a member of this set is ignored.
*/
SADD(RedisDataType.SET),
/**
* Set key to hold the string value. If key already holds a value,
* it is overwritten, regardless of its type.
*/
SET(RedisDataType.STRING),
/**
* Adds the element to the HyperLogLog data structure stored at the variable name specified as first argument.
*/
PFADD(RedisDataType.HYPER_LOG_LOG),
/**
* Posts a message to the given channel.
*/
PUBLISH(RedisDataType.PUBSUB),
/**
* Adds the specified members with the specified score to the sorted set stored at key.
*/
ZADD(RedisDataType.SORTED_SET),
/**
* Sets field in the hash stored at key to value. If key does not exist,
* a new key holding a hash is created. If field already exists in the hash, it is overwritten.
*/
HSET(RedisDataType.HASH);
}
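For reference, a minimal sketch of a mapper for one of the other commands: it writes the same stream into a Redis LIST via RPUSH (the list key name is hypothetical). Unlike HSET/ZADD, these commands need no additionalKey, so the single-argument RedisCommandDescription constructor is used, and the jedisPoolConfig built in the program above is reused.
RedisMapper<String> listMapper = new RedisMapper<String>() {
    @Override
    public RedisCommandDescription getCommandDescription() {
        return new RedisCommandDescription(RedisCommand.RPUSH);
    }
    @Override
    public String getKeyFromData(String data) {
        return "list_sss";   // Redis list key (hypothetical)
    }
    @Override
    public String getValueFromData(String data) {
        return data;         // element appended to the list
    }
};
source.addSink(new org.apache.flink.streaming.connectors.redis.RedisSink<>(jedisPoolConfig, listMapper));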
- Official example: redisSink
ElasticsearchSink
- Official example: ElasticsearchSink
- Program
import lombok.SneakyThrows;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
/**
 * Read data from a socket.
 * Parameters: host, port, delimiter, max retry attempts
 */
SocketTextStreamFunction socketText = new SocketTextStreamFunction("hadoop102", 9999, "\n", 3);
DataStreamSource<String> source = env.addSource(socketText);
// Parse each line into a UserBean
SingleOutputStreamOperator<UserBean> map = source.map(e -> new UserBean(e.split(",")))
.returns(Types.POJO(UserBean.class));
// Elasticsearch hosts
List<HttpHost> httpHosts=new ArrayList<>();
httpHosts.add(new HttpHost("hadoop102",9200,"http"));
httpHosts.add(new HttpHost("hadoop103",9200,"http"));
httpHosts.add(new HttpHost("hadoop104",9200,"http"));
// Build the Elasticsearch sink
org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink.Builder<UserBean> builder = new org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink.Builder<>(
httpHosts,
new ElasticsearchSinkFunction<UserBean>() {
public IndexRequest createIndexRequest(UserBean element) throws IllegalAccessException {
return Requests.indexRequest()
.index("user_index")
.type("_doc")
.source(objectToMap(element));
}
@SneakyThrows
@Override
public void process(UserBean element, RuntimeContext ctx, RequestIndexer indexer) {
indexer.add(createIndexRequest(element));
}
}
);
// Configure the bulk flush (batching) strategy: flush buffered requests every 1000 ms
builder.setBulkFlushInterval(1000);
org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink<UserBean> sink = builder.build();
map.addSink(sink);
env.execute();
}
/**
 * Convert an object's fields and their values into a Map.
 *
 * @param obj
 * @return
 * @throws IllegalAccessException
 */
public static Map<String, Object> objectToMap(Object obj) throws IllegalAccessException {
Map<String, Object> map = new HashMap<>();
Class<?> clazz = obj.getClass();
for (Field field : clazz.getDeclaredFields()) {
field.setAccessible(true);
String fieldName = field.getName();
Object value = field.get(obj);
map.put(fieldName, value);
}
return map;
}
- UserBean
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
/**
* @author admin
* @date 2021/8/10
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
@ToString
public class UserBean {
private int id;
private String name;
private int age;
private String sex;
public UserBean(String[] values) {
this.id = Integer.parseInt(values[0]);
this.name = values[1];
this.age = Integer.parseInt(values[2]);
this.sex = values[3];
}
}
- Requests
indexRequest: insert/overwrite (if a document with the same id already exists it is overwritten, otherwise it is inserted)
deleteRequest: delete (see the sketch below)
bulkRequest: bulk operations
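A hypothetical sketch of the delete case, written in the same ElasticsearchSinkFunction style as the program above; the document id is a placeholder and the request is handed to the same RequestIndexer:
import org.elasticsearch.action.delete.DeleteRequest;

DeleteRequest delete = Requests.deleteRequest("user_index")
        .type("_doc")
        .id("some-doc-id");   // hypothetical document id
indexer.add(delete);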
- BulkFlush (flush) strategy
To improve performance, documents are not written to Elasticsearch immediately; the sink buffers each batch and flushes it once a threshold is reached. The thresholds are configured on the ElasticsearchSink.Builder (a sketch follows the list below):
setBulkFlushMaxActions: sets the maximum number of buffered actions per bulk request; pass -1 to disable it.
setBulkFlushMaxSizeMb: sets the maximum size, in MB, of the buffered actions per bulk request; pass -1 to disable it.
setBulkFlushInterval: sets the bulk flush interval, in milliseconds; pass -1 to disable it.
setBulkFlushBackoff: sets whether to enable backoff (retry) behavior when a bulk flush fails.
setBulkFlushBackoffType: sets the backoff type (constant or exponential) to use when retrying bulk requests.
setBulkFlushBackoffRetries: sets the maximum number of backoff retries for a failed bulk request.
setBulkFlushBackoffDelay: sets the delay, in milliseconds, between backoff attempts.
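A minimal sketch (the values are illustrative only) applying these options to the builder from the program above; the backoff type comes from ElasticsearchSinkBase.FlushBackoffType:
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;

builder.setBulkFlushMaxActions(100);        // flush after 100 buffered requests
builder.setBulkFlushMaxSizeMb(5);           // ... or after 5 MB of buffered data
builder.setBulkFlushInterval(1000);         // ... or at least once per second
builder.setBulkFlushBackoff(true);          // retry failed bulk requests
builder.setBulkFlushBackoffType(ElasticsearchSinkBase.FlushBackoffType.EXPONENTIAL);
builder.setBulkFlushBackoffRetries(3);      // at most 3 retries
builder.setBulkFlushBackoffDelay(500);      // base delay of 500 ms between retries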
- Input
[admin@hadoop102 ~]$ nc -lk 9999
1,zhangsan,18,M
2,lisi,19,F
- Elasticsearch query
GET /user_index/_search
{
"query": {
"match_all": {}
}
}
- Output
{
"took" : 16,
"timed_out" : false,
"_shards" : {
"total" : 5,
"successful" : 5,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : 2,
"max_score" : 1.0,
"hits" : [
{
"_index" : "user_index",
"_type" : "_doc",
"_id" : "AafpL3sBamyuoJRi0TBG",
"_score" : 1.0,
"_source" : {
"sex" : "F",
"name" : "lisi",
"id" : 2,
"age" : 19
}
},
{
"_index" : "user_index",
"_type" : "_doc",
"_id" : "AKfpL3sBamyuoJRisjCU",
"_score" : 1.0,
"_source" : {
"sex" : "M",
"name" : "zhangsan",
"id" : 1,
"age" : 18
}
}
]
}
}
- Exception handling
Exception message:
The number of object passed must be even but was [1]
Cause: see the post on resolving the Elasticsearch exception "The number of object passed must be even but was [1]".
return Requests.indexRequest()
.index("user_index")
.type("_doc")
.source(objectToMap(element));
The data passed to .source() must be in key-value form. At first I passed element directly into source(), which caused the error above.
Solution: convert the bean into a Map so that it satisfies the key-value structure (an alternative that uses a JSON string is sketched after the helper below).
/**
 * Convert an object's fields and their values into a Map.
 *
 * @param obj
 * @return
 * @throws IllegalAccessException
 */
public static Map<String, Object> objectToMap(Object obj) throws IllegalAccessException {
Map<String, Object> map = new HashMap<>();
Class<?> clazz = obj.getClass();
for (Field field : clazz.getDeclaredFields()) {
field.setAccessible(true);
String fieldName = field.getName();
Object value = field.get(obj);
map.put(fieldName, value);
}
return map;
}
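An alternative sketch (assuming a JSON library such as Jackson is on the classpath): serialize the bean to a JSON string and declare the content type explicitly, instead of converting it to a Map.
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.elasticsearch.common.xcontent.XContentType;

public IndexRequest createIndexRequest(UserBean element) throws JsonProcessingException {
    // In real code, reuse a single ObjectMapper instance instead of creating one per request.
    ObjectMapper objectMapper = new ObjectMapper();
    return Requests.indexRequest()
            .index("user_index")
            .type("_doc")
            .source(objectMapper.writeValueAsString(element), XContentType.JSON);
}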
Custom Sink
Every custom sink needs to implement the org.apache.flink.streaming.api.functions.sink.SinkFunction interface and override its invoke method.
public interface SinkFunction<IN> extends Function, Serializable {
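A minimal sketch (assuming a String stream like the socket sources above) of the simplest possible custom sink, which just prints each record; the MySQL sink below follows the same pattern with a real external system as the destination:
source.addSink(new SinkFunction<String>() {
    @Override
    public void invoke(String value, Context context) {
        System.out.println("custom sink >>> " + value);
    }
});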
- Prepare a table
CREATE TABLE t_user(
id int not null primary key,
name varchar(20) not null,
age int not null,
sex varchar(20) not null
);
- Import the dependency
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
- Approach 1: a custom sink that writes data to MySQL
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
/**
* @author admin
* @date 2021/8/10
*/
public class CustomSink {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
/**
 * Read data from a socket.
 * Parameters: host, port, delimiter, max retry attempts
 */
SocketTextStreamFunction socketText = new SocketTextStreamFunction("hadoop102", 9999, "\n", 3);
DataStreamSource<String> source = env.addSource(socketText);
// Convert each line into a UserBean
SingleOutputStreamOperator<UserBean> map = source.map(e -> new UserBean(e.split(",")));
map.addSink(new MySink());
map.print();
env.execute();
}
private static class MySink implements SinkFunction<UserBean>{
// JDBC connection URL (credentials are embedded here only for the demo)
private static final String URL = "jdbc:mysql://hadoop102:3306/demo?user=root&password=123321";
private static final String INSERT_SQL = "insert into t_user values(?,?,?,?)";
@Override
public void invoke(UserBean value, Context context) throws Exception {
Class.forName("com.mysql.jdbc.Driver"); // Load the JDBC driver
Connection conn = DriverManager.getConnection(URL); // Open the database connection
PreparedStatement statement = conn.prepareStatement(INSERT_SQL);
statement.setInt(1,value.getId());
statement.setString(2,value.getName());
statement.setInt(3,value.getAge());
statement.setString(4,value.getSex());
statement.execute();
statement.close();
conn.close();
}
}
}
- Input
[admin@hadoop102 ~]$ nc -lk 9999
1,zhangsan,18,M
2,lisi,19,F
- Query the database
mysql> select * from t_user;
+----+----------+-----+-----+
| id | name | age | sex |
+----+----------+-----+-----+
| 1 | zhangsan | 18 | M |
| 2 | lisi | 19 | F |
+----+----------+-----+-----+
2 rows in set (0.00 sec)
- Approach 2: a custom sink that writes data to MySQL
Approach 1 is clearly flawed: every call to invoke opens a new database connection, which is very inefficient, so it needs to be optimized.
Use org.apache.flink.streaming.api.functions.sink.RichSinkFunction instead, and create the database connection in its lifecycle methods.
private static class MySink2 extends RichSinkFunction<UserBean> {
// JDBC connection URL (credentials are embedded here only for the demo)
private static final String URL = "jdbc:mysql://hadoop102:3306/demo?user=root&password=123321";
private static final String INSERT_SQL = "insert into t_user values(?,?,?,?)";
private Connection conn;
private PreparedStatement statement;
/**
 * Called when the sink starts (once per parallel subtask)
 * @param parameters
 * @throws Exception
 */
@Override
public void open(Configuration parameters) throws Exception {
Class.forName("com.mysql.jdbc.Driver"); // Load the JDBC driver
conn = DriverManager.getConnection(URL); // Open the database connection
statement = conn.prepareStatement(INSERT_SQL);
}
@Override
public void invoke(UserBean value, Context context) throws Exception {
statement.setInt(1,value.getId());
statement.setString(2,value.getName());
statement.setInt(3,value.getAge());
statement.setString(4,value.getSex());
statement.execute();
}
/**
 * Called when the sink shuts down
 * @throws Exception
 */
@Override
public void close() throws Exception {
if(statement!=null){
statement.close();
}
if (conn != null) {
conn.close();
}
}
}
How many times open and close are called depends on the parallelism: each parallel subtask (slot) gets one open/close pair, so they run as many times as the sink's parallelism.
Finally, use MySink2:
map.addSink(new MySink2());
- Test
[admin@hadoop102 ~]$ nc -lk 9999
4,lifeifei,26,F
3,wangfenyu,47,M
- MySQL query: together with the 2 rows inserted earlier, there are now 4 rows
mysql> select * from t_user;
+----+-----------+-----+-----+
| id | name | age | sex |
+----+-----------+-----+-----+
| 1 | zhangsan | 18 | M |
| 2 | lisi | 19 | F |
| 3 | wangfenyu | 47 | M |
| 4 | lifeifei | 26 | F |
+----+-----------+-----+-----+
4 rows in set (0.00 sec)
The official MySQL (JDBC) sink
Although MySink2 reduces the number of connections, it still has many problems: a Flink job runs continuously, but MySQL will not keep a connection open forever (the server reclaims long-idle connections). If the connection drops and is never re-established, data can no longer be written to MySQL. Fortunately, none of this has to be handled by hand, because Flink already provides an official JDBC connector.
- Official JDBC connector
- Import the dependency
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.11</artifactId>
<version>1.13.0</version>
</dependency>
- Program
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction;
/**
* @author admin
* @date 2021/8/10
*/
public class MysqlSink {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
/**
 * Read data from a socket.
 * Parameters: host, port, delimiter, max retry attempts
 */
SocketTextStreamFunction socketText = new SocketTextStreamFunction("hadoop102", 9999, "\n", 3);
DataStreamSource<String> source = env.addSource(socketText);
// Convert each line into a UserBean
SingleOutputStreamOperator<UserBean> map = source.map(e -> new UserBean(e.split(",")));
// Bind the UserBean fields to the insert statement
JdbcStatementBuilder<UserBean> builder = (JdbcStatementBuilder<UserBean>) (statement, userBean) -> {
statement.setInt(1, userBean.getId());
statement.setString(2, userBean.getName());
statement.setInt(3, userBean.getAge());
statement.setString(4, userBean.getSex());
};
// Execution options
JdbcExecutionOptions executionOptions = new JdbcExecutionOptions.Builder()
.withBatchSize(1) // batch size of 1, only for testing
.withBatchIntervalMs(200)
.withMaxRetries(5)
.build();
// Connection options
JdbcConnectionOptions connectionOptions = new JdbcConnectionOptions
.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:mysql://hadoop102:3306/demo?useSSL=false") // url
.withDriverName("com.mysql.jdbc.Driver") // driver
.withUsername("root") // username
.withPassword("123321") // password
.withConnectionCheckTimeoutSeconds(30) // connection check timeout, in seconds
.build();
// The insert SQL statement
String insertSql = "insert into t_user(id,name,age,sex) values(?,?,?,?)";
SinkFunction<UserBean> sink = JdbcSink.sink(insertSql,builder,executionOptions,connectionOptions);
map.print();
map.addSink(sink);
env.execute();
}
}