Batch and Stream
- Batch: batch processing deals with bounded, large volumes of data. It is well suited to computations that need access to the complete set of records, and is typically used for offline statistics.
- Stream: stream processing deals with unbounded data in real time. Instead of operating on the whole dataset, it operates on each data item as it travels through the system, and is typically used for real-time statistics.
In Spark's world view everything is made of batches: offline data is one big batch, and real-time data is an endless sequence of small batches. In Flink's world view everything is made of streams: offline data is a stream with a boundary, and real-time data is a stream without one. This is what is meant by bounded and unbounded streams.
Bounded and Unbounded Streams
- Bounded stream: a bounded data stream has a well-defined start and end. It can be processed by ingesting all of the data before performing any computation. Ordered ingestion is not required, because a bounded dataset can always be sorted; processing a bounded stream is also called batch processing.
- Common bounded streams
- Reading a file
- Unbounded stream: an unbounded data stream has a start but no end. The source keeps producing data and never terminates, so the stream must be processed continuously; that is, every event has to be handled promptly after it is received. We cannot wait for all of the data to arrive, because the input is unbounded and will never be complete at any point in time. Processing unbounded data usually requires ingesting events in a specific order (for example, the order in which they occurred) so that the completeness of the results can be reasoned about.
- Common unbounded streams
- Reading socket data
- Reading Kafka data (see the sketch after this list)
- Reading flume data
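As a rough illustration of the Kafka case, a minimal source job might look like the sketch below. It is not part of the example project that follows: it assumes the flink-connector-kafka_${scala.binary.version} dependency has been added to the pom, and the broker address, group id, and topic name are placeholders.
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

/**
 * Sketch only: consuming an unbounded stream from Kafka with Flink 1.13.
 * Broker address, group id, and topic name below are placeholders.
 */
public class KafkaSourceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "hadoop102:9092"); // placeholder broker
        props.setProperty("group.id", "wordcount-demo");          // placeholder group id

        // FlinkKafkaConsumer is the Kafka source connector shipped with Flink 1.13
        DataStreamSource<String> kafkaStream = env.addSource(
                new FlinkKafkaConsumer<>("wordcount", new SimpleStringSchema(), props));

        kafkaStream.print();
        env.execute();
    }
}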
Create a Maven project
- Add the pom.xml dependencies
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>flink</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<flink.version>1.13.1</flink.version>
<scala.binary.version>2.12</scala.binary.version>
<slf4j.version>1.7.30</slf4j.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-to-slf4j</artifactId>
<version>2.14.0</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.4</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers combine.children="append">
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer">
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
- Core dependencies
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
- Logging dependencies
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-to-slf4j</artifactId>
<version>2.14.0</version>
<scope>provided</scope>
</dependency>
log4j.properties
log4j.rootLogger=error, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
Prepare wordcount.txt
java python hello
pon xml log batch
python log java word
count xml python hello
exe txt log xml pon java
Batch processing
- Program
package com.admin.flink.demo01;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.tuple.Tuple2;

import java.util.Arrays;

/**
 * Batch processing
 * @author admin
 * @date 2021/8/6
 */
public class BatchStream {
    public static void main(String[] args) throws Exception {
        // Obtain the batch execution environment
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        // Read the source file
        DataSource<String> dataSource = executionEnvironment.readTextFile("D:\\project\\idea\\flink\\input\\wordcount.txt");
        // Split each line and flatten it into (word, 1) tuples
        FlatMapOperator<String, Tuple2<String, Integer>> flatMap = dataSource.flatMap((FlatMapFunction<String, Tuple2<String, Integer>>) (lines, out) -> {
            // Split, iterate, and collect
            Arrays.stream(lines.split(" ")).forEach(s -> out.collect(Tuple2.of(s, 1)));
            // When a lambda uses Java generics, type erasure means the type information must be declared explicitly
        }).returns(Types.TUPLE(Types.STRING, Types.INT));
        // Group by word and sum the counts
        AggregateOperator<Tuple2<String, Integer>> sum = flatMap.groupBy(0).sum(1);
        // Print the result
        sum.print();
    }
}
- Result
(pon,2)
(hello,2)
(log,3)
(xml,3)
(exe,1)
(java,3)
(python,3)
(txt,1)
(batch,1)
(count,1)
(word,1)
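Since Flink 1.12 the DataSet API used above is treated as a legacy API, and batch jobs can also be written against the DataStream API with the runtime mode set to BATCH. Below is a minimal sketch of the same word count in that style (assuming the same input path); in BATCH mode the keyed sum should emit only the final count per word, much like the DataSet output above.
import java.util.Arrays;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Sketch: the same word count as a batch job on the DataStream API (Flink 1.12+),
 * using BATCH runtime mode instead of the legacy DataSet API.
 */
public class BatchModeWordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Execute this pipeline with batch semantics over the bounded file source
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        env.readTextFile("D:\\project\\idea\\flink\\input\\wordcount.txt")
           .flatMap((FlatMapFunction<String, Tuple2<String, Integer>>) (line, out) ->
                   Arrays.stream(line.split(" ")).forEach(s -> out.collect(Tuple2.of(s, 1))))
           .returns(Types.TUPLE(Types.STRING, Types.INT))
           .keyBy(t -> t.f0) // key by the word field; the positional keyBy(0) used below also works but is deprecated in 1.13
           .sum(1)
           .print();

        env.execute();
    }
}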
Bounded stream
- Program
package com.admin.flink.demo01;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;

/**
 * Bounded stream
 * @author admin
 * @date 2021/8/6
 */
public class BoundedStream {
    public static void main(String[] args) throws Exception {
        // Create the stream execution environment
        StreamExecutionEnvironment boundedStream = StreamExecutionEnvironment.getExecutionEnvironment();
        // Read the source file
        DataStreamSource<String> streamSource = boundedStream.readTextFile("D:\\project\\idea\\flink\\input\\wordcount.txt");
        // Flatten the data into (word, 1) tuples
        SingleOutputStreamOperator<Tuple2<String, Integer>> flatMap = streamSource.flatMap((FlatMapFunction<String, Tuple2<String, Integer>>) (lines, out) -> {
            // Split, iterate, and collect
            Arrays.stream(lines.split(" ")).forEach(s -> out.collect(Tuple2.of(s, 1)));
        }).returns(Types.TUPLE(Types.STRING, Types.INT));
        // Key by word and aggregate the running totals
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = flatMap.keyBy(0).sum(1);
        // Print the output
        sum.print("test");
        // Launch the job
        boundedStream.execute();
    }
}
- Result
test:14> (batch,1)
test:12> (word,1)
test:3> (java,1)
test:14> (count,1)
test:13> (xml,1)
test:5> (python,1)
test:1> (pon,1)
test:3> (java,2)
test:5> (hello,1)
test:1> (pon,2)
test:3> (java,3)
test:13> (xml,2)
test:11> (log,1)
test:5> (python,2)
test:5> (python,3)
test:7> (txt,1)
test:11> (log,2)
test:5> (hello,2)
test:11> (exe,1)
test:11> (log,3)
test:13> (xml,3)
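In the output above, the number in the "test:N>" prefix is the index of the parallel subtask that printed the line, which is why updates for different words are interleaved; all updates for the same word go to the same subtask, so each word's running count stays in order. For a small demo, a single non-interleaved output can be obtained by capping the parallelism. A sketch of that variation (replacing the environment-creation lines of the example above):
// Variation (sketch): cap the job at one parallel subtask so that the
// printed output is a single, non-interleaved stream.
StreamExecutionEnvironment boundedStream = StreamExecutionEnvironment.getExecutionEnvironment();
boundedStream.setParallelism(1);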
Unbounded stream
- Program
package com.atguigu.flink.demo01;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;

/**
 * Unbounded stream
 * @author admin
 * @date 2021/8/6
 */
public class UnboundedStream {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Listen on a socket port
        DataStreamSource<String> source = env.socketTextStream("hadoop102", 9999);
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = source.flatMap((FlatMapFunction<String, Tuple2<String, Integer>>) (lines, out) -> {
            // Split each line and collect the words into the Collector
            Arrays.stream(lines.split(" ")).forEach(s -> out.collect(Tuple2.of(s, 1)));
        }).returns(Types.TUPLE(Types.STRING, Types.INT)).keyBy(0).sum(1);
        sum.print("test");
        env.execute();
    }
}
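One possible refinement, not part of the original post: instead of hardcoding hadoop102 and 9999, the host and port could be read from the program arguments with Flink's ParameterTool. The argument names --host and --port and their defaults below are purely illustrative.
// Sketch: read the socket host and port from the command line.
// ParameterTool lives in org.apache.flink.api.java.utils.ParameterTool.
ParameterTool params = ParameterTool.fromArgs(args);
String host = params.get("host", "hadoop102"); // illustrative default
int port = params.getInt("port", 9999);        // illustrative default
DataStreamSource<String> source = env.socketTextStream(host, port);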
- Use nc to send data to port 9999
[admin@hadoop102 flink]$ nc -lk 9999
java hello
hello java python
hello java scala
- Printed result
test:3> (java,1)
test:5> (hello,1)
test:3> (java,2)
test:5> (hello,2)
test:5> (python,1)
test:3> (java,3)
test:1> (scala,1)
test:5> (hello,3)
- A common error
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Collector' are missing. In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved. An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.FlatMapFunction' interface. Otherwise the type has to be specified explicitly using type information.
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'main(BoundedStream.java:27)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
at org.apache.flink.api.dag.Transformation.getOutputType(Transformation.java:479)
at org.apache.flink.streaming.api.datastream.DataStream.getType(DataStream.java:193)
at org.apache.flink.streaming.api.datastream.DataStream.keyBy(DataStream.java:319)
at com.atguigu.flink.demo01.BoundedStream.main(BoundedStream.java:33)
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Collector' are missing. In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved. An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.FlatMapFunction' interface. Otherwise the type has to be specified explicitly using type information.
at org.apache.flink.api.java.typeutils.TypeExtractionUtils.validateLambdaType(TypeExtractionUtils.java:371)
at org.apache.flink.api.java.typeutils.TypeExtractionUtils.extractTypeFromLambda(TypeExtractionUtils.java:188)
at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:557)
at org.apache.flink.api.java.typeutils.TypeExtractor.getFlatMapReturnTypes(TypeExtractor.java:174)
at org.apache.flink.streaming.api.datastream.DataStream.flatMap(DataStream.java:612)
at com.atguigu.flink.demo01.BoundedStream.main(BoundedStream.java:27)
- Cause: when a lambda expression uses Java generics, type erasure removes the type information, so it has to be declared explicitly.
- Fix: call .returns to specify the generic type
// Flatten the data into (word, 1) tuples
SingleOutputStreamOperator<Tuple2<String, Integer>> flatMap = streamSource.flatMap((FlatMapFunction<String, Tuple2<String, Integer>>) (lines, out) -> {
    // Split, iterate, and collect
    Arrays.stream(lines.split(" ")).forEach(s -> out.collect(Tuple2.of(s, 1)));
}).returns(Types.TUPLE(Types.STRING, Types.INT));
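As the exception message itself suggests, another way around the erasure problem is to use an anonymous class that implements FlatMapFunction: its generic parameters are preserved, so no .returns(...) call is needed. A sketch of that variant for the same flatMap step (Collector comes from org.apache.flink.util.Collector):
// Alternative fix (sketch): an anonymous FlatMapFunction keeps its generic
// type parameters, so Flink can extract the output type without returns(...).
SingleOutputStreamOperator<Tuple2<String, Integer>> flatMap =
        streamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String lines, Collector<Tuple2<String, Integer>> out) {
                // Split each line, then collect (word, 1) pairs
                Arrays.stream(lines.split(" ")).forEach(s -> out.collect(Tuple2.of(s, 1)));
            }
        });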