Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. Flink has been designed to run in all common cluster environments, perform computations at in-memory speed and at any scale.
Apache Flink 是一个在无界和有界数据流上进行状态计算的框架和分布式处理引擎。 Flink 已经可以在所有常见的集群环境中运行,并以 in-memory 的速度和任意的规模进行计算。
有状态
分布式
并行度
初始化程序入口:打开本地页面
WordCount.java
public class WordCount {
public static void main(String[] args) throws Exception {
//创建程序入口
// StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
ParameterTool parameterTool = ParameterTool.fromArgs(args);
String hostname = parameterTool.get("hostname");
int port = parameterTool.getInt("port");
//数据的输入
DataStreamSource<String> myDataStream = env.socketTextStream(hostname, port);
//数据的处理
SingleOutputStreamOperator<WordAndOne> result = myDataStream.flatMap(new StringSplitTask()).keyBy("word")
.sum("count");
result.print();
env.execute("wordcount");
}
//封装了业务逻辑的算子
public static class StringSplitTask implements FlatMapFunction<String, WordAndOne> {
@Override
public void flatMap(String line, Collector<WordAndOne> out) throws Exception {
String[] fields = line.split(",");
for (String word : fields) {
out.collect(new WordAndOne(word, 1));
}
}
}
public static class WordAndOne {
private String word;
private Integer count;
@Override
public String toString() {
return "WordAndOne{" +
"word='" + word + '\'' +
", count=" + count +
'}';
}
public WordAndOne() {
}
public WordAndOne(String word, Integer count) {
this.word = word;
this.count = count;
}
public String getWord() {
return word;
}
public void setWord(String word) {
this.word = word;
}
public Integer getCount() {
return count;
}
public void setCount(Integer count) {
this.count = count;
}
}
}
http://localhost:8081
设置并行度为2
//设置并行度
env.setParallelism(2);
在集群上运行
将注释解开
打包完成
上传到bigdata03
运行
cd /home/bigdata/apps/flink-1.10.1/
./bin/flink run -c com.WordCount /home/bigdata/data/FlinkCourse-1.0-SNAPSHOT.jar --hostname bigdata02 --port 1234
Flink分布式运行环境
TaskManager -> slot -> task -> 并行度
Flink数据传输的策略
task之间传输数据方式
task之间数据传递有四种方式
- forward strategy
- key-based strategy
- broadcast strategy
- random strategy
forward strategy
即转发策略: 一个task的输出只发送给一个task作为输入
优点
- 如果两个task都在一个JVM中的话,那么就可以避免网络开销
key-based strategy
即基于键值的策略
- 数据需要按照某个属性(我们称为 key)进行分组(或者说分区)
-
相同key的数据需要传输给同一个task,在一个task中进行处理
broadcast strategy
即广播策略
-
数据随机的从一个task中传输给下一个operator所有的subtask。因为这种策略涉及数据复制和网络通信,所以成本相当高。
random strategy
即随机策略
- 数据随机的从一个task中传输给下一个operator所有的subtask
-
保证数据能均匀的传输给所有的subtask,以便在任务之间均匀地分配负载
注意
转发与随机策略是基于key-based策略的;转发策略和随机策略也可以看作是基于键的策略的变体,其中前者保存上游元组的键,而后者执行键的随机重新分配
Task和Slot的关系
对接Kafka数据源
Properties consumerProperties = new Properties();
consumerProperties.setProperty("bootstrap.servers","bigdata02:9092");
consumerProperties.setProperty("group.id","test112_consumer");
FlinkKafkaConsumer010<String> myConsumer =
new FlinkKafkaConsumer010<String>(topic, new SimpleStringSchema(), consumerProperties);
设置: 10 个 task
- source 3
- flamap 2
- keyby sum 2
- map 2
- sink 1
WordCount.java
/**
*
* source 3
* flamap 2
* keyby sum 2
* map 2
* sink 1
*
* 10 task
*
*/
public class WordCount {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
String topic="test112";
Properties consumerProperties = new Properties();
consumerProperties.setProperty("bootstrap.servers","bigdata02:9092");
consumerProperties.setProperty("group.id","testSlot_consumer");
FlinkKafkaConsumer010<String> myConsumer =
new FlinkKafkaConsumer010<>(topic, new SimpleStringSchema(), consumerProperties);
//task
DataStreamSource<String> data = env.addSource(myConsumer).setParallelism(3);
SingleOutputStreamOperator<Tuple2<String, Integer>> wordOneStream = data.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String line,
Collector<Tuple2<String, Integer>> out) throws Exception {
String[] fields = line.split(",");
for (String word : fields) {
out.collect(Tuple2.of(word, 1));
}
}
}).setParallelism(2);
SingleOutputStreamOperator<Tuple2<String, Integer>> result = wordOneStream.keyBy(0).sum(1).setParallelism(2);
result.map( tuple -> tuple.toString()).setParallelism(2)
.print().setParallelism(1);
//3 2 2 2 1 = 10Task任务
env.execute("WordCount2");
}
}
修改配置文件/home/bigdata/apps/flink-1.10.1/conf/flink-conf.yaml
# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
taskmanager.numberOfTaskSlots: 2
启动
./bin/flink run -c com.slot.lesson03.WordCount /home/bigdata/data/FlinkCourse-1.0-SNAPSHOT.jar
页面显示只有8个task
Operator Chain的条件:
- 数据传输策略是 forward strategy
- 在同一个 TaskManager 中运行
红框标记部分合并成一个task
演示并行度为1的情况:
public class WordCount {
public static void main(String[] args) throws Exception {
ParameterTool parameterTool = ParameterTool.fromArgs(args);
String hostname = parameterTool.get("hostname");
int port = parameterTool.getInt("port");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//在集群里面运行的时候,默认的并行度就是1
env.setParallelism(1);
DataStreamSource<String> data = env.socketTextStream(hostname, port);
SingleOutputStreamOperator<MyWordCount> result =
data.flatMap(new SplitWord())
.keyBy("word")
.sum("count");
result.print();
/**
* 这个任务里面应该有几个task?
* 1 1 1 1
* [socket flatMap] keybased [keyby|sum sink]
*
*
*/
env.execute("word count");
}
启动
./bin/flink run -c com.slot.lesson04.WordCount /home/bigdata/data/FlinkCourse-1.0-SNAPSHOT.jar --hostname bigdata02 --port 1234
页面显示只有2个task
演示并行度为7的情况:
public class WordCount {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
String topic="test112";
Properties consumerProperties = new Properties();
consumerProperties.setProperty("bootstrap.servers","bigdata02:9092");
consumerProperties.setProperty("group.id","test112_consumer");
FlinkKafkaConsumer010<String> myConsumer =
new FlinkKafkaConsumer010<>(topic, new SimpleStringSchema(), consumerProperties);
//task
DataStreamSource<String> data = env.addSource(myConsumer).setParallelism(3);
SingleOutputStreamOperator<Tuple2<String, Integer>> wordOneStream = data.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String line,
Collector<Tuple2<String, Integer>> out) throws Exception {
String[] fields = line.split(",");
for (String word : fields) {
out.collect(Tuple2.of(word, 1));
}
}
}).setParallelism(3);
SingleOutputStreamOperator<Tuple2<String, Integer>> result = wordOneStream.keyBy(0).sum(1).setParallelism(3);
result.map( tuple -> tuple.toString()).setParallelism(3)
.print().setParallelism(1);
env.execute("WordCount....");
/**
* source 3
* flatMap 3
*
* keyby sum 3
* map 3
*
* sink 1
*
* 7 task
*
*/
}
}
启动
./bin/flink run -c com.slot.lesson05.WordCount /home/bigdata/data/FlinkCourse-1.0-SNAPSHOT.jar
页面显示只有7个task
Flink四层图结构
在写 Flink 代码的时候,实际上就是要构建一个 dataflow,当 Flink 程序从提交到运行,
这个 dataflow 需要经历如下 4 个阶段:
► Stream Graph
►Job Graph
► Execution Graph
► Physical Execution Graph
网友评论