美文网首页
Flink DataStream API 介绍与使用

Flink DataStream API 介绍与使用

作者: OzanShareing | 来源:发表于2019-12-30 15:07 被阅读0次

    引文


    Flink 1.7 官方详细参考API

    正文


    DataStream编程模型

    Flink整个系统架构中,对流计算的支持是其最重要的功能之一,Flink基于Google提出的DataFlow模型,实现了支持原生数据流处理的计算引擎。

    Flink中定义了DataStream API让用户灵活且高效地编写Flink流式应用。

    DataStream API主要可分为三个部分,DataSource模块、Transformation模块以及DataSink模块。

    • 其中Sources模块主要定义了数据接入功能,主要是将各种外部数据接入至Flink系统中,并将接入数据转换成对应的DataStream数据集。

    • Transformation模块定义了对DataStream数据集的各种转换操作,例如进行mapfilterwindows等操作。

    • 最后,将结果数据通过DataSink模块写出到外部存储介质中,例如将数据输出到文件或Kafka消息中间件等。

    1. DataSources数据输入

    DataSources模块定义了DataStream API中的数据输入操作,Flink将数据源主要分为内置数据源第三方数据源这两种类型。

    其中内置数据源包含文件Socket网络端口以及集合类型数据,其不需要引入其他依赖库,且在Flink系统内部已经实现,用户可以直接调用相关方法使用。

    第三方数据源定义了Flink和外部系统数据交互的逻辑,包括数据的读写接口。在Flink中定义了非常丰富的第三方数据源连接器(Connector),例如Apache kafka ConnectorElatic Search Connector等。同时用户也可以自定义实现Flink中数据接入函数SourceFunction,并封装成第三方数据源的Connector,完成Flink与其他外部系统的数据交互。

    这里我们着重讲下外部数据源:
    a. 数据源连接器

    前面提到的内置数据源类型都是一些基本的数据接入方式,例如从文件、Socket端口中接入数据,其实质是实现了不同的SourceFunctionFlink将其封装成高级API,减少了用户的使用成本。

    对于流式计算类型的应用,数据大部分都是从外部第三方系统中获取,为此Flink通过实现SourceFunction定义了非常丰富的第三方数据连接器,基本覆盖了大部分的高性能存储介质以及中间件等。

    • 其中部分连接器是仅支持读取数据,例如Twitter Streaming APINetty等;
    • 另外一部分仅支持数据输出(Sink),不支持数据输入(Source),例如Apache CassandraElasticsearchHadoop FileSystem等。
    • 还有一部分是既支持数据输入,也支持数据输出,例如Apache KafkaAmazon KinesisRabbitMQ等连接器。

    Kafka为例,用户在Maven编译环境中导入下面代码清单所示的环境配置,主要因为Flink为了尽可能降低用户在使用Flink进行应用开发时的依赖复杂度,所有第三方连接器依赖配置放置在Flink基本依赖库以外,用户在使用过程中,根据需要将需要用到的Connector依赖库引入到应用工程中即可。

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <java.version>1.8</java.version>
        <flink.version>1.7.1</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
        <apache.hadoop.version>2.8.5</apache.hadoop.version>
    </properties>
    
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
    
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
    

    Schema参数的主要作用是根据事先定义好的Schema信息将数据序列化成该Schema定义的数据类型,默认是SimpleStringSchema,代表从Kafka中接入的数据将转换成String字符串类型处理。

    用户通过自定义Schema将接入数据转换成指定数据结构,主要是实现Deserialization-Schema接口来完成.

    b.自定义数据源连接器

    Flink中已经实现了大多数主流的数据源连接器,但需要注意,Flink的整体架构非常开放,用户也可以自己定义连接器,以满足不同的数据源的接入需求。可以通过实现SourceFunction定义单个线程的接入的数据接入器,也可以通过实现ParallelSource-Function接口或继承RichParallelSourceFunction类定义并发数据源接入器。

    DataSoures定义完成后,可以通过使用SteamExecutionEnvironmentaddSources方法添加数据源,这样就可以将外部系统中的数据转换成DataStream[T]数据集合,其中T类型是Source-Function返回值类型,然后就可以完成各种流式数据的转换操作。

    2. DataSteam转换操作

    即通过从一个或多个DataStream生成新的DataStream的过程被称为Transformation操作。

    在转换过程中,每种操作类型被定义为不同的OperatorFlink程序能够将多个Transformation组成一个DataFlow的拓扑。

    所有DataStream的转换操作可分为Single-DataStreamMulti-DataStream物理分区三类类型。其中Single-DataStream操作定义了对单个DataStream数据集元素的处理逻辑,Multi-DataStream操作定义了对多个DataStream数据集元素的处理逻辑。物理分区定义了对数据集中的并行度数据分区调整转换的处理逻辑。

    2.1 Single-DataStream操作
    (1)Map [DataStream->DataStream]

    调用用户定义的MapFunctionDataStream[T]数据进行处理,形成新的Data-Stream[T],其中数据格式可能会发生变化,常用作对数据集内数据的清洗和转换。例如将输入数据集中的每个数值全部加1处理,并且将数据输出到下游数据集。

    上图中计算逻辑实现代码如下,通过从集合中创建DataStream,并调用DataStreammap方法传入计算表达式,完成对每个字段加1操作,最后得到新的数据集mapStream

    DataStream<Map<String, Object>> esDataStr = kafkaStream.map(
            new MapFunction<String, Map<String, Object>>() {
                private static final long serialVersionUID = 4987316772103776340L;
    
                @Override
                public Map<String, Object> map(String jsonStr) throws Exception {
                    try {
                        return CompEngineProcess.processMessage(jsonStr);
                    } catch (Exception e) {
                        logger.error(e.getMessage(), e);
                        return new HashMap<>();
                    }
                }
            }
    );
    
    (2)FlatMap [DataStream->DataStream]

    算子主要应用处理输入一个元素产生一个或者多个元素的计算场景,比较常见的是在经典例子WordCount中,将每一行的文本数据切割,生成单词序列。如在下图中对于输入DataStream[String]通过FlatMap函数进行处理,字符串数字按逗号切割,然后形成新的整数数据集。

    针对上述计算逻辑实现代码如下所示,通过调用resultStream接口中flatMap方法将定义好的FlatMapFunction传入,生成新的数据集。

    DataStream<Tuple2<RoutingInfo, Map<String, Object>>> routeStream = filterStream.flatMap(new FlatMapFunction<Map<String, Object>, Tuple2<RoutingInfo, Map<String, Object>>>() {
        private static final long serialVersionUID = 1L;
    
        @Override
        public void flatMap(Map<String, Object> inDataMap,
                            Collector<Tuple2<RoutingInfo, Map<String, Object>>> outCollector) {
            List<RoutingInfo> routingList = RulesManager.getInstance().getRoutingList(inDataMap.get(CodeString.TRANS_HEADER_FIELD_TRCD).toString());
            if (routingList != null) {
                routingList.forEach(data -> outCollector.collect(new Tuple2<>(data, inDataMap)));
            }
        }
    });
    
    (3)Filter [DataStream->DataStream]

    该算子将按照条件对输入数据集进行筛选操作,将符合条件的数据集输出,将不符合条件的数据过滤掉。如下图所示将输入数据集中偶数过滤出来,奇数从数据集中去除。

    针对上图中的计算逻辑代码实现如下

    DataStream<Tuple2<RoutingInfo, Map<String, Object>>> filterStream = routeStream.filter(k -> isNull(k.f0));
    
    

    相关文章

      网友评论

          本文标题:Flink DataStream API 介绍与使用

          本文链接:https://www.haomeiwen.com/subject/kelmoctx.html