I. Installing Flink on Mac
1. Install with Homebrew
brew install apache-flink
2. Check the version
flink --version
3. Start Flink
cd /usr/local/Cellar/apache-flink/1.6.0/libexec/bin
./start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host zhisheng.
Starting taskexecutor daemon on host zhisheng.
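If you need to shut the local cluster down again later, the same bin directory also ships the matching stop script:
./stop-cluster.sh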
4. Open the web UI
http://localhost:8081/#/overview
![](https://img.haomeiwen.com/i1981528/d3dd74c6115096ce.png)
II. Creating the Project
The project can be created in either of two ways.
1. mvn archetype from the command line
You can customize the groupId and artifactId:
mvn archetype:generate -DarchetypeCatalog=internal -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.7.2
Define value for property 'groupId': com.dangdang.flink
Define value for property 'artifactId': flink-helloworld
Define value for property 'version' 1.0-SNAPSHOT: :
Define value for property 'package' com.dangdang.flink: : com.flink.tutorial
Confirm properties configuration:
groupId: com.dangdang.flink
artifactId: flink-helloworld
version: 1.0-SNAPSHOT
package: com.flink.tutorial
Y: :
[INFO] ----------------------------------------------------------------------------
[INFO] Using following parameters for creating project from Archetype: flink-quickstart-java:1.7.2
[INFO] ----------------------------------------------------------------------------
[INFO] Parameter: groupId, Value: com.dangdang.flink
[INFO] Parameter: artifactId, Value: flink-helloworld
[INFO] Parameter: version, Value: 1.0-SNAPSHOT
[INFO] Parameter: package, Value: com.flink.tutorial
[INFO] Parameter: packageInPathFormat, Value: com/flink/tutorial
[INFO] Parameter: package, Value: com.flink.tutorial
[INFO] Parameter: version, Value: 1.0-SNAPSHOT
[INFO] Parameter: groupId, Value: com.dangdang.flink
[INFO] Parameter: artifactId, Value: flink-helloworld
[WARNING] CP Don't override file /Users/qiankai07/flinklearn/flink-helloworld/src/main/resources
[INFO] Project created from Archetype in dir: /Users/qiankai07/flinklearn/flink-helloworld
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 04:03 min
[INFO] Finished at: 2020-02-15T16:53:55+08:00
[INFO] ------------------------------------------------------------------------
Project structure:
![](https://img.haomeiwen.com/i1981528/476e9a7f2e836ecd.png)
2. Adding the Flink archetype in the IDE
1) Add the archetype and fill in the values as shown in the smaller screenshot on the right (1.7.2 is the Flink version; adjust it as needed).
![](https://img.haomeiwen.com/i1981528/158276af64b105f5.png)
![](https://img.haomeiwen.com/i1981528/9291a01f473f51ed.png)
2) Click Next and enter the GroupId, ArtifactId, and so on.
![](https://img.haomeiwen.com/i1981528/b20e9a40940651d9.png)
3) The resulting project
III. Project Overview
1. Flink dependencies
flink-java: the dependency for the batch API (DataSet API)
flink-streaming-java: the dependency for the streaming API (DataStream API)
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
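The ${flink.version} and ${scala.binary.version} placeholders come from the <properties> section of the pom.xml generated by the archetype, roughly as sketched below; the exact values depend on the archetype version chosen above, and 2.11 is only assumed here.
<properties>
    <flink.version>1.7.2</flink.version>
    <scala.binary.version>2.11</scala.binary.version>
</properties>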
2. Dependency scope
Both dependencies use <scope>provided</scope>, for two reasons:
1) The compiled jar ultimately runs on the Flink cluster (submitted through the web UI).
2) The Flink cluster already ships these jars, so marking them provided avoids version conflicts and keeps the application jar small.
IV. Basic API Usage
1. Batch API: DataSet
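For comparison with the streaming job in the next subsection, here is a minimal, self-contained DataSet WordCount sketch; the class name BatchJob and the sample sentences are illustrative, not part of the generated project.
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class BatchJob {
    public static void main(String[] args) throws Exception {
        // 1. Batch programs use ExecutionEnvironment instead of StreamExecutionEnvironment.
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // 2. A fixed in-memory source keeps the sketch self-contained (illustrative data).
        DataSet<String> text = env.fromElements("hello flink", "hello world");

        // 3. The same split / group / sum pipeline as the streaming job.
        text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
                    for (String token : line.toLowerCase().split("\\W+")) {
                        if (token.length() > 0) {
                            out.collect(new Tuple2<>(token, 1));
                        }
                    }
                }
            })
            .groupBy(0)
            .sum(1)
            // 4. For a DataSet program, print() triggers execution; no explicit env.execute() is needed.
            .print();
    }
}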
2. Streaming API: DataStream
2.1 Setting up the data source
Prepare a socket text stream (simulated here with the nc command line tool); 9000 is the socket port.
cmd: nc -l 9000
2.2 Walking through the streaming code
1) Set up the execution environment: batch or streaming.
2) Set the data source: a local file, a socket stream, etc.
3) Transform the incoming data, e.g. counting, top-N, averages, and other aggregations.
4) Execute the program.
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class StreamingJob {

    public static void main(String[] args) throws Exception {
        // 1. Set up the execution environment: StreamExecutionEnvironment for streaming
        //    (ExecutionEnvironment would be used for batch).
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Set the data source: a local file or a socket both work; here we read text lines from a socket.
        //    Host and port are hardcoded, so the "localhost 9000" arguments passed on the
        //    flink run command line later are not read here.
        String host = "localhost";
        int port = 9000;
        DataStreamSource<String> stringDataStreamSource = env.socketTextStream(host, port);

        // 3. Transform the stream: split each line into words, key by the word (tuple field 0)
        //    and keep a running count (field 1).
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = stringDataStreamSource.flatMap(new LineSplitter())
                .keyBy(0)
                .sum(1);

        // Print the result to stdout.
        sum.print();

        // 4. Execute the program.
        env.execute("word count");
    }

    public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) {
            String[] tokens = s.toLowerCase().split("\\W+");
            for (String token : tokens) {
                if (token.length() > 0) {
                    collector.collect(new Tuple2<String, Integer>(token, 1));
                }
            }
        }
    }
}
2.3 Running the code
2.3.1 Option 1: run directly from the IDE (launch the main method)
1) First, start the nc -l 9000 terminal.
2) Then start the main application.
3) Type some input into the terminal.
4) Watch the console output (a sample of the expected format follows the screenshot below).
![](https://img.haomeiwen.com/i1981528/951c3e96903ef74f.jpg)
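For reference, typing a line such as hello flink hello into the nc terminal should produce running counts in the IDE console, one update per incoming word. The exact lines below are only illustrative, not captured output; the N> prefix is the index of the printing subtask and depends on parallelism:
2> (hello,1)
1> (flink,1)
2> (hello,2)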
2.3.2 Option 2: submit the jar with the flink command line
1) Run mvn clean package -DskipTests to build the jar.
2) Submit it with the flink CLI (found in the bin directory of the Flink installation):
terminal1:
nc -l 9000
terminal2:
flink run -c com.dangdang.flink.StreamingJob /Users/qiankai07/IdeaProjects/flink-learn/target/flink-learn-1.0-SNAPSHOT.jar localhost 9000
3) Check the result in the web UI.
![](https://img.haomeiwen.com/i1981528/f2a8228e4deb1d8a.jpg)
![](https://img.haomeiwen.com/i1981528/a27e9f4bf4e04a7c.jpg)
2.3.3 Option 3: upload the jar and submit the job directly in the web UI
1) Upload the jar.
2) Start nc -l 9000.
3) Submit the job.
![](https://img.haomeiwen.com/i1981528/7aa1bdfbd649a103.jpg)
4) Watch the running job.
The red boxes highlight the four key steps of the streaming API.
![](https://img.haomeiwen.com/i1981528/800e57c7b39cccfc.jpg)
5) Type input data into the nc -l 9000 terminal.
6) Check the .out file under /usr/local/Cellar/apache-flink/1.10.0/libexec/log (a tail command is sketched after this list).
![](https://img.haomeiwen.com/i1981528/735adead6a369dde.jpg)
7) Stop the job: click Cancel Job in the upper right corner of the web UI.
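As a convenience for step 6 above, you can follow the .out file as data arrives instead of reopening it; the path below assumes the Homebrew layout shown earlier:
tail -f /usr/local/Cellar/apache-flink/1.10.0/libexec/log/*.out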
V. Ways to Run the Project
1. Package the program as a jar and run it on the Flink web platform (or submit it with the flink CLI).
2. Program interactively in the command-line shell client and run directly (only a Scala version is available).