本文将带你执行你的第一个 Beam 管线 —— 运行一个由 Beam 的 Java SDK 编写的 WordCount 示例,于你选定一个的 runner 上。
Apache Beam 代言设置开发环境
- 下载并安装 Java Development Kit (JDK) 1.7 或更高版本。检查 JAVA_HOME 环境变量已经设置并指向你的 JDK 安装目录。
- 照着 Maven 的 安装指南 下载并安装适合你的操作系统的 Apache Maven 。
获取 WordCount 代码
获得一份 WordCount 管线代码拷贝最简单的方法,就是使用下列指令来生成一个简单的、包含基于 Beam 最新版的 WordCount 示例和构建的 Maven 项目:
$ mvn archetype:generate \
-DarchetypeRepository=https://repository.apache.org/content/groups/snapshots \
-DarchetypeGroupId=org.apache.beam \
-DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
-DarchetypeVersion=LATEST \
-DgroupId=org.example \
-DartifactId=word-count-beam \
-Dversion="0.1" \
-Dpackage=org.apache.beam.examples \
-DinteractiveMode=false
这将创建一个叫 word-count-beam
的目录,其中包含了一份简单的 pom.xml
文件和一套示例管线,用来计算某个文本文件中的各个单词的数量。
$ cd word-count-beam/
$ ls
pom.xml src
$ ls src/main/java/org/apache/beam/examples/
DebuggingWordCount.java WindowedWordCount.java common
MinimalWordCount.java WordCount.java
关于这些示例中用到的 Beam 的概念的详细介绍,请阅读 WordCount Example Walkthrough 一文。这里我们只聚焦于如何执行 WordCount.java
上。
运行 WordCount
一个单 Beam 管线可以运行于多种 Beam runner 上,包括 ApexRunner、FlinkRunner、SparkRunner 或 DataflowRunner 等。
在你选好用哪个 runner 以后:
- 确保你已经正确配置了该 runner 。
- 这样来构造你的命令行:
- 用
--runner=<runner>
选项指定你选定的 runner (缺省为 DirectRunner) - 添加特定于该 runner 的必需选项
- 选择该 runner 能访问到的输入文件和输出位置。(例如,当你在外部集群上运行管线的时候是无法访问本地文件的。)
- 运行你的第一个 WordCount 管线。
$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
-Dexec.args="--inputFile=pom.xml --output=counts" -Pdirect-runner
$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
-Dexec.args="--inputFile=pom.xml --output=counts --runner=ApexRunner" -Papex-runner
$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
-Dexec.args="--runner=FlinkRunner --inputFile=pom.xml --output=counts" -Pflink-runner
$ mvn package exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
-Dexec.args="--runner=FlinkRunner --flinkMaster=<flink master> --filesToStage=target/word-count-beam-bundled-0.1.jar \
--inputFile=/path/to/quickstart/pom.xml --output=/tmp/counts" -Pflink-runner
You can monitor the running job by visiting the Flink dashboard at http://<flink master>:8081
$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
-Dexec.args="--runner=SparkRunner --inputFile=pom.xml --output=counts" -Pspark-runner
$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
-Dexec.args="--runner=DataflowRunner --gcpTempLocation=gs://<your-gcs-bucket>/tmp \
--inputFile=gs://apache-beam-samples/shakespeare/* --output=gs://<your-gcs-bucket>/counts" \
-Pdataflow-runner
检视结果
一旦管线完成运行,你可以查看结果。你会注意到有多个以 count
打头的输出文件。具体会有几个这样的文件是由 runner 决定的。这样能方便 runner 进行高效的分布式执行。
$ ls counts*
$ ls counts*
$ ls counts*
$ ls /tmp/counts*
$ ls counts*
$ gsutil ls gs://<your-gcs-bucket>/counts*
当你查看文件内容的时候,你会看到里面包含每个单词的出现数量。文件中的元素顺序可能会和这里看到的不同。因为 Beam 模型通常并不保障顺序,以便于 runner 优化效率。
$ more counts*
api: 9
bundled: 1
old: 4
Apache: 2
The: 1
limitations: 1
Foundation: 1
...
$ cat counts*
BEAM: 1
have: 1
simple: 1
skip: 4
PAssert: 1
...
$ more counts*
The: 1
api: 9
old: 4
Apache: 2
limitations: 1
bundled: 1
Foundation: 1
...
$ more /tmp/counts*
The: 1
api: 9
old: 4
Apache: 2
limitations: 1
bundled: 1
Foundation: 1
...
$ more counts*
beam: 27
SF: 1
fat: 1
job: 1
limitations: 1
require: 1
of: 11
profile: 10
...
$ gsutil cat gs://<your-gcs-bucket>/counts*
feature: 15
smother'st: 1
revelry: 1
bashfulness: 1
Bashful: 1
Below: 2
deserves: 32
barrenly: 1
...
下一步
- 阅读 WordCount Example Walkthrough 学习更多的 WordCount 示例
- 深入学习我们推荐的 文章和幻灯片。
- 加入 Beam users@ 邮件列表。
如果你遇到任何问题,请千万不要犹豫 跟我们联系!
网友评论