Flink (1.13) Quick Start

Author: 万事万物 | Published 2021-08-15 10:27

    Batch vs. streaming

    • Batch:
      Batch processing deals with bounded data in large volumes. It is well suited to computations that need access to the complete data set, and is typically used for offline statistics.
    • Streaming:
      Stream processing deals with unbounded, real-time data. Instead of operating on an entire data set, it operates on each record as it passes through the system, and is typically used for real-time statistics.

    In Spark's world view, everything is made of batches: offline data is one large batch, and real-time data is an endless series of small batches.
    In Flink's world view, everything is made of streams: offline data is a bounded stream, and real-time data is an unbounded stream. This is what is meant by bounded and unbounded streams.

    Bounded and unbounded streams

    • Bounded streams:

    A bounded data stream has a well-defined start and end. It can be processed by ingesting all of the data before running any computation. Ordered ingestion is not required, because a bounded data set can always be sorted afterwards. Processing a bounded stream is also known as batch processing.

    • Common bounded streams
    1. Reading a file
    • Unbounded streams:

    An unbounded data stream has a start but no end. It does not terminate; it keeps delivering data as it is produced, so it must be processed continuously: each event must be handled as soon as it is received. We cannot wait for all the data to arrive, because the input is unbounded and will never be complete at any point in time. Processing unbounded data usually requires ingesting events in a specific order (for example, the order in which they occurred) so that the completeness of the results can be reasoned about.

    • Common unbounded streams (a Kafka sketch follows this list)
    1. Reading socket data
    2. Reading Kafka data
    3. Reading Flume data
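
    Of these, the socket source is used in the unbounded-stream example at the end of this article. For Kafka, a minimal sketch, assuming the extra flink-connector-kafka_${scala.binary.version} dependency (not included in the pom below) and a hypothetical broker address and topic name:

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    import java.util.Properties;

    public class KafkaSourceSketch {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "hadoop102:9092"); // hypothetical broker
            props.setProperty("group.id", "wordcount-demo");          // hypothetical group id

            // A Kafka topic is read as an unbounded stream: it has no defined end.
            DataStreamSource<String> source = env.addSource(
                    new FlinkKafkaConsumer<>("wordcount", new SimpleStringSchema(), props));

            source.print();
            env.execute();
        }
    }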

    Create a Maven project

    • Import the pom.xml dependencies
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>org.example</groupId>
        <artifactId>flink</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <properties>
            <flink.version>1.13.1</flink.version>
            <scala.binary.version>2.12</scala.binary.version>
            <slf4j.version>1.7.30</slf4j.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-java</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-clients_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
    
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
                <version>${slf4j.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
                <version>${slf4j.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-to-slf4j</artifactId>
                <version>2.14.0</version>
                <scope>provided</scope>
            </dependency>
        </dependencies>
    
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-shade-plugin</artifactId>
                    <version>3.2.4</version>
                    <executions>
                        <execution>
                            <phase>package</phase>
                            <goals>
                                <goal>shade</goal>
                            </goals>
                            <configuration>
                                <artifactSet>
                                    <excludes>
                                        <exclude>com.google.code.findbugs:jsr305</exclude>
                                        <exclude>org.slf4j:*</exclude>
                                        <exclude>log4j:*</exclude>
                                    </excludes>
                                </artifactSet>
                                <filters>
                                    <filter>
                                        <!-- Do not copy the signatures in the META-INF folder.
                                        Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                        <artifact>*:*</artifact>
                                        <excludes>
                                            <exclude>META-INF/*.SF</exclude>
                                            <exclude>META-INF/*.DSA</exclude>
                                            <exclude>META-INF/*.RSA</exclude>
                                        </excludes>
                                    </filter>
                                </filters>
                                <transformers combine.children="append">
                                    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer">
                                    </transformer>
                                </transformers>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    </project>
    
    • Core

    Note that these dependencies use provided scope: a Flink installation supplies these classes at runtime on the cluster, so they are excluded from the shaded jar. When running from the IDE, you may need to enable the option that adds provided-scope dependencies to the runtime classpath.
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-java</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-clients_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
    
    • Logging
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
                <version>${slf4j.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
                <version>${slf4j.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-to-slf4j</artifactId>
                <version>2.14.0</version>
                <scope>provided</scope>
            </dependency>
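
    • Local web UI (optional)

    The pom also pulls in flink-runtime-web. A minimal sketch, not used by the examples below and with a hypothetical class name, of starting a local environment with the embedded Flink web UI (by default at http://localhost:8081), which works only because flink-runtime-web is on the classpath:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class LocalWebUiSketch {

        public static void main(String[] args) throws Exception {
            // Starts a local mini-cluster together with the web UI.
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

            env.fromElements("java", "python", "hello").print();
            env.execute();
        }
    }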
    

    log4j.properties

    # Log only ERROR-level messages, to the console
    log4j.rootLogger=error, stdout
    log4j.appender.stdout=org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
    

    Prepare wordcount.txt

    java python hello
    pon xml log batch
    python log java word
    count xml python hello
    exe txt log xml pon java
    

    Batch processing

    • Program
    package com.admin.flink.demo01;
    
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.operators.AggregateOperator;
    import org.apache.flink.api.java.operators.DataSource;
    import org.apache.flink.api.java.operators.FlatMapOperator;
    import org.apache.flink.api.java.tuple.Tuple2;
    
    import java.util.Arrays;
    
    /**
     * Batch word count (DataSet API)
     * @author admin
     * @date 2021/8/6
     */
    public class BatchStream {


        public static void main(String[] args) throws Exception {
            // Get the batch execution environment
            ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
            // Read the input file
            DataSource<String> dataSource = executionEnvironment.readTextFile("D:\\project\\idea\\flink\\input\\wordcount.txt");

            // Split each line and flatten it into (word, 1) tuples
            FlatMapOperator<String, Tuple2<String, Integer>> flatMap = dataSource.flatMap((FlatMapFunction<String, Tuple2<String, Integer>>) (lines, out) -> {
                // Split, iterate, collect
                Arrays.stream(lines.split(" ")).forEach(s -> out.collect(Tuple2.of(s, 1)));
                // When a lambda expression uses Java generics, type erasure means
                // the type information must be declared explicitly via returns(...)
            }).returns(Types.TUPLE(Types.STRING,Types.INT));

            // Group by the word (tuple field 0) and sum the counts (tuple field 1)
            AggregateOperator<Tuple2<String, Integer>> sum = flatMap.groupBy(0).sum(1);
            // Print the result
            sum.print();

        }
    }
    
    • Result
    (pon,2)
    (hello,2)
    (log,3)
    (xml,3)
    (exe,1)
    (java,3)
    (python,3)
    (txt,1)
    (batch,1)
    (count,1)
    (word,1)
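
    The program above uses the classic DataSet API. Since Flink 1.12 the DataStream API can also execute bounded jobs in batch mode, in line with the "bounded stream" world view described earlier. A minimal sketch, assuming the same input file (BatchModeWordCount is a hypothetical class name):

    package com.admin.flink.demo01;

    import org.apache.flink.api.common.RuntimeExecutionMode;
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    import java.util.Arrays;

    /**
     * Word count on the DataStream API in BATCH execution mode
     */
    public class BatchModeWordCount {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // BATCH mode emits one final count per word instead of an
            // incremental update for every record.
            env.setRuntimeExecutionMode(RuntimeExecutionMode.BATCH);

            env.readTextFile("D:\\project\\idea\\flink\\input\\wordcount.txt")
                    .flatMap((FlatMapFunction<String, Tuple2<String, Integer>>) (lines, out) ->
                            Arrays.stream(lines.split(" ")).forEach(s -> out.collect(Tuple2.of(s, 1))))
                    .returns(Types.TUPLE(Types.STRING, Types.INT))
                    .keyBy(t -> t.f0)
                    .sum(1)
                    .print();

            env.execute();
        }
    }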
    

    Bounded stream

    • Program
    package com.admin.flink.demo01;
    
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    import java.util.Arrays;
    
    /**
     * Bounded stream word count
     * @author admin
     * @date 2021/8/6
     */
    public class BoundedStream {

        public static void main(String[] args) throws Exception {
            // Create the stream execution environment
            StreamExecutionEnvironment boundedStream = StreamExecutionEnvironment.getExecutionEnvironment();

            // Read the input file (a bounded stream: it has a defined end)
            DataStreamSource<String> streamSource = boundedStream.readTextFile("D:\\project\\idea\\flink\\input\\wordcount.txt");

            // Flatten each line into (word, 1) tuples
            SingleOutputStreamOperator<Tuple2<String, Integer>> flatMap = streamSource.flatMap((FlatMapFunction<String, Tuple2<String, Integer>>) (lines, out) -> {
                // Split, iterate, collect
                Arrays.stream(lines.split(" ")).forEach(s -> out.collect(Tuple2.of(s, 1)));
            }).returns(Types.TUPLE(Types.STRING, Types.INT));

            // Key by the word and sum the counts
            SingleOutputStreamOperator<Tuple2<String, Integer>> sum = flatMap.keyBy(0).sum(1);

            // Print, labelling the output "test"
            sum.print("test");

            // Submit the job; nothing runs until execute() is called
            boundedStream.execute();
        }
    }
    
    • Result
    test:14> (batch,1)
    test:12> (word,1)
    test:3> (java,1)
    test:14> (count,1)
    test:13> (xml,1)
    test:5> (python,1)
    test:1> (pon,1)
    test:3> (java,2)
    test:5> (hello,1)
    test:1> (pon,2)
    test:3> (java,3)
    test:13> (xml,2)
    test:11> (log,1)
    test:5> (python,2)
    test:5> (python,3)
    test:7> (txt,1)
    test:11> (log,2)
    test:5> (hello,2)
    test:11> (exe,1)
    test:11> (log,3)
    test:13> (xml,3)
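
    The number after the test label in each line is the index of the parallel subtask that produced it. Because keyBy hashes each word to a subtask, the same word always appears on the same subtask, and every incoming record prints an updated running count rather than one final total. Also note that keyBy(0) with a tuple index is deprecated in Flink 1.13; a minimal sketch of the recommended KeySelector form, as a drop-in replacement for the keyBy line above:

            // Recommended in Flink 1.13: an explicit KeySelector lambda instead
            // of the deprecated tuple-index keyBy(0); t.f0 is the word.
            SingleOutputStreamOperator<Tuple2<String, Integer>> sum = flatMap
                    .keyBy(t -> t.f0)
                    .sum(1);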
    

    Unbounded stream

    • Program
    package com.atguigu.flink.demo01;
    
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    import java.util.Arrays;
    
    /**
     * Unbounded stream word count
     * @author admin
     * @date 2021/8/6
     */
    public class UnboundedStream {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Listen on a socket port (an unbounded stream: it never ends)
            DataStreamSource<String> source = env.socketTextStream("hadoop102", 9999);


            SingleOutputStreamOperator<Tuple2<String, Integer>> sum = source.flatMap((FlatMapFunction<String, Tuple2<String, Integer>>) (lines, out) -> {
                // Split each line and collect the words into the Collector
                Arrays.stream(lines.split(" ")).forEach(s -> out.collect(Tuple2.of(s, 1)));
            }).returns(Types.TUPLE(Types.STRING, Types.INT)).keyBy(0).sum(1);

            sum.print("test");

            env.execute();

        }
    }
    
    • Use nc to send data to port 9999
    [admin@hadoop102 flink]$ nc -lk 9999
    java hello
    hello java python
    hello java scala
    
    • Printed result
    test:3> (java,1)
    test:5> (hello,1)
    test:3> (java,2)
    test:5> (hello,2)
    test:5> (python,1)
    test:3> (java,3)
    test:1> (scala,1)
    test:5> (hello,3)
    
    • A common error

    Caused by: org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Collector' are missing. In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved. An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.FlatMapFunction' interface. Otherwise the type has to be specified explicitly using type information.

    Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'main(BoundedStream.java:27)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
        at org.apache.flink.api.dag.Transformation.getOutputType(Transformation.java:479)
        at org.apache.flink.streaming.api.datastream.DataStream.getType(DataStream.java:193)
        at org.apache.flink.streaming.api.datastream.DataStream.keyBy(DataStream.java:319)
        at com.atguigu.flink.demo01.BoundedStream.main(BoundedStream.java:33)
    Caused by: org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Collector' are missing. In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved. An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.FlatMapFunction' interface. Otherwise the type has to be specified explicitly using type information.
        at org.apache.flink.api.java.typeutils.TypeExtractionUtils.validateLambdaType(TypeExtractionUtils.java:371)
        at org.apache.flink.api.java.typeutils.TypeExtractionUtils.extractTypeFromLambda(TypeExtractionUtils.java:188)
        at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:557)
        at org.apache.flink.api.java.typeutils.TypeExtractor.getFlatMapReturnTypes(TypeExtractor.java:174)
        at org.apache.flink.streaming.api.datastream.DataStream.flatMap(DataStream.java:612)
        at com.atguigu.flink.demo01.BoundedStream.main(BoundedStream.java:27)
    
    • Cause:

    When a lambda expression uses Java generics, type erasure means the type information must be declared explicitly.

    • Fix: use .returns(...) to specify the generic type information
    // Flatten the data
            SingleOutputStreamOperator<Tuple2<String, Integer>> flatMap = streamSource.flatMap((FlatMapFunction<String, Tuple2<String, Integer>>) (lines, out) -> {
                // Split, iterate, collect
                Arrays.stream(lines.split(" ")).forEach(s -> out.collect(Tuple2.of(s, 1)));
            }).returns(Types.TUPLE(Types.STRING, Types.INT));
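
    Alternatively, as the error message itself suggests, an anonymous class implementing FlatMapFunction keeps its generic parameters (they are recorded in the class file, so they survive erasure) and needs no returns(...) hint. A sketch as a drop-in replacement for the lambda above (it additionally needs import org.apache.flink.util.Collector):

            SingleOutputStreamOperator<Tuple2<String, Integer>> flatMap =
                    streamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                        @Override
                        public void flatMap(String lines, Collector<Tuple2<String, Integer>> out) {
                            // Split, iterate, collect; the anonymous class preserves
                            // the type information, so returns(...) is not required.
                            Arrays.stream(lines.split(" ")).forEach(s -> out.collect(Tuple2.of(s, 1)));
                        }
                    });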
    
