本文将帮助你理解如何设计pipeline。它包含关于如何确定pipeline结构、如何选择将哪些transform应用于数据以及如何确定输入和输出方法的信息。
1. 在设计管道时应该考虑什么?
1.1 数据存储在哪?
你有多少组输入数据?这将决定您需要在pipeline的开始应用哪些类型的读取转换。
1.2 数据是什么类型?
可能是文本数据,日志数据,数据库表数据,部分beam transform只针对KV数据有效,用户需要了解数据是如何输入的,以及如何在PCollection中最好地表示这些数据。
1.3 需要对数据做什么处理 ?
Beam Core Transform是通用的,了解如何更改或操作数据将决定如何构建像ParDo这样的核心转换,包括何时处理?
1.4 数据结果输出形式?
写入文本,数据库,或者数据流等。
2. 基础Pineline
最简单的管道-线性的操作流程:
1.png但是实际的Pineline复杂的多,通常一个pineline代表一种DAG图。它可以有多个输入源、多个输出接收器,其操作(PTransforms)可以读取和输出多个pcollection。
3. PCollections分之处理
Transform并不会改变原有的PCollections,可以考虑PCollection的每个单独元素,并创建一个新的PCollection作为输出。这样,您可以对同一个PCollection中的不同元素执行不同的操作。
4. 多个Transform处理同一个PCollection
您可以使用相同的PCollection作为多个转换的输入,而无需使用或更改输入。管道从数据库表中读取输入(用字符串表示的名称)并创建表行的PCollection。然后,管道对同一个PCollection应用多个Transform。Transform A提取PCollection中以字母“A”开头的所有名称,Transform B提取PCollection中以字母“B”开头的所有名称。两个转换A和B都有相同的输入PCollection。
2.png
Example code:
PCollection<String> dbRowCollection = ...;
PCollection<String> aCollection = dbRowCollection.apply("aTrans", ParDo.of(new DoFn<String, String>(){
@ProcessElement
public void processElement(ProcessContext c) {
if(c.element().startsWith("A")){
c.output(c.element());
}
}
}));
PCollection<String> bCollection = dbRowCollection.apply("bTrans", ParDo.of(new DoFn<String, String>(){
@ProcessElement
public void processElement(ProcessContext c) {
if(c.element().startsWith("B")){
c.output(c.element());
}
}
}));
5 一个Transform多个输出
另一种将Pipeline分之处理的方法是使用标记输出(https://beam.apache.org/documentation/programming-guide/#additional-outputs)将单个Transform输出转换为多个PCollections。生成多个输出的Transform对输入的每个元素进行一次处理,并将输出处理为零或多个PCollections。
Example code:
// Define two TupleTags, one for each output.
final TupleTag<String> startsWithATag = new TupleTag<String>(){};
final TupleTag<String> startsWithBTag = new TupleTag<String>(){};
PCollectionTuple mixedCollection =
dbRowCollection.apply(ParDo
.of(new DoFn<String, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
if (c.element().startsWith("A")) {
// Emit to main output, which is the output with tag startsWithATag.
c.output(c.element());
} else if(c.element().startsWith("B")) {
// Emit to output with tag startsWithBTag.
c.output(startsWithBTag, c.element());
}
}
})
// Specify main output. In this example, it is the output
// with tag startsWithATag.
.withOutputTags(startsWithATag,
// Specify the output with tag startsWithBTag, as a TupleTagList.
TupleTagList.of(startsWithBTag)));
// Get subset of the output with tag startsWithATag.
mixedCollection.get(startsWithATag).apply(...);
// Get subset of the output with tag startsWithBTag.
mixedCollection.get(startsWithBTag).apply(...);
6 合并PCollections
你可以使用以下其中一种方法:
Flatten : 可以使用Beam sdk中的Flatten转换合并相同类型的多个PCollections。
Join : 可以使用Beam SDK中的CoGroupByKey转换在两个pcollection之间执行关系连接。PCollections必须是键控的(即它们必须是键/值对的集合),并且它们必须使用相同的键类型
图4中的示例是上面部分中图2中的示例的延续。在分支到两个PCollection(一个名称以“A”开头,另一个名称以“B”开头)之后,管道将这两个PCollection合并到一个PCollection中,该PCollection现在包含以“A”或“B”开头的所有名称。在这里,使用Flatten是有意义的,因为合并的PCollections都包含相同的类型。
4.png
Example:
//merge the two PCollections with Flatten
PCollectionList<String> collectionList = PCollectionList.of(aCollection).and(bCollection);
PCollection<String> mergedCollectionWithFlatten = collectionList
.apply(Flatten.<String>pCollections());
// continue with the new merged PCollection
mergedCollectionWithFlatten.apply(...);
7 多数据源
管道可以从一个或多个源读取输入。如果您的管道从多个源读取数据,并且来自这些源的数据是相关的,那么将输入连接在一起是很有用的。在下面的图5所示的示例中,管道从数据库表中读取名称和地址,从Kafka主题中读取名称和订单号。然后管道使用CoGroupByKey连接该信息,其中的键是名称;结果PCollection包含名称、地址和订单的所有组合
5.png
Example:
PCollection<KV<String, String>> userAddress = pipeline.apply(JdbcIO.<KV<String, String>>read()...);
PCollection<KV<String, String>> userOrder = pipeline.apply(KafkaIO.<String, String>read()...);
final TupleTag<String> addressTag = new TupleTag<String>();
final TupleTag<String> orderTag = new TupleTag<String>();
// Merge collection values into a CoGbkResult collection.
PCollection<KV<String, CoGbkResult>> joinedCollection =
KeyedPCollectionTuple.of(addressTag, userAddress)
.and(orderTag, userOrder)
.apply(CoGroupByKey.<String>create());
joinedCollection.apply(...);
原文参考链接:https://beam.apache.org/documentation/pipelines/design-your-pipeline/
Beam API DOC: https://beam.apache.org/releases/javadoc/2.9.0/
网友评论