美文网首页
Flink学习笔记(3):Sink to JDBC

Flink学习笔记(3):Sink to JDBC

作者: 郭寻抚 | 来源:发表于2016-10-05 22:18 被阅读8142次

    1. 前言

    1.1 说明

    本文通过一个Demo程序,演示Flink从Kafka中读取数据,并将数据以JDBC的方式持久化到关系型数据库中。通过本文,可以学习如何自定义Flink Sink和Flink Steaming编程的步骤。

    1.2 软件版本

    • Centos 7.1
    • JDK 1.8
    • Flink 1.1.2
    • Kafka 0.10.0.1

    1.3 依赖jar包

    请将以下依赖放在pom.xml中。这里使用的关系型数据是PostgreSQL,也可以换成其它关系型数据库的驱动程序。

     <properties>
        <flink.version>1.1.2</flink.version>
    </properties>
    
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.8_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>postgresql</groupId>
            <artifactId>postgresql</artifactId>
            <version>9.1-901-1.jdbc4</version>
        </dependency>
    </dependencies>
    

    2. 自定义Sink

    2.1 内置的Streaming Connector

    Flink 内置了一些Streaming Connector,用于和第三方的系统交互。截至到当前为止,Flink支持以下Connector。括号中的source代表数据从这些第三方系统中流入Flink中,sink代表数据从Flink流到这些第三方系统中。

    • Apache Kafka (sink/source)
    • Elasticsearch (sink)
    • Elasticsearch 2x (sink)
    • Hadoop FileSystem (sink)
    • RabbitMQ (sink/source)
    • Amazon Kinesis Streams (sink/source)
    • Twitter Streaming API (source)
    • Apache NiFi (sink/source)
    • Apache Cassandra (sink)
    • Redis (sink)

    除此之外,Flink还允许我们自定义source和sink。本文所述例子是从Kafka中读取数据,并把数据写入数据库中;由于Flink已经内置了Kafka source,因此还需要自定义JDBC sink。

    2.2 自定义JDBC sink

    下面的代码就是一个JDBC sink的实现,其效果就是向PostgreSQL数据库中插入数据,具体请看代码中的注释说明。

    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    
    
    public class PostgreSQLSink extends RichSinkFunction<Tuple3<String,String,String>> {
    
        private static final long serialVersionUID = 1L;
    
        private Connection connection;
        private PreparedStatement preparedStatement;
        /**
         * open方法是初始化方法,会在invoke方法之前执行,执行一次。
         */
        @Override
        public void open(Configuration parameters) throws Exception {
            // JDBC连接信息
            String USERNAME = "postgres" ;
            String PASSWORD = "********";
            String DRIVERNAME = "org.postgresql.Driver";
            String DBURL = "jdbc:postgresql://192.168.1.213/flink";
            // 加载JDBC驱动
            Class.forName(DRIVERNAME);
            // 获取数据库连接
            connection = DriverManager.getConnection(DBURL,USERNAME,PASSWORD);
            String sql = "insert into kafka_message(
                            timeseq, thread, message) values (?,?,?)";
            preparedStatement = connection.prepareStatement(sql);
            super.open(parameters);
        }
    
        /**
         * invoke()方法解析一个元组数据,并插入到数据库中。
         * @param data 输入的数据
         * @throws Exception
         */
        @Override
        public  void invoke(Tuple3<String,String,String> data) throws Exception{
            try {
                String timeseq = data.getField(0);
                String thread = data.getField(1);
                String message = data.getField(2);
                preparedStatement.setString(1,timeseq);
                preparedStatement.setString(2,thread);
                preparedStatement.setString(3,message);
                preparedStatement.executeUpdate();
            }catch (Exception e){
                e.printStackTrace();
            }
    
        };
    
        /**
         * close()是tear down的方法,在销毁时执行,关闭连接。
         */
        @Override
        public void close() throws Exception {
            if(preparedStatement != null){
                preparedStatement.close();
            }
            if(connection != null){
                connection.close();
            }
            super.close();
        }
    }
    

    3. Flink Streaming Job 编程

    3.1 Flink Stream编程的步骤

    Flink job 编程基本上都是由一些基本部分组成:

    1. 获得一个 execution environment
    2. 加载/创建初始数据(Source)
    3. 指定在该数据上进行的转换(Transformations)
    4. 指定计算结果的存储地方(Sink)
    5. 启动程序执行。

    3.2 Kafka-Flink-DB

    下面的代码,是一个Flink Job,从Kafka中读取消息,并把消息写到关系型数据库中。

    import org.apache.flink.api.common.functions.FilterFunction;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
    
    public class KafkaToDB {
    
        public static void main(String[] args) throws Exception {
            // 解析参数
            final ParameterTool parameterTool = ParameterTool.fromArgs(args);
            if (parameterTool.getNumberOfParameters() < 4) {
                System.out.println("Missing parameters!");
                System.out.println("\nUsage: Kafka --topic <topic> " +
                        "--bootstrap.servers <kafka brokers> "+
                        "--zookeeper.connect <zk quorum> --group.id <some id>");
                return;
            }
    
            // 获取StreamExecutionEnvironment。
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.getConfig().disableSysoutLogging();
            env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
            // create a checkpoint every 5 secodns
            env.enableCheckpointing(5000); 
            // make parameters available in the web interface
            env.getConfig().setGlobalJobParameters(parameterTool); 
    
            // source
            DataStream<String> sourceStream = env.addSource(
                    new FlinkKafkaConsumer08<String>(parameterTool.getRequired("topic"),
                            new SimpleStringSchema(), parameterTool.getProperties()));
            // Transformation,这里仅仅是过滤了null。
            DataStream<Tuple3<String, String, String>> messageStream = sourceStream
                    .map(new InputMap())
                    .filter(new NullFilter());
            //sink
            messageStream.addSink(new PostgreSQLSink());
    
            env.execute("Write into PostgreSQL");
        }
        
        // 过滤Null数据。
        public static class NullFilter implements FilterFunction<Tuple3<String, String, String>>{
            @Override
            public boolean filter(Tuple3<String, String, String> value) throws Exception {
                return value != null;
            }
        }
        
        // 对输入数据做map操作。
        public static class InputMap implements MapFunction<String, Tuple3<String, String, String>> {
            private static final long serialVersionUID = 1L;
    
            @Override
            public Tuple3<String, String, String> map(String line) throws Exception {
                // normalize and split the line
                String[] arr = line.toLowerCase().split(",");
                if (arr.length > 2) {
                    return new Tuple3<>(arr[0], arr[1], arr[2]);
                }
                return null;
            }
        }
    
    }
    

    4. 把Job提交Flink集群

    将上面的代码打包成jar后,通过下面的命令把job提交到Flink集群上。其中-c指定了flink-db.jar的Main class,其余的参数是本文job所用的kafka相关的参数。

    bin/flink run -c com.bigknow.flink.KafkaToDB examples/flink-db.jar \
    --topic my-topic \
     --bootstrap.servers 192.168.1.170:9092 \
    --zookeeper.connect 192.168.1.170:2181 \
    --group.id test01`
    

    (完)

    相关文章

      网友评论

          本文标题:Flink学习笔记(3):Sink to JDBC

          本文链接:https://www.haomeiwen.com/subject/leceyttx.html