前置要求
- Java技术体系
- Junit单元测试
- Maven依赖管理使用
- Idea软件使用
- Flink自建版本custom-test
关于Flink版本自建可以参考上一篇文章
新建flink-test项目测试自建Flink
在 flink-in-depth 目录下新建一个 flink-test 目录,并添加 pom.xml 文件,添加自建Flink的custom-test版本依赖
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<encoding>UTF-8</encoding>
<flink.version>custom-test</flink.version>
<scala.version>2.12.7</scala.version>
<scala.binary.version>2.12</scala.binary.version>
<calcite.version>1.26.0</calcite.version>
<hadoop.version>2.7.3</hadoop.version>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<!--<scope>test</scope>-->
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.22</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime</artifactId>
<version>${flink.version}</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web</artifactId>
<version>${flink.version}</version>
<!--<scope>provided</scope>-->
</dependency>
<dependencies>
除了添加Flink依赖之外,我们还添加了junit单元测试、lombok快捷工具、scala语言包
下面就开始我们的Flink自建版本本地Local MiniCluster的单元测试吧
新建一个名为FlinkFrameworkTest的Java类,然后添加如下代码:
private static final StreamExecutionEnvironment streamExecutionEnvironment;
static {
Configuration configuration = new Configuration();
configuration.set(CoreOptions.DEFAULT_PARALLELISM, 1);
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
configuration.set(TaskManagerOptions.NUM_TASK_SLOTS, 1);
streamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
}
@Test
public void testFlinkHelloWorld() throws Exception {
DataStreamSource<String> lines = streamExecutionEnvironment.socketTextStream("localhost", 8080);
lines.print();
streamExecutionEnvironment.execute();
}
HelloWorld程序很简单,使用静态代码块创建Flink的流式执行环境StreamExecutionEnvironment,然后从本地的8080网络端口读取Socket链接数据并进行输出,最后是使用执行环境执行流式应用程序
其中执行环境我们通过Configuration类配置了只有一个并行度,一个TaskManager,每个TaskManager只有一个Slot
由于我们使用了带有Flink WebUI的流式执行环境,所以需要 flink-runtime-web 依赖
由以上的HelloWorld可以看出,Flink的流式应用编程三步曲
1) 创建流式执行环境StreamExecutionEnvironment
2) 基于StreamExecutionEnvironment执行环境进行DataStream的算子操作(包括Source、Transformation、Sink)
3) 使用StreamExecutionEnvironment执行环境执行程序
本例子中只有Source与Sink,其实Source与Sink操作本质也是Transaformation操作,但更多与外部系统或集合交互
接下来我们运行一下HelloWorld例子
先通过nc程序打开8080网络端口
nc -lk 8080
image
然后点击单元测试运行HelloWorld单元测试
image
可以看到我们的HelloWorld已经创建了本地Local MiniCluster集群并运行了起来
通过浏览器找开 http://localhost:8081 页面(8081端口是Flink WebUI的默认端口)
从Flink WebUI概览图上,我们可以看出目前集群的配置是只有一个TaskManager,只有一个Slot,与我们代码里Configuration的配置一致,同时只有一个Job在运行,就是我们的HelloWorld程序,这个程序使用了默认的名称,叫做Flink Streaming Job
我们点击 Flink Streaming Job 这个程序名称,就可以打开正在运行的HelloWorld的算子流图
image
从流图中我们可以看出目前有一个Souce为Socket Stream,一个Sink为Print to Std. Out,与我们的代码一致
我们通过nc程序输入数据看一下效果
image
我们输入什么控制台上就输出什么
以上就是我们使用自建Flink版本进行的HelloWorld流式程序测试
网友评论