Since I didn't know Scala, this took me pretty much a whole afternoon plus an evening today. I'm writing it down so that next time I can jump straight into the code without being tripped up by the engineering plumbing.
一. Running Flink on a single node
Use a docker-compose.yml, and be sure to pin the image version. If the Scala versions don't match, things break constantly. I hit this many times while juggling the system-level Scala, the Scala version IDEA compiles with, and the Scala version Flink was built against. The quick fix I settled on: use Scala 2.11 everywhere, in the operating system, in the pom.xml dependencies, and in the Flink build. For example, the Docker image I use here is flink:scala_2.11-java8.
For the image tag, look it up on Docker Hub.
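When chasing these binary-incompatibility errors, it helps to confirm which Scala library actually ends up on the classpath at runtime. A minimal sketch, plain standard Scala with no Flink required (the object name is mine):

```scala
// Print the Scala library version actually loaded at runtime; if this says
// 2.12.x while Flink and your jars were built for 2.11, mismatch errors follow.
object ScalaVersionCheck {
  def main(args: Array[String]): Unit = {
    // versionNumberString comes from the scala-library jar on the classpath
    println(scala.util.Properties.versionNumberString)
  }
}
```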

version: "2.1"
services:
  jobmanager:
    image: flink:scala_2.11-java8
    expose:
      - "6123"
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
    volumes:
      - ./root/flink:/demo
  taskmanager:
    image: flink:scala_2.11-java8
    expose:
      - "6121"
      - "6122"
    depends_on:
      - jobmanager
    command: taskmanager
    links:
      - "jobmanager:jobmanager"
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
二. Project structure and compiling the code
Start IDEA and create the project. The project creation settings in IDEA are as follows:


The directory structure is as follows:

The pom.xml contents are as follows:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>org.bbk</groupId>
  <artifactId>flink</artifactId>
  <version>1.0-SNAPSHOT</version>
  <repositories>
    <repository>
      <id>public</id>
      <name>aliyun nexus</name>
      <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
      <releases>
        <enabled>true</enabled>
      </releases>
    </repository>
    <repository>
      <id>cloudera</id>
      <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
  </repositories>
  <pluginRepositories>
    <pluginRepository>
      <id>public</id>
      <name>aliyun nexus</name>
      <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
      <releases>
        <enabled>true</enabled>
      </releases>
      <snapshots>
        <enabled>false</enabled>
      </snapshots>
    </pluginRepository>
  </pluginRepositories>
  <dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-scala_2.11</artifactId>
      <version>1.8.1</version>
      <!--<scope>provided</scope>-->
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-scala_2.11</artifactId>
      <version>1.8.1</version>
      <!--<scope>provided</scope>-->
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>2.6.0-mr1-cdh5.14.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>2.6.0-cdh5.14.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs</artifactId>
      <version>2.6.0-cdh5.14.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-core</artifactId>
      <version>2.6.0-cdh5.14.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.bahir</groupId>
      <artifactId>flink-connector-redis_2.11</artifactId>
      <version>1.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
      <version>1.8.1</version>
    </dependency>
    <!-- Jars needed for Flink/Kafka integration -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
      <version>1.8.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>1.1.0</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>1.7.25</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.25</version>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.38</version>
    </dependency>
    <!-- Jars for Flink/HBase integration -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-hadoop-compatibility_2.11</artifactId>
      <version>1.8.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-shaded-hadoop2</artifactId>
      <!-- There is no 1.8.1 release of this artifact yet -->
      <version>1.7.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-hbase_2.11</artifactId>
      <version>1.8.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-client</artifactId>
      <version>1.2.0-cdh5.14.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-server</artifactId>
      <version>1.2.0-cdh5.14.2</version>
    </dependency>
    <!-- Jars needed for the Flink Table API -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner_2.11</artifactId>
      <version>1.8.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
      <version>1.8.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-scala_2.11</artifactId>
      <version>1.8.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-common</artifactId>
      <version>1.8.1</version>
    </dependency>
    <!-- Jars for Flink JSON integration -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-json</artifactId>
      <version>1.8.1</version>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <!-- Plugin to pin the JDK version -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.0</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
          <encoding>UTF-8</encoding>
        </configuration>
      </plugin>
      <!-- Plugin for compiling Scala -->
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.2.2</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <!-- Plugin for packaging the project (fat jar with all dependencies) -->
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>3.0.0</version>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
          <archive>
            <manifest>
              <mainClass></mainClass>
            </manifest>
          </archive>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
Demo.scala is as follows:
package org.bbk.flink

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

case class CountWord(word: String, count: Long)

object Demo {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val result: DataStream[String] = environment.socketTextStream("192.168.1.111", 9000)
    import org.apache.flink.api.scala._
    val resultValue: DataStream[CountWord] = result
      .flatMap(x => x.split(" "))
      .map(x => CountWord(x, 1))
      .keyBy("word")
      .sum("count")
    resultValue.print().setParallelism(1)
    environment.execute()
  }
}
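To make clear what keyBy("word").sum("count") emits per incoming record, here is a plain-collections sketch of the same running keyed sum, with no Flink involved (the object and method names are mine, for illustration only):

```scala
// Plain-Scala sketch of the streaming pipeline's per-record behavior.
// CountWord mirrors the case class used in Demo.scala.
case class CountWord(word: String, count: Long)

object WordCountSketch {
  // flatMap splits lines into words, map wraps each word as CountWord(w, 1),
  // and keyBy("word").sum("count") keeps a per-key running total as state,
  // emitting one updated record for every incoming word.
  def process(lines: Seq[String]): Seq[CountWord] = {
    val state = scala.collection.mutable.Map.empty[String, Long]
    lines.flatMap(_.split(" ")).map { w =>
      val c = state.getOrElse(w, 0L) + 1
      state(w) = c
      CountWord(w, c) // one emitted record per incoming word
    }
  }

  def main(args: Array[String]): Unit = {
    // Feeding "this 2222" then "this" emits:
    // CountWord(this,1), CountWord(2222,1), CountWord(this,2)
    process(Seq("this 2222", "this")).foreach(println)
  }
}
```

This is why the taskmanager output below shows a growing count for repeated words such as df: each repetition of a word triggers a fresh record with the updated total.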
Compile the code:
mvn clean scala:compile compile package
This produces a file named flink-1.0-SNAPSHOT-jar-with-dependencies.jar.
Get this file copied into the container for later use; I won't go into the details of uploading it and copying it into the container.
三. Running the jar on the production-like Flink setup
In one terminal, run an nc program to simulate real-time data input:
[root@127 ~]# nc -lk 9000
ssss
df
sd
hjjh
ytyu
dddd
this
2222
this
this
2222
df
df
df
df
df
Run the jar we just built by submitting it to the Flink cluster:
root@7fc97df9c324:/demo# flink run --class org.bbk.flink.Demo flink-1.0-SNAPSHOT-jar-with-dependencies.jar
Job has been submitted with JobID 06022ffb1c4ef7b191b34186340c9754
Let's look at Flink's output:
taskmanager_1 | 2022-05-05 13:00:12,233 INFO org.apache.flink.runtime.taskmanager.Task [] - Source: Socket Stream -> Flat Map -> Map (1/1)#0 (fe28c0c5280e26eede4948ef1f5048f1) switched from DEPLOYING to INITIALIZING.
jobmanager_1 | 2022-05-05 13:00:12,241 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Keyed Aggregation -> Sink: Print to Std. Out (1/1) (e2dc213af028a1599dff957fa128a489) switched from DEPLOYING to INITIALIZING.
jobmanager_1 | 2022-05-05 13:00:12,243 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Socket Stream -> Flat Map -> Map (1/1) (fe28c0c5280e26eede4948ef1f5048f1) switched from DEPLOYING to INITIALIZING.
taskmanager_1 | 2022-05-05 13:00:13,541 INFO org.apache.flink.runtime.taskmanager.Task [] - Source: Socket Stream -> Flat Map -> Map (1/1)#0 (fe28c0c5280e26eede4948ef1f5048f1) switched from INITIALIZING to RUNNING.
taskmanager_1 | 2022-05-05 13:00:13,548 INFO org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction [] - Connecting to server socket 192.168.1.111:9000
jobmanager_1 | 2022-05-05 13:00:13,548 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Socket Stream -> Flat Map -> Map (1/1) (fe28c0c5280e26eede4948ef1f5048f1) switched from INITIALIZING to RUNNING.
taskmanager_1 | 2022-05-05 13:00:13,561 INFO org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder [] - Finished to build heap keyed state-backend.
taskmanager_1 | 2022-05-05 13:00:13,569 INFO org.apache.flink.runtime.state.heap.HeapKeyedStateBackend [] - Initializing heap keyed state backend with stream factory.
taskmanager_1 | 2022-05-05 13:00:13,583 INFO org.apache.flink.runtime.taskmanager.Task [] - Keyed Aggregation -> Sink: Print to Std. Out (1/1)#0 (e2dc213af028a1599dff957fa128a489) switched from INITIALIZING to RUNNING.
jobmanager_1 | 2022-05-05 13:00:13,587 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Keyed Aggregation -> Sink: Print to Std. Out (1/1) (e2dc213af028a1599dff957fa128a489) switched from INITIALIZING to RUNNING.
taskmanager_1 | CountWord(ytyu,1)
taskmanager_1 | CountWord(dddd,1)
taskmanager_1 | CountWord(this,1)
taskmanager_1 | CountWord(2222,1)
taskmanager_1 | CountWord(this,2)
taskmanager_1 | CountWord(this,3)
taskmanager_1 | CountWord(2222,2)
taskmanager_1 | CountWord(df,1)
taskmanager_1 | CountWord(df,2)
taskmanager_1 | CountWord(df,3)
taskmanager_1 | CountWord(df,4)
taskmanager_1 | CountWord(df,5)
And here is what the Flink web UI shows:
