Preface
In the article 《从0到1学习Flink》—— Data Source 介绍, I introduced Flink data sources and briefly touched on custom data sources. This article goes into more detail and walks through a demo to make things concrete.
Let's first look at a demo in which Flink reads data from a Kafka topic. Before starting, you need to have Flink and Kafka installed.
Start Flink, Zookeeper, and Kafka. Once everything is up, add the following dependencies to pom.xml:
<!-- flink java -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<!-- logging -->
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.7</version>
    <scope>runtime</scope>
</dependency>
<dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
    <scope>runtime</scope>
</dependency>
<!-- flink kafka connector -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- alibaba fastjson -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.51</version>
</dependency>
The entity class, Metric.java:
package com.zhisheng.flink.model;

import java.util.Map;

/**
 * Desc:
 * weixin: zhisheng_tian
 * blog: http://www.54tianzhisheng.cn/
 */
public class Metric {
    public String name;
    public long timestamp;
    public Map<String, Object> fields;
    public Map<String, String> tags;

    public Metric() {
    }

    public Metric(String name, long timestamp, Map<String, Object> fields, Map<String, String> tags) {
        this.name = name;
        this.timestamp = timestamp;
        this.fields = fields;
        this.tags = tags;
    }

    @Override
    public String toString() {
        return "Metric{" +
                "name='" + name + '\'' +
                ", timestamp='" + timestamp + '\'' +
                ", fields=" + fields +
                ", tags=" + tags +
                '}';
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(long timestamp) {
        this.timestamp = timestamp;
    }

    public Map<String, Object> getFields() {
        return fields;
    }

    public void setFields(Map<String, Object> fields) {
        this.fields = fields;
    }

    public Map<String, String> getTags() {
        return tags;
    }

    public void setTags(Map<String, String> tags) {
        this.tags = tags;
    }
}
The utility class that writes data to Kafka, KafkaUtils.java:
import com.alibaba.fastjson.JSON;
import com.zhisheng.flink.model.Metric;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
 * Writes data to Kafka.
 * You can run the main() method below to test it.
 * weixin: zhisheng_tian
 * blog: http://www.54tianzhisheng.cn/
 */
public class KafkaUtils {
    public static final String broker_list = "localhost:9092";
    public static final String topic = "metric";  // kafka topic; must match the topic used in the Flink program

    public static void writeToKafka() throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", broker_list);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");   // key serializer
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // value serializer
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        Metric metric = new Metric();
        metric.setTimestamp(System.currentTimeMillis());
        metric.setName("mem");
        Map<String, String> tags = new HashMap<>();
        Map<String, Object> fields = new HashMap<>();

        tags.put("cluster", "zhisheng");
        tags.put("host_ip", "101.147.022.106");

        fields.put("used_percent", 90d);
        fields.put("max", 27244873d);
        fields.put("used", 17244873d);
        fields.put("init", 27244873d);

        metric.setTags(tags);
        metric.setFields(fields);

        ProducerRecord<String, String> record = new ProducerRecord<>(topic, null, null, JSON.toJSONString(metric));
        producer.send(record);
        System.out.println("send data: " + JSON.toJSONString(metric));

        producer.flush();
    }

    public static void main(String[] args) throws InterruptedException {
        while (true) {
            Thread.sleep(300);
            writeToKafka();
        }
    }
}
Run the main() method. If the console keeps printing "send data: ..." lines, the producer is continuously writing data to Kafka.
Main.java
package com.zhisheng.flink;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

import java.util.Properties;

/**
 * Desc:
 * weixin: zhisheng_tian
 * blog: http://www.54tianzhisheng.cn/
 */
public class Main {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "metric-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");   // key deserializer
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // value deserializer
        props.put("auto.offset.reset", "latest"); // start from the latest offset

        DataStreamSource<String> dataStreamSource = env.addSource(new FlinkKafkaConsumer011<>(
                "metric",                 // kafka topic
                new SimpleStringSchema(), // deserialization schema: read records as plain strings
                props)).setParallelism(1);

        dataStreamSource.print(); // print the data read from kafka to the console

        env.execute("Flink add data source");
    }
}
Run it, and you can see the Flink program's console printing the data read from Kafka continuously.
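The consumer above prints the raw JSON strings as-is. As a minimal sketch (not part of the original demo), you could parse them back into Metric objects with fastjson; this assumes the additional imports com.alibaba.fastjson.JSON, com.zhisheng.flink.model.Metric, org.apache.flink.api.common.functions.MapFunction, and org.apache.flink.streaming.api.datastream.DataStream:

// Hedged sketch: map each JSON string from Kafka into a Metric POJO.
DataStream<Metric> metrics = dataStreamSource.map(new MapFunction<String, Metric>() {
    @Override
    public Metric map(String value) throws Exception {
        return JSON.parseObject(value, Metric.class); // fastjson deserialization
    }
});
metrics.print(); // now prints Metric.toString() instead of raw JSON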
The above uses Flink's built-in Kafka source. Next, let's follow the same pattern and write a custom source that reads data from MySQL.
First, add the MySQL dependency to pom.xml:
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.34</version>
</dependency>
Create the database table as follows:
DROP TABLE IF EXISTS `student`;
CREATE TABLE `student` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `name` varchar(25) COLLATE utf8_bin DEFAULT NULL,
  `password` varchar(25) COLLATE utf8_bin DEFAULT NULL,
  `age` int(10) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
Insert some data:
INSERT INTO `student` VALUES ('1', 'zhisheng01', '123456', '18'), ('2', 'zhisheng02', '123', '17'), ('3', 'zhisheng03', '1234', '18'), ('4', 'zhisheng04', '12345', '16');
COMMIT;
Create the entity class, Student.java:
package com.zhisheng.flink.model;

/**
 * Desc:
 * weixin: zhisheng_tian
 * blog: http://www.54tianzhisheng.cn/
 */
public class Student {
    public int id;
    public String name;
    public String password;
    public int age;

    public Student() {
    }

    public Student(int id, String name, String password, int age) {
        this.id = id;
        this.name = name;
        this.password = password;
        this.age = age;
    }

    @Override
    public String toString() {
        return "Student{" +
                "id=" + id +
                ", name='" + name + '\'' +
                ", password='" + password + '\'' +
                ", age=" + age +
                '}';
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }
}
Create the source class, SourceFromMySQL.java, which extends RichSourceFunction and implements its open, close, run, and cancel methods:
package com.zhisheng.flink.source;

import com.zhisheng.flink.model.Student;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

/**
 * Desc:
 * weixin: zhisheng_tian
 * blog: http://www.54tianzhisheng.cn/
 */
public class SourceFromMySQL extends RichSourceFunction<Student> {

    PreparedStatement ps;
    private Connection connection;

    /**
     * Establish the connection in open() so that we don't have to create
     * and release a connection on every invocation.
     *
     * @param parameters
     * @throws Exception
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        connection = getConnection();
        String sql = "select * from student;";
        ps = this.connection.prepareStatement(sql);
    }

    /**
     * Once the program has finished, close the connection and release resources.
     *
     * @throws Exception
     */
    @Override
    public void close() throws Exception {
        super.close();
        if (connection != null) { // close the connection and release resources
            connection.close();
        }
        if (ps != null) {
            ps.close();
        }
    }

    /**
     * The DataStream calls the run() method once to fetch the data.
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void run(SourceContext<Student> ctx) throws Exception {
        ResultSet resultSet = ps.executeQuery();
        while (resultSet.next()) {
            Student student = new Student(
                    resultSet.getInt("id"),
                    resultSet.getString("name").trim(),
                    resultSet.getString("password").trim(),
                    resultSet.getInt("age"));
            ctx.collect(student);
        }
    }

    @Override
    public void cancel() {
    }

    private static Connection getConnection() {
        Connection con = null;
        try {
            Class.forName("com.mysql.jdbc.Driver");
            con = DriverManager.getConnection("jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8", "root", "root123456");
        } catch (Exception e) {
            System.out.println("-----------mysql get connection has exception , msg = " + e.getMessage());
        }
        return con;
    }
}
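Note that cancel() is empty here because run() reads the table once and then returns on its own. For a source that polls continuously, a common pattern (a sketch under my own assumptions, not part of the original demo; the 5-second interval is arbitrary) is to guard the loop with a volatile flag and flip it in cancel():

// Sketch of a cancellable, continuously polling variant of run()/cancel().
private volatile boolean isRunning = true;

@Override
public void run(SourceContext<Student> ctx) throws Exception {
    while (isRunning) {
        ResultSet resultSet = ps.executeQuery();
        while (resultSet.next()) {
            ctx.collect(new Student(
                    resultSet.getInt("id"),
                    resultSet.getString("name").trim(),
                    resultSet.getString("password").trim(),
                    resultSet.getInt("age")));
        }
        Thread.sleep(5000); // poll interval (an assumption for illustration)
    }
}

@Override
public void cancel() {
    isRunning = false; // lets the loop in run() exit cleanly
}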
The Flink program:
package com.zhisheng.flink;

import com.zhisheng.flink.source.SourceFromMySQL;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Desc:
 * weixin: zhisheng_tian
 * blog: http://www.54tianzhisheng.cn/
 */
public class Main2 {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.addSource(new SourceFromMySQL()).print();

        env.execute("Flink add data source");
    }
}
Run the Flink program, and you can see the student records printed in the console log.
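Given the four rows inserted above, the output should look roughly like the following (print() may prefix each line with a subtask index):

Student{id=1, name='zhisheng01', password='123456', age=18}
Student{id=2, name='zhisheng02', password='123', age=17}
Student{id=3, name='zhisheng03', password='1234', age=18}
Student{id=4, name='zhisheng04', password='12345', age=16}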
As the custom source above shows, the class we extend is RichSourceFunction, so let's take a closer look at it:
It is an abstract class that extends AbstractRichFunction and provides the base capabilities for implementing a rich SourceFunction. It has three subclasses: two are abstract classes that provide more concrete implementations on top of it, and the third is ContinuousFileMonitoringFunction.
MessageAcknowledgingSourceBase: targets scenarios where the data source is a message queue, and provides an ID-based acknowledgment mechanism.
MultipleIdsMessageAcknowledgingSourceBase: refines the ID acknowledgment mechanism of MessageAcknowledgingSourceBase, supporting two ID acknowledgment models: session IDs and unique message IDs.
ContinuousFileMonitoringFunction: a single (non-parallel) monitoring task that takes a FileInputFormat and, based on the FileProcessingMode and FilePathFilter, is responsible for monitoring the user-provided path, deciding which files should be read and processed further, creating the FileInputSplits corresponding to those files, and assigning them to downstream tasks for further processing.
This article mainly covered how to use Flink's Kafka source and provided a demo showing how to define a custom source that reads data from MySQL. Of course, you can also read from other places and implement your own data source. Real-world jobs will likely be more complex than this, so be ready to adapt!
When reposting, please be sure to credit the original source: http://www.54tianzhisheng.cn/2018/10/30/flink-create-source/
WeChat public account: zhisheng