环境配置
<!-- Maven build settings for the Flink word-count examples below. -->
<properties>
<!-- Java language level used by the compiler plugin. -->
<maven.compiler.source>14</maven.compiler.source>
<maven.compiler.target>14</maven.compiler.target>
<!-- Flink release; artifact ids below embed the Scala binary version. -->
<flink.version>1.13.0</flink.version>
<scala.binary.version>2.12</scala.binary.version>
</properties>
<dependencies>
<!-- DataSet (batch) API — used by the first example. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- DataStream (streaming) API — used by the bounded/unbounded examples. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Client/runtime support needed to execute jobs from the IDE. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
批处理:DataSet API 实现一个word count案例
DataSet API 将被逐步淘汰,Flink 的核心是流处理。可以用流处理实现批处理:提交作业时指定执行模式参数 -Dexecution.runtime-mode=BATCH 即可。
public class woodcut {
    public static void main(String[] args) throws Exception {
        // Set up the batch (DataSet API) execution environment.
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Read the input file line by line.
        DataSource<String> lines = env.readTextFile("src/main/resources/a.txt");
        // Split each line on whitespace and emit a (word, 1) pair per token.
        // The explicit returns(...) hint is required because generic type
        // information of the lambda is erased at runtime.
        FlatMapOperator<String, Tuple2<String, Integer>> wordOnes = lines
                .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
                    for (String word : line.split("\\s+")) {
                        out.collect(Tuple2.of(word, 1));
                    }
                })
                .returns(Types.TUPLE(Types.STRING, Types.INT));
        // Group by the word (tuple field 0), sum the counts (field 1), print.
        wordOnes.groupBy(0).sum(1).print();
    }
}
流处理:有界流
public class bindedwordcount {
    /**
     * Bounded-stream word count: reads a file through the DataStream API,
     * counts words, and prints the running result.
     *
     * Fixes over the original: removed the unused local variable that held
     * the DataStreamSink, and deleted the commented-out keyBy(0) dead code.
     */
    public static void main(String[] args) throws Exception {
        // Set up the streaming execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // A file is a bounded source, so the job finishes once it is consumed.
        DataStreamSource<String> lines = env.readTextFile("src/main/resources/a.txt");
        // Tokenize each line into (word, 1) pairs; the returns(...) hint is
        // needed because the lambda's generic types are erased.
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordOnes =
                lines.flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
                    for (String word : line.split("\\s+")) {
                        out.collect(Tuple2.of(word, 1));
                    }
                }).returns(Types.TUPLE(Types.STRING, Types.INT));
        // Key by the word itself and keep a running sum of the counts.
        wordOnes.keyBy(data -> data.f0).sum(1).print();
        // Streaming jobs only run when execute() is called.
        env.execute();
    }
}
流处理:无界流
模拟测试时需要先安装 netcat(nc,版本 1.11),用它监听端口并手动输入数据。
public class unbindedwordcount {
    /**
     * Unbounded-stream word count: reads lines from a socket (e.g. fed by
     * netcat on port 6666), counts words, and prints the running result.
     *
     * Fixes over the original: removed the unused local variable that held
     * the DataStreamSink, and deleted the commented-out keyBy(0) dead code.
     */
    public static void main(String[] args) throws Exception {
        // Set up the streaming execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // A socket is an unbounded source — the job runs until cancelled.
        DataStreamSource<String> lines = env.socketTextStream("localhost", 6666);
        // Tokenize each line into (word, 1) pairs; the returns(...) hint is
        // needed because the lambda's generic types are erased.
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordOnes =
                lines.flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
                    for (String word : line.split("\\s+")) {
                        out.collect(Tuple2.of(word, 1));
                    }
                }).returns(Types.TUPLE(Types.STRING, Types.INT));
        // Key by the word itself and keep a running sum of the counts.
        wordOnes.keyBy(data -> data.f0).sum(1).print();
        // Streaming jobs only run when execute() is called.
        env.execute();
    }
}
从args中获取参数
使用flink提供的ParameterTool.fromArgs(args);
注意参数写法:--host localhost --port 6666(键和值之间用空格分隔,不带冒号)
public class unbindedwordcount {
    /**
     * Unbounded-stream word count that reads its socket address from the
     * program arguments, e.g. {@code --host localhost --port 6666}.
     *
     * Fixes over the original: falls back to localhost:6666 when the args
     * are missing (the original got a null host / threw when --host or
     * --port was absent); removed the unused DataStreamSink local and the
     * commented-out keyBy(0) dead code.
     */
    public static void main(String[] args) throws Exception {
        // Set up the streaming execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Parse "--key value" style arguments, with sensible defaults so the
        // job still runs when launched without parameters.
        ParameterTool params = ParameterTool.fromArgs(args);
        String host = params.get("host", "localhost");
        int port = params.getInt("port", 6666);
        // A socket is an unbounded source — the job runs until cancelled.
        DataStreamSource<String> lines = env.socketTextStream(host, port);
        // Tokenize each line into (word, 1) pairs; the returns(...) hint is
        // needed because the lambda's generic types are erased.
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordOnes =
                lines.flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
                    for (String word : line.split("\\s+")) {
                        out.collect(Tuple2.of(word, 1));
                    }
                }).returns(Types.TUPLE(Types.STRING, Types.INT));
        // Key by the word itself and keep a running sum of the counts.
        wordOnes.keyBy(data -> data.f0).sum(1).print();
        // Streaming jobs only run when execute() is called.
        env.execute();
    }
}
网友评论