美文网首页玩转大数据
Flink 使用之数据源

Flink 使用之数据源

作者: AlienPaul | 来源:发表于2019-12-19 10:09 被阅读0次

    Flink 使用介绍相关文档目录

    Flink 使用介绍相关文档目录

    Flink内置数据源

    Text file

    读取磁盘或者HDFS中的文件作为数据源。
    唯一的参数file path可以指定:

    • file:///path/to/file.txt
    • hdfs:///path/to/file.txt

    注意:

    1. 如果不填写前缀file://或者hdfs://,默认为file://
    2. 使用Flink读取HDFS文件系统,需要去官网下载对应Pre-bundled Hadoop包。这里给出的链接是适用于Hadoop 2.8.3。之后将这个jar复制到flink安装位置的lib目录中。
    val stream = env.readTextFile("/path/to/file.txt")
    

    socketTextStream

    使用socket作为数据源。但不推荐socket在生产环境中作为数据源。原因如下:

    • socket无状态,也不能replay。故无法保证数据精准投送。
    • socket数据源并行度只能是1,无法很好利用并发处理性能。

    SocketTextStream适合用于debug或者是测试用途。

    val stream = env.socketTextStream("localhost", 9000)
    

    fromElements

    将一系列元素作为数据源。

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env.fromElements(1, 2, 3);
    

    fromCollection

    和fromElements方法类似,不同的是该方法接收一个集合对象,而不是可变参数。如下所示:

    val stream = env.fromCollection(Array(1, 2, 3))
    

    Kafka 数据源

    该数据源用于接收Kafka的数据。
    使用Kafka数据源之前需要先确定Kafka的版本,引入对应的Kafka Connector以来。对应关系如下所示。

    Kafka 版本 Maven 依赖
    0.8.x flink-connector-kafka-0.8_2.11
    0.9.x flink-connector-kafka-0.9_2.11
    0.10.x flink-connector-kafka-0.10_2.11
    0.11.x flink-connector-kafka-0.11_2.11
    1.0 以上 flink-connector-kafka_2.11

    引入Maven依赖。以flink-connector-kafka_2.11为例,添加以下依赖到pom.xml文件:

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.11</artifactId>
        <version>1.9.1</version>
    </dependency>
    

    在集群中运行时,为了减少提交jar包的大小,需要将该依赖设置为provided。然后把此依赖包复制到Flink各个节点安装位置的lib目录中。

    一个简单的使用例子如下:

    // 设置Kafka属性
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "192.168.100.128:9092")
    properties.setProperty("group.id", "test")
    
    // 创建Kafka数据源,其中test为topic名称
    val consumer = new FlinkKafkaConsumer[String]("test", new SimpleStringSchema(), properties)
    

    DeserializationSchema

    DeserializationSchema用于将接收到的二进制数据转换为Java或Scala对象。Kafka Connector提供了如下4种DeserializationSchema:

    • TypeInformationSerializationSchema:使用Flink的TypeInformation反序列化。如果上游数据也是通过Flink TypeInformation序列化后写入的,这里使用此schema最为合适。
    • JsonDeserializationSchema :将获取的数据转换为JSON格式。这里有一个坑,如果发送过来的数据不是合法的JSON格式,数据源会抛出异常导致TaskManager重启。如果需要对不合法的JSON数据容错,需要实现自定义的DeserializationSchema。
    • AvroDeserializationSchema:读取Avro格式的数据。
    • SimpleStringSchema:转换接收到的数据为字符串。

    自定义DeserializationSchema

    所有的Schema需要实现DeserializationSchema。该接口源码如下所示:

    @Public
    public interface DeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
    
        /**
         * Deserializes the byte message.
         *
         * @param message The message, as a byte array.
         *
         * @return The deserialized message as an object (null if the message cannot be deserialized).
         */
        T deserialize(byte[] message) throws IOException;
    
        /**
         * Method to decide whether the element signals the end of the stream. If
         * true is returned the element won't be emitted.
         *
         * @param nextElement The element to test for the end-of-stream signal.
         * @return True, if the element signals end of stream, false otherwise.
         */
        boolean isEndOfStream(T nextElement);
    }
    

    方法解释:

    • deserialize:将二进制消息转换为某类型消息。
    • isEndOfStream:表示是否是最后一条数据。

    以SimpleStringSchema为例展示下怎么编写自定义的DeserializationSchema。
    相关代码如下:

    @PublicEvolving
    public class SimpleStringSchema implements DeserializationSchema<String>, SerializationSchema<String> {
        // SerializationSchema接口的方法省略
        @Override
        public String deserialize(byte[] message) {
            return new String(message, charset);
        }
    
        @Override
        public boolean isEndOfStream(String nextElement) {
            return false;
        }
        // ...
    }
    

    起始位置属性配置

    使用示例:

    myConsumer.setStartFromEarliest()      // start from the earliest record possible
    myConsumer.setStartFromLatest()        // start from the latest record
    myConsumer.setStartFromTimestamp(...)  // start from specified epoch timestamp (milliseconds)
    myConsumer.setStartFromGroupOffsets()  // the default behaviour
    

    方法解释:

    • setStartFromEarliest:从最早儿元素开始消费
    • setStartFromLatest:从最近的元素开始消费
    • setStartFromTimestamp:从指定时间戳的数据开始消费
    • setStartFromGroupOffsets:这是默认的配置。从消费组的offset开始消费。必须配置group.id配置项。

    Topic和分区感知

    Topic感知

    可以使用如下构造函数创建FlinkKafkaConsumer:

    FlinkKafkaConsumer(Pattern subscriptionPattern, KafkaDeserializationSchema<T> deserializer, Properties props) 
    

    和指定topic名称不同的是,这里传入的是一个正则表达式。所有名称匹配该正则表达式的topic都会被订阅。如果配置了分区感知(配置flink.partition-discovery.interval-millis为非负数),Job启动之后kafka新创建的topic如果匹配该正则,也会被订阅到。

    分区感知

    在Job运行过程中如果kafka新创建了partition,Flink可以动态感知到,然后对其中数据进行消费。整个过程仍然可以保证exactly once语义。

    默认情况分区感知是禁用的。如果要开启分区感知,可以设置flink.partition-discovery.interval-millis,即分区感知触发时间间隔。

    实现自定义数据源

    自定义数据源需要实现Flink提供的SourceFunction接口。

    SourceFunction接口的定义如下:

    @Public
    public interface SourceFunction<T> extends Function, Serializable {
        void run(SourceContext<T> ctx) throws Exception;
        void cancel();
    }
    

    run方法

    run方法为数据源向下游发送数据的主要逻辑。编写套路为:

    • 不断调用循环发送数据。
    • 使用一个状态变量控制循环的执行。当cancel方法执行后必须能够跳出循环,停止发送数据。
    • 使用SourceContext的collect等方法将元素发送至下游。
    • 如果使用Checkpoint,在SourceContext collect数据的时候必须加锁。防止checkpoint操作和发送数据操作同时进行。

    cancel方法:

    cancel方法在数据源停止的时候调用。cancel方法必须能够控制run方法中的循环,停止循环的运行。并做一些状态清理操作。

    SourceContext类

    SourceContext在SourceFunction中使用,用于向下游发送数据,或者是发送watermark。
    SourceContext的方法包括:

    • collect:向下游发送数据。有如下三种情况:
      • 如果使用ProcessingTime,该元素不携带timestamp。
      • 如果使用IngestionTime,元素使用系统当前时间作为timestamp。
      • 如果使用EventTime,元素不携带timestamp。需要在数据流后续为元素指定timestamp(assignTimestampAndWatermark)。
    • collectWithTimestamp:向下游发送带有timestamp的数据。和collect方法一样也有如下三种情况:
      • 如果使用ProcessingTime,timestamp会被忽略
      • 如果使用IngestionTime,使用系统时间覆盖timestamp
      • 如果使用EventTime,使用指定的timestamp
    • emitWatermark:向下游发送watermark。watermark也包含一个timestamp。向下游发送watermark意味着所有在watermark的timestamp之前的数据已经到齐。如果在watermark之后,收到了timestamp比该watermark的timestamp小的元素,该元素会被认为迟到,将会被系统忽略,或者进入到旁路输出(side output)。
    • markAsTemporarilyIdle:标记此数据源暂时闲置。该数据源暂时不会发送任何数据和watermark。仅对IngestionTime和EventTime生效。下游任务前移watermark的时候将不会再等待被标记为闲置的数据源的watermark。

    CheckpointedFunction

    如果数据源需要保存状态,那么就需要实现CheckpointedFunction中的相关方法。
    CheckpointedFunction包含如下方法:

    • snapshotState:保存checkpoint的时候调用。需要在此方法中编写状态保存逻辑
    • initializeState:在数据源创建或者是从checkpoint恢复的时候调用。此方法包含数据源的状态恢复逻辑。

    样例

    Flink官方给出的样板Source。这个数据源会发送0-999到下游系统。代码如下所示:

    public class ExampleCountSource implements SourceFunction<Long>, CheckpointedFunction {
        private long count = 0L;
        // 使用一个volatile类型变量控制run方法内循环的运行
        private volatile boolean isRunning = true;
    
        // 保存数据源状态的变量
        private transient ListState<Long> checkpointedCount;
    
        public void run(SourceContext<T> ctx) {
            while (isRunning && count < 1000) {
                // this synchronized block ensures that state checkpointing,
                // internal state updates and emission of elements are an atomic operation
                // 此处必须要加锁,防止在checkpoint过程中,仍然发送数据
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(count);
                    count++;
                }
            }
        }
    
        public void cancel() {
            // 设置isRunning为false,终止run方法内循环的运行
            isRunning = false;
        }
    
        public void initializeState(FunctionInitializationContext context) {
            // 获取存储状态
            this.checkpointedCount = context
                .getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("count", Long.class));
    
            // 如果数据源是从失败中恢复,则读取count的值,恢复数据源count状态
            if (context.isRestored()) {
                for (Long count : this.checkpointedCount.get()) {
                    this.count = count;
                }
            }
        }
    
        public void snapshotState(FunctionSnapshotContext context) {
            // 保存数据到状态变量
            this.checkpointedCount.clear();
            this.checkpointedCount.add(count);
        }
    }
    

    相关文章

      网友评论

        本文标题:Flink 使用之数据源

        本文链接:https://www.haomeiwen.com/subject/upmmvftx.html