flink socketTextStream window nc
测试工程 pom.xml 中的依赖：
<!-- Flink 1.10.1 dependencies; keep both versions in sync.
     The _2.12 suffix on the streaming artifact is the Scala binary version. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.10.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
在 Windows cmd 中使用如下命令在 9000 端口启动监听，用来向 Flink 程序发送数据（必须在启动 Flink 程序之前执行）：
nc -lp 9000 -v
flink demo
package com.keke.test0827;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class StreamWordCount2 {
public static void main(String[] args) throws Exception{
// 创建流处理执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// env.setParallelism(1);
// env.disableOperatorChaining();
// // 从文件中读取数据
// String inputPath = "hello.txt";
// DataStream<String> inputDataStream = env.readTextFile(inputPath);
// 从socket文本流读取数据
DataStream<String> inputDataStream = env.socketTextStream("127.0.0.1", 9000);
// DataStreamSource<String> inputDataStream = env.socketTextStream("127.0.0.1", 9000);
// 基于数据流进行转换计算
DataStream<Tuple2<String, Integer>> resultStream = inputDataStream.flatMap(new MyFlatMapper())
.keyBy(0)
// .sum(1).setParallelism(2).slotSharingGroup("red");
// .sum(1);
resultStream.print();
// 执行任务
env.execute();
}
// 自定义类,实现FlatMapFunction接口
public static class MyFlatMapper implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
// 按空格分词
String[] words = value.split(" ");
// 遍历所有word,包成二元组输出
for (String word : words) {
out.collect(new Tuple2<>(word, 1));
}
}
}
}
上面例子中，keyBy(0) 之后可以接下面两种写法之一（语句必须以其中一种结尾，否则缺少分号，代码无法编译）：
1 .sum(1).setParallelism(2).slotSharingGroup("red");
2 .sum(1);
注意：如果只启动 Flink 程序，而没有先执行 cmd 命令 nc -lp 9000 -v，程序连接不到 9000 端口，会很快终止。可以在 Windows cmd 中使用命令 netstat -ano|findstr "9000" 查看 9000 端口的监听情况。
如果使用第 1 种写法，没有任何结果输出。
如果使用第 2 种写法，可以正常输出单词计数结果。
上面 netstat 命令的输出示例（端口处于监听状态）：
TCP 0.0.0.0:9000 0.0.0.0:0 LISTENING 4612
网友评论