Using IDEA as the development tool
1. wordCount
1.1 Creating the Maven project
Create an ordinary Maven project in IDEA.
1.2 The pom file
<properties>
    <flink.version>1.12.0</flink.version>
    <java.version>1.8</java.version>
    <scala.binary.version>2.11</scala.binary.version>
    <slf4j.version>1.7.30</slf4j.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-to-slf4j</artifactId>
        <version>2.14.0</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.3.0</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
- flink-streaming-java_2.11: the _2.11 suffix is the Scala binary version. flink-runtime uses Akka, and Akka is written in Scala, so the Scala version is part of the artifact name.
- flink-java: 1.12.0
- maven-assembly-plugin: with the configuration above, mvn clean package additionally produces a xxx-jar-with-dependencies.jar that bundles all dependencies, which is convenient for cluster submission.
1.3 log4j.properties
log4j.rootLogger=WARN,console
log4j.additivity.org.apache=true
# (console)
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.Threshold=INFO
log4j.appender.console.ImmediateFlush=true
log4j.appender.console.Target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=[%-5p] %d(%r) --> [%t] %l: %m %x %n
log4j.appender.debug=org.apache.log4j.DailyRollingFileAppender
log4j.appender.debug.layout=org.apache.log4j.PatternLayout
log4j.appender.debug.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %p [%t] %C.%M(%L) | %m%n
log4j.appender.debug.File=D:\\code\\dfjx\\cctv\\realtime\\RealTime\\test\\debug.txt
log4j.appender.debug.DatePattern='-'yyyy-MM-dd
log4j.appender.debug.Threshold=DEBUG
1.4 Batch wordCount program
package com.dfjx.flinkTuorial.wordCount;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.Operator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class BatchWordCount {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // todo read the same file through three different reference types
        DataSource<String> dataSource = env.readTextFile("E:\\IdeaProjects\\flinkTutorial\\src\\main\\java\\com\\dfjx\\flinkTuorial\\wordCount\\input\\hello.txt");
        Operator<String, ?> dataOperator = env.readTextFile("E:\\IdeaProjects\\flinkTutorial\\src\\main\\java\\com\\dfjx\\flinkTuorial\\wordCount\\input\\hello.txt");
        DataSet<String> dataSet = env.readTextFile("E:\\IdeaProjects\\flinkTutorial\\src\\main\\java\\com\\dfjx\\flinkTuorial\\wordCount\\input\\hello.txt");
        /*
         * Relationship between the three classes:
         * DataSource extends the abstract class Operator,
         * and Operator extends the abstract class DataSet.
         */

        // todo string => (word, 1)
        AggregateOperator<Tuple2<String, Integer>> dataSourceSum = dataSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] words = value.split(" ");
                for (String word : words) {
                    out.collect(new Tuple2<>(word, 1));
                }
            }
        }).groupBy(0).sum(1);
        // print(String) is the deprecated, lazy variant: it only registers a sink
        // on the TaskManagers, so env.execute() below is still required
        dataSourceSum.print("dataSourceSum: ");

        AggregateOperator<Tuple2<String, Integer>> dataOperatorSum = dataOperator.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] words = value.split(" ");
                for (String word : words) {
                    out.collect(new Tuple2<>(word, 1));
                }
            }
        }).groupBy(0).sum(1);
        dataOperatorSum.print("dataOperatorSum: ");

        AggregateOperator<Tuple2<String, Integer>> dataSetSum = dataSet.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] words = value.split(" ");
                for (String word : words) {
                    out.collect(new Tuple2<>(word, 1));
                }
            }
        }).groupBy(0).sum(1);
        dataSetSum.print("dataSetSum: ");

        env.execute();
    }
}
Batch processing does its transformations on top of DataSet; the relevant source code is fairly simple and worth reading.
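One DataSet detail worth calling out, since it explains the env.execute() call above: the no-argument DataSet.print() gathers the result back to the client and triggers execution by itself, while the deprecated print(String sinkIdentifier) used in BatchWordCount only registers a lazy sink on the TaskManagers. A minimal sketch of the eager variant (the class name EagerPrintWordCount and the input path are illustrative, not from the original code):

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class EagerPrintWordCount {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple2<String, Integer>> counts = env
                .readTextFile("hello.txt") // placeholder path
                .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
                    for (String word : line.split(" ")) {
                        out.collect(Tuple2.of(word, 1));
                    }
                })
                .returns(Types.TUPLE(Types.STRING, Types.INT)) // lambda types are erased, so declare them
                .groupBy(0)
                .sum(1);
        // print() collects the DataSet to the client and runs the job itself;
        // calling env.execute() afterwards would fail with
        // "No new data sinks have been defined since the last execution".
        counts.print();
    }
}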
1.5 Streaming wordCount
package com.dfjx.flinkTuorial.wordCount;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class StreamWordCount {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> dataStreamSource = env.readTextFile("E:\\IdeaProjects\\flinkTutorial\\src\\main\\java\\com\\dfjx\\flinkTuorial\\wordCount\\input\\hello.txt");
        /*
         * SingleOutputStreamOperator extends DataStream,
         * KeyedStream extends DataStream.
         */
        SingleOutputStreamOperator<Tuple2<String, Integer>> dataStreamSourceSum = dataStreamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] words = value.split(" ");
                for (String word : words) {
                    out.collect(new Tuple2<>(word, 1));
                }
            }
        }).keyBy(0).sum(1); // keyBy(int) is deprecated in 1.12; the lambda version below uses a KeySelector instead
        dataStreamSourceSum.print("dataStreamSourceSum: ");
        env.execute();
    }
}
The same job written with Java 8 features (lambdas):
package com.dfjx2.wordCount;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.Arrays;

/**
 * author : bigxiao
 * date : 2021/8/20
 * Desc :
 * modified by :
 * version :
 */
public class StreamWordCount2 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //DataSource<String> dataSource = batchEnv.readTextFile("E:\\IdeaProjects\\flinkTutorial\\src\\main\\resources\\hello.txt");
        DataStreamSource<String> dataStreamSource = env.readTextFile("E:\\IdeaProjects\\flinkTutorial\\src\\main\\resources\\hello.txt");
        // todo convert the data format; lambdas lose their generic types to erasure,
        //      so returns(...) supplies the type information explicitly
        SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOne = dataStreamSource.flatMap((String line, Collector<String> words) -> {
            Arrays.stream(line.split(" ")).forEach(words::collect);
        }).returns(Types.STRING)
                .map(word -> Tuple2.of(word, 1L))
                .returns(Types.TUPLE(Types.STRING, Types.LONG));
        // todo group by word (a KeySelector instead of the deprecated keyBy(int))
        KeyedStream<Tuple2<String, Long>, String> keyedStream = wordAndOne.keyBy(t -> t.f0);
        // todo sum the counts
        SingleOutputStreamOperator<Tuple2<String, Long>> sum = keyedStream.sum(1);
        // todo print
        sum.print();
        // todo submit the job
        env.execute();
    }
}
Stream processing carries out its transformations on top of DataStream.
1.6 Comparing DataSet and DataStream
In short: DataSet is the bounded, batch-processing API, while DataStream targets unbounded streams and, since Flink 1.12, can also process bounded input in batch execution mode; the community plans to eventually retire the DataSet API in favor of this unified DataStream model (FLIP-131).
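A concrete consequence of this unification in 1.12: the streaming wordCount can execute as a batch job without touching the DataSet API, by switching the runtime execution mode. A minimal sketch, assuming a bounded hello.txt input (the class name and path are illustrative):

import java.util.Arrays;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class BatchModeWordCount {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // BATCH mode (new in 1.12) requires bounded sources; results are emitted
        // once at the end instead of as one incremental update per record.
        env.setRuntimeExecutionMode(RuntimeExecutionMode.BATCH);
        env.readTextFile("hello.txt") // placeholder bounded input
                .flatMap((String line, Collector<String> out) ->
                        Arrays.stream(line.split(" ")).forEach(out::collect))
                .returns(Types.STRING)
                .map(word -> Tuple2.of(word, 1L))
                .returns(Types.TUPLE(Types.STRING, Types.LONG))
                .keyBy(t -> t.f0)
                .sum(1)
                .print();
        env.execute();
    }
}

The mode can also be chosen at submission time with bin/flink run -Dexecution.runtime-mode=BATCH ..., which keeps the code itself mode-agnostic.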
1.7 Introducing parallelism and partitions
What do the 1, 2, 3, 4 highlighted in red (in the output screenshot) stand for?
The wordCount program runs locally, and the local machine has a 4-core CPU, so up to four subtasks can execute at the same time; this is the notion of a parallelism of 4. The number before each printed record is the index of the parallel subtask that emitted it.
(Screenshot: wordCount print output)
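The parallelism does not have to follow the core count; it can be set per job or per operator. A minimal sketch of both (the class name ParallelismDemo and the in-memory sample data are illustrative):

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class ParallelismDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2); // job-wide override of the default (= number of CPU cores)
        env.fromElements("hello flink", "hello world") // illustrative in-memory input
                .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
                    for (String word : line.split(" ")) {
                        out.collect(Tuple2.of(word, 1));
                    }
                })
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(t -> t.f0)
                .sum(1)
                .print()
                .setParallelism(1); // a single-subtask sink prints without the "n> " prefix
        env.execute();
    }
}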
1.8 wordCount reading input from a Socket
A Socket is used to simulate unbounded stream data; see the notes after the code for the socket.properties file the program expects.
package com.dfjx.flinkTuorial.wordCount;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.ResourceBundle;

public class SocketStreamWordCount {

    static String hostname;
    static int port;

    static {
        // read hostname/port from socket.properties on the classpath
        ResourceBundle bundle = ResourceBundle.getBundle("socket");
        hostname = bundle.getString("hostname");
        port = Integer.parseInt(bundle.getString("port"));
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> dataStreamSource = env.socketTextStream(hostname, port);
        SingleOutputStreamOperator<Tuple2<String, Integer>> dataStreamSourceSum = dataStreamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] words = value.split(" ");
                for (String word : words) {
                    out.collect(new Tuple2<>(word, 1));
                }
            }
        }).keyBy(0).sum(1);
        dataStreamSourceSum.print();
        env.execute();
    }
}
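The ResourceBundle.getBundle("socket") call above looks up a socket.properties file on the classpath (typically under src/main/resources). The file itself is not shown in the original post, so the values below are examples:

hostname=localhost
port=9999

Start the socket end before the job, for instance with netcat (nc -lk 9999 on Linux/macOS); every line typed into the netcat terminal then becomes one record of the unbounded stream.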