美文网首页Blink+Flink
《从0到1学习Flink》—— 如何自定义 Data Sourc

《从0到1学习Flink》—— 如何自定义 Data Sourc

作者: 生活的探路者 | 来源:发表于2019-02-20 18:10 被阅读2次

    前言

    在 《从0到1学习Flink》—— Data Source 介绍 文章中,我给大家介绍了 Flink Data Source 以及简短的介绍了一下自定义 Data Source,这篇文章更详细的介绍下,并写一个 demo 出来让大家理解。

    Flink Kafka source

    准备工作

    我们先来看下 Flink 从 Kafka topic 中获取数据的 demo,首先你需要安装好了 FLink 和 Kafka 。

    运行启动 Flink、Zookepeer、Kafka,

    好了,都启动了!

    maven 依赖

    <!--flink java-->

    org.apache.flink

    flink-java

    ${flink.version}

    provided

    org.apache.flink

    flink-streaming-java_${scala.binary.version}

    ${flink.version}

    provided

    <!--日志-->

    org.slf4j

    slf4j-log4j12

    1.7.7

    runtime

    log4j

    log4j

    1.2.17

    runtime

    <!--flink kafka connector-->

    org.apache.flink

    flink-connector-kafka-0.11_${scala.binary.version}

    ${flink.version}

    <!--alibaba fastjson-->

    com.alibaba

    fastjson

    1.2.51

    测试发送数据到 kafka topic

    实体类,Metric.java

    packagecom.zhisheng.flink.model;

    importjava.util.Map;

    /**

    * Desc:

    * weixi: zhisheng_tian

    * blog: http://www.54tianzhisheng.cn/

    */

    publicclassMetric{

    publicString name;

    publiclongtimestamp;

    publicMap fields;

    publicMap tags;

    publicMetric(){

        }

    publicMetric(String name,longtimestamp, Map fields, Map tags){

    this.name = name;

    this.timestamp = timestamp;

    this.fields = fields;

    this.tags = tags;

        }

    @Override

    publicStringtoString(){

    return"Metric{"+

    "name='"+ name +'\''+

    ", timestamp='"+ timestamp +'\''+

    ", fields="+ fields +

    ", tags="+ tags +

    '}';

        }

    publicStringgetName(){

    returnname;

        }

    publicvoidsetName(String name){

    this.name = name;

        }

    publiclonggetTimestamp(){

    returntimestamp;

        }

    publicvoidsetTimestamp(longtimestamp){

    this.timestamp = timestamp;

        }

    publicMapgetFields(){

    returnfields;

        }

    publicvoidsetFields(Map<String, Object> fields){

    this.fields = fields;

        }

    publicMapgetTags(){

    returntags;

        }

    publicvoidsetTags(Map<String, String> tags){

    this.tags = tags;

        }

    }

    往 kafka 中写数据工具类:KafkaUtils.java

    importcom.alibaba.fastjson.JSON;

    importcom.zhisheng.flink.model.Metric;

    importorg.apache.kafka.clients.producer.KafkaProducer;

    importorg.apache.kafka.clients.producer.ProducerRecord;

    importjava.util.HashMap;

    importjava.util.Map;

    importjava.util.Properties;

    /**

    * 往kafka中写数据

    * 可以使用这个main函数进行测试一下

    * weixin: zhisheng_tian

    * blog: http://www.54tianzhisheng.cn/

    */

    publicclassKafkaUtils{

    publicstaticfinalString broker_list ="localhost:9092";

    publicstaticfinalString topic ="metric";// kafka topic,Flink 程序中需要和这个统一

    publicstaticvoidwriteToKafka()throwsInterruptedException{

    Properties props =newProperties();

    props.put("bootstrap.servers", broker_list);

    props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");//key 序列化

    props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");//value 序列化

    KafkaProducer producer =newKafkaProducer(props);

    Metric metric =newMetric();

            metric.setTimestamp(System.currentTimeMillis());

    metric.setName("mem");

    Map tags =newHashMap<>();

    Map fields =newHashMap<>();

    tags.put("cluster","zhisheng");

    tags.put("host_ip","101.147.022.106");

    fields.put("used_percent",90d);

    fields.put("max",27244873d);

    fields.put("used",17244873d);

    fields.put("init",27244873d);

            metric.setTags(tags);

            metric.setFields(fields);

    ProducerRecord record =newProducerRecord(topic,null,null, JSON.toJSONString(metric));

            producer.send(record);

    System.out.println("发送数据: "+ JSON.toJSONString(metric));

            producer.flush();

        }

    publicstaticvoidmain(String[] args)throwsInterruptedException{

    while(true) {

    Thread.sleep(300);

                writeToKafka();

            }

        }

    }

    运行:

    如果出现如上图标记的,即代表能够不断的往 kafka 发送数据的。

    Flink 程序

    Main.java

    packagecom.zhisheng.flink;

    importorg.apache.flink.api.common.serialization.SimpleStringSchema;

    importorg.apache.flink.streaming.api.datastream.DataStreamSource;

    importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    importorg.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

    importjava.util.Properties;

    /**

    * Desc:

    * weixi: zhisheng_tian

    * blog: http://www.54tianzhisheng.cn/

    */

    publicclassMain{

    publicstaticvoidmain(String[] args)throwsException{

    finalStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Properties props =newProperties();

    props.put("bootstrap.servers","localhost:9092");

    props.put("zookeeper.connect","localhost:2181");

    props.put("group.id","metric-group");

    props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");//key 反序列化

    props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");

    props.put("auto.offset.reset","latest");//value 反序列化

    DataStreamSource dataStreamSource = env.addSource(newFlinkKafkaConsumer011<>(

    "metric",//kafka topic

    newSimpleStringSchema(),// String 序列化

    props)).setParallelism(1);

    dataStreamSource.print();//把从 kafka 读取到的数据打印在控制台

    env.execute("Flink add data source");

        }

    }

    运行起来:

    看到没程序,Flink 程序控制台能够源源不断的打印数据呢。

    自定义 Source

    上面就是 Flink 自带的 Kafka source,那么接下来就模仿着写一个从 MySQL 中读取数据的 Source。

    首先 pom.xml 中添加 MySQL 依赖

    mysql

    mysql-connector-java

    5.1.34

    数据库建表如下:

    DROPTABLEIFEXISTS`student`;

    CREATETABLE`student`(

    `id`int(11)unsignedNOTNULLAUTO_INCREMENT,

    `name`varchar(25)COLLATEutf8_binDEFAULTNULL,

    `password`varchar(25)COLLATEutf8_binDEFAULTNULL,

    `age`int(10)DEFAULTNULL,

    PRIMARYKEY(`id`)

    )ENGINE=InnoDBAUTO_INCREMENT=5DEFAULTCHARSET=utf8COLLATE=utf8_bin;

    插入数据

    INSERTINTO`student`VALUES('1','zhisheng01','123456','18'), ('2','zhisheng02','123','17'), ('3','zhisheng03','1234','18'), ('4','zhisheng04','12345','16');

    COMMIT;

    新建实体类:Student.java

    packagecom.zhisheng.flink.model;

    /**

    * Desc:

    * weixi: zhisheng_tian

    * blog: http://www.54tianzhisheng.cn/

    */

    publicclassStudent{

    publicintid;

    publicString name;

    publicString password;

    publicintage;

    publicStudent(){

        }

    publicStudent(intid, String name, String password,intage){

    this.id = id;

    this.name = name;

    this.password = password;

    this.age = age;

        }

    @Override

    publicStringtoString(){

    return"Student{"+

    "id="+ id +

    ", name='"+ name +'\''+

    ", password='"+ password +'\''+

    ", age="+ age +

    '}';

        }

    publicintgetId(){

    returnid;

        }

    publicvoidsetId(intid){

    this.id = id;

        }

    publicStringgetName(){

    returnname;

        }

    publicvoidsetName(String name){

    this.name = name;

        }

    publicStringgetPassword(){

    returnpassword;

        }

    publicvoidsetPassword(String password){

    this.password = password;

        }

    publicintgetAge(){

    returnage;

        }

    publicvoidsetAge(intage){

    this.age = age;

        }

    }

    新建 Source 类 SourceFromMySQL.java,该类继承 RichSourceFunction ,实现里面的 open、close、run、cancel 方法:

    packagecom.zhisheng.flink.source;

    importcom.zhisheng.flink.model.Student;

    importorg.apache.flink.configuration.Configuration;

    importorg.apache.flink.streaming.api.functions.source.RichSourceFunction;

    importjava.sql.Connection;

    importjava.sql.DriverManager;

    importjava.sql.PreparedStatement;

    importjava.sql.ResultSet;

    /**

    * Desc:

    * weixi: zhisheng_tian

    * blog: http://www.54tianzhisheng.cn/

    */

    publicclassSourceFromMySQLextendsRichSourceFunction{

        PreparedStatement ps;

    privateConnection connection;

    /**

        * open() 方法中建立连接,这样不用每次 invoke 的时候都要建立连接和释放连接。

        *

    *@paramparameters

    *@throwsException

        */

    @Override

    publicvoidopen(Configuration parameters)throwsException{

    super.open(parameters);

            connection = getConnection();

    String sql ="select * from Student;";

    ps =this.connection.prepareStatement(sql);

        }

    /**

        * 程序执行完毕就可以进行,关闭连接和释放资源的动作了

        *

    *@throwsException

        */

    @Override

    publicvoidclose()throwsException{

    super.close();

    if(connection !=null) {//关闭连接和释放资源

                connection.close();

            }

    if(ps !=null) {

                ps.close();

            }

        }

    /**

        * DataStream 调用一次 run() 方法用来获取数据

        *

    *@paramctx

    *@throwsException

        */

    @Override

    publicvoidrun(SourceContext<Student> ctx)throwsException{

            ResultSet resultSet = ps.executeQuery();

    while(resultSet.next()) {

    Student student =newStudent(

    resultSet.getInt("id"),

    resultSet.getString("name").trim(),

    resultSet.getString("password").trim(),

    resultSet.getInt("age"));

                ctx.collect(student);

            }

        }

    @Override

    publicvoidcancel(){

        }

    privatestaticConnectiongetConnection(){

    Connection con =null;

    try{

    Class.forName("com.mysql.jdbc.Driver");

    con = DriverManager.getConnection("jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8","root","root123456");

    }catch(Exception e) {

    System.out.println("-----------mysql get connection has exception , msg = "+ e.getMessage());

                }

    returncon;

        }

    }

    Flink 程序

    packagecom.zhisheng.flink;

    importcom.zhisheng.flink.source.SourceFromMySQL;

    importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    /**

    * Desc:

    * weixi: zhisheng_tian

    * blog: http://www.54tianzhisheng.cn/

    */

    publicclassMain2{

    publicstaticvoidmain(String[] args)throwsException{

    finalStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env.addSource(newSourceFromMySQL()).print();

    env.execute("Flink add data sourc");

        }

    }

    运行 Flink 程序,控制台日志中可以看见打印的 student 信息。

    RichSourceFunction

    从上面自定义的 Source 可以看到我们继承的就是这个 RichSourceFunction 类,那么来了解一下:

    一个抽象类,继承自 AbstractRichFunction。为实现一个 Rich SourceFunction 提供基础能力。该类的子类有三个,两个是抽象类,在此基础上提供了更具体的实现,另一个是 ContinuousFileMonitoringFunction。

    MessageAcknowledgingSourceBase :它针对的是数据源是消息队列的场景并且提供了基于 ID 的应答机制。

    MultipleIdsMessageAcknowledgingSourceBase : 在 MessageAcknowledgingSourceBase 的基础上针对 ID 应答机制进行了更为细分的处理,支持两种 ID 应答模型:session id 和 unique message id。

    ContinuousFileMonitoringFunction:这是单个(非并行)监视任务,它接受 FileInputFormat,并且根据 FileProcessingMode 和 FilePathFilter,它负责监视用户提供的路径;决定应该进一步读取和处理哪些文件;创建与这些文件对应的 FileInputSplit 拆分,将它们分配给下游任务以进行进一步处理。

    最后

    本文主要讲了下 Flink 使用 Kafka Source 的使用,并提供了一个 demo 教大家如何自定义 Source,从 MySQL 中读取数据,当然你也可以从其他地方读取,实现自己的数据源 source。可能平时工作会比这个更复杂,需要大家灵活应对!

    关注我

    转载请务必注明原创地址为:http://www.54tianzhisheng.cn/2018/10/30/flink-create-source/

    微信公众号:zhisheng

    相关文章

      网友评论

        本文标题:《从0到1学习Flink》—— 如何自定义 Data Sourc

        本文链接:https://www.haomeiwen.com/subject/awxkyqtx.html