本文作者:林伟兵,叩丁狼高级讲师。原创文章,转载请注明出处。
1. Flink简介
Flink 提供了三个核心的用户API:
- Batch
- Streaming
- Talbe & SQL
本文不介绍Flink是什么,Flink的核心组件和特性,本文从用户的角度解读Batch和Streaming代码的实现方式,本文使用Flink1.6.1版本作为讲解。首先需要下载官方的案例,地址是:https://ci.apache.org/projects/flink/flink-docs-release-1.6/quickstart/java_api_quickstart.html.
2. 官方案例搭建
-
通过maven工具下载源码:
$ mvn archetype:generate \ -DarchetypeGroupId=org.apache.flink \ -DarchetypeArtifactId=flink-quickstart-java \ -DarchetypeVersion=1.6.1
-
也可以使用官方提供的地址下载,但是不管如何 都必须保证有maven环境:
$ curl https://flink.apache.org/q/quickstart.sh | bash -s 1.6.1
-
下载后的项目架构是这样的:
$ tree quickstart/ quickstart/ ├── pom.xml └── src └── main ├── java │ └── org │ └── myorg │ └── quickstart │ ├── BatchJob.java │ └── StreamingJob.java └── resources └── log4j.properties
-
将源码导入IDE中。打开IDE,点击“open”,将项目导进去,此时需要加载对应的maven依赖。等待项目构建完毕后,删除BatchJob.java和StreamingJob.java代码,因为这个两个代码只是提供了一个模板,并没有实质性的代码。
-
打开github网站,找到flink项目的example代码,将batch和streaming代码粘贴到项目中。
02.png
03.png
粘贴后的代码如下,运行程序后打印出单词统计的结果:
04.png3. Batch代码解读:
如下贴出源代码,重要的代码部分参考注释:
public class WordCount {
public static void main(String[] args) throws Exception {
final ParameterTool params = ParameterTool.fromArgs(args);
//创建执行环境,这里的环境可以是本地环境,也可以是集群环境.
// 用户可以调用createLocalEnvironment()运行本地环境 也可以调用createRemoteEnvironment()运行集群环境
//getExecutionEnvironment()可以动态获取环境
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// ParameterTool就是对参数的一个封装类,这里将传递进来的参数作为全局参数
env.getConfig().setGlobalJobParameters(params);
// get input data
DataSet<String> text;
if (params.has("input")) {
//如果运行程序携带input参数,则从input参数后面获取文件地址,从而读取文件的内容
text = env.readTextFile(params.get("input"));
} else {
//如果不携带参数,则从WordCountData类读取每一行数据并进行分词统计
System.out.println("Executing WordCount example with default input data set.");
System.out.println("Use --input to specify file input.");
text = WordCountData.getDefaultTextLineDataSet(env);
}
DataSet<Tuple2<String, Integer>> counts =
// split up the lines in pairs (2-tuples) containing: (word,1)
// 这里将一行数据转换成 (letter,1) (lette,1)(letter,1) (...)
text.flatMap(new Tokenizer())
// 将tuple(letter,1) 的第0个元素 进行groupby,再对相同的letter的count进行累加。
.groupBy(0)
.sum(1);
//上面程序得到的结果是一个队列,该队列由一个个的Tuple2<String, Integer>组成。
// emit result
if (params.has("output")) {
//如果有output参数,则将output提供的路径作为文件夹,将结果写入到该文件夹下
counts.writeAsCsv(params.get("output"), "\n", " ");
//开始执行程序
env.execute("WordCount Example");
} else {
//执行程序,将结果打印出来
System.out.println("Printing result to stdout. Use --output to specify output path.");
counts.print();
}
}
public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
// normalize and split the line
String[] tokens = value.toLowerCase().split("\\W+");
// Tuple2 就是将2个元素作为一个元组, 如果将3个元素作为一个元祖,则需要使用Tuple3
for (String token : tokens) {
if (token.length() > 0) {
out.collect(new Tuple2<>(token, 1));
}
}
}
}
}
4. Streaming代码解读:
Streaming代码与batch代码并没有多大的不同。不同点参考注释:
public class WordCount {
public static void main(String[] args) throws Exception {
// Checking input parameters
final ParameterTool params = ParameterTool.fromArgs(args);
// 不同于批量处理,实时流处理的环境代码前面添加了Stream标识。
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// make parameters available in the web interface
env.getConfig().setGlobalJobParameters(params);
// get input data
DataStream<String> text;
if (params.has("input")) {
// read the text file from given input path
text = env.readTextFile(params.get("input"));
} else {
System.out.println("Executing WordCount example with default input data set.");
System.out.println("Use --input to specify file input.");
// get default test text data
text = env.fromElements(WordCountData.WORDS);
}
DataStream<Tuple2<String, Integer>> counts =
// split up the lines in pairs (2-tuples) containing: (word,1)
text.flatMap(new Tokenizer())
// 这里的统计采用的是keyBy而非groupBy,因为实时流数据正常的逻辑是实时产生,它是无边数据
.keyBy(0).sum(1);
// emit result
if (params.has("output")) {
counts.writeAsText(params.get("output"));
} else {
System.out.println("Printing result to stdout. Use --output to specify output path.");
counts.print();
}
//开始执行程序
env.execute("Streaming WordCount");
}
public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
// normalize and split the line
String[] tokens = value.toLowerCase().split("\\W+");
// emit the pairs
for (String token : tokens) {
if (token.length() > 0) {
out.collect(new Tuple2<>(token, 1));
}
}
}
}
}
总结:
- 不管是离线处理还是实时处理,都离不开三板斧,1-获取当前运行环境,2-通过算子进行数据分析,3-将结果输出到某一端。
- 算子包括了map() , flatMap(), filter(), keyBy( ), sum( ) ...
- 当前案例的批量处理的结果是将数据一次性输出,实时流数据是对每一行数据进行逐一的分析和输出。
- 除了Batch / Streaming 代码,官方还提供了 SQL 代码的案例。有兴趣的同学可以下载来试试。
想获取更多技术干货,请前往叩丁狼官网:http://www.wolfcode.cn/all_article.html
网友评论