美文网首页
Apache Beam Pipeline设计.docx

Apache Beam Pipeline设计.docx

作者: HelloWide | 来源:发表于2019-03-28 15:18 被阅读0次

本文将帮助你理解如何设计pipeline。它包含关于如何确定pipeline结构、如何选择将哪些transform应用于数据以及如何确定输入和输出方法的信息。

1. 在设计管道时应该考虑什么?

1.1 数据存储在哪?

你有多少组输入数据?这将决定您需要在pipeline的开始应用哪些类型的读取转换。

1.2 数据是什么类型?

可能是文本数据,日志数据,数据库表数据,部分beam transform只针对KV数据有效,用户需要了解数据是如何输入的,以及如何在PCollection中最好地表示这些数据。

1.3 需要对数据做什么处理 ?

Beam Core Transform是通用的,了解如何更改或操作数据将决定如何构建像ParDo这样的核心转换,包括何时处理?

1.4 数据结果输出形式?

写入文本,数据库,或者数据流等。

2. 基础Pineline

最简单的管道-线性的操作流程:

1.png

但是实际的Pineline复杂的多,通常一个pineline代表一种DAG图。它可以有多个输入源、多个输出接收器,其操作(PTransforms)可以读取和输出多个pcollection。

3. PCollections分之处理

Transform并不会改变原有的PCollections,可以考虑PCollection的每个单独元素,并创建一个新的PCollection作为输出。这样,您可以对同一个PCollection中的不同元素执行不同的操作。

4. 多个Transform处理同一个PCollection

您可以使用相同的PCollection作为多个转换的输入,而无需使用或更改输入。管道从数据库表中读取输入(用字符串表示的名称)并创建表行的PCollection。然后,管道对同一个PCollection应用多个Transform。Transform A提取PCollection中以字母“A”开头的所有名称,Transform B提取PCollection中以字母“B”开头的所有名称。两个转换A和B都有相同的输入PCollection。


2.png

Example code:

PCollection<String> dbRowCollection = ...;

PCollection<String> aCollection = dbRowCollection.apply("aTrans", ParDo.of(new DoFn<String, String>(){
  @ProcessElement
  public void processElement(ProcessContext c) {
    if(c.element().startsWith("A")){
      c.output(c.element());
    }
  }
}));

PCollection<String> bCollection = dbRowCollection.apply("bTrans", ParDo.of(new DoFn<String, String>(){
  @ProcessElement
  public void processElement(ProcessContext c) {
    if(c.element().startsWith("B")){
      c.output(c.element());
    }
  }
}));

5 一个Transform多个输出

另一种将Pipeline分之处理的方法是使用标记输出(https://beam.apache.org/documentation/programming-guide/#additional-outputs)将单个Transform输出转换为多个PCollections。生成多个输出的Transform对输入的每个元素进行一次处理,并将输出处理为零或多个PCollections。

3.png

Example code:

// Define two TupleTags, one for each output.
final TupleTag<String> startsWithATag = new TupleTag<String>(){};
final TupleTag<String> startsWithBTag = new TupleTag<String>(){};
PCollectionTuple mixedCollection =
    dbRowCollection.apply(ParDo
        .of(new DoFn<String, String>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            if (c.element().startsWith("A")) {
              // Emit to main output, which is the output with tag startsWithATag.
              c.output(c.element());
            } else if(c.element().startsWith("B")) {
              // Emit to output with tag startsWithBTag.
              c.output(startsWithBTag, c.element());
            }
          }
        })
        // Specify main output. In this example, it is the output
        // with tag startsWithATag.
        .withOutputTags(startsWithATag,
        // Specify the output with tag startsWithBTag, as a TupleTagList.
                        TupleTagList.of(startsWithBTag)));
// Get subset of the output with tag startsWithATag.
mixedCollection.get(startsWithATag).apply(...);
// Get subset of the output with tag startsWithBTag.
mixedCollection.get(startsWithBTag).apply(...);

6 合并PCollections

你可以使用以下其中一种方法:
Flatten : 可以使用Beam sdk中的Flatten转换合并相同类型的多个PCollections。
Join : 可以使用Beam SDK中的CoGroupByKey转换在两个pcollection之间执行关系连接。PCollections必须是键控的(即它们必须是键/值对的集合),并且它们必须使用相同的键类型
图4中的示例是上面部分中图2中的示例的延续。在分支到两个PCollection(一个名称以“A”开头,另一个名称以“B”开头)之后,管道将这两个PCollection合并到一个PCollection中,该PCollection现在包含以“A”或“B”开头的所有名称。在这里,使用Flatten是有意义的,因为合并的PCollections都包含相同的类型。


4.png

Example:

//merge the two PCollections with Flatten
PCollectionList<String> collectionList = PCollectionList.of(aCollection).and(bCollection);
PCollection<String> mergedCollectionWithFlatten = collectionList
    .apply(Flatten.<String>pCollections());

// continue with the new merged PCollection
mergedCollectionWithFlatten.apply(...);

7 多数据源

管道可以从一个或多个源读取输入。如果您的管道从多个源读取数据,并且来自这些源的数据是相关的,那么将输入连接在一起是很有用的。在下面的图5所示的示例中,管道从数据库表中读取名称和地址,从Kafka主题中读取名称和订单号。然后管道使用CoGroupByKey连接该信息,其中的键是名称;结果PCollection包含名称、地址和订单的所有组合


5.png

Example:

PCollection<KV<String, String>> userAddress = pipeline.apply(JdbcIO.<KV<String, String>>read()...);

PCollection<KV<String, String>> userOrder = pipeline.apply(KafkaIO.<String, String>read()...);

final TupleTag<String> addressTag = new TupleTag<String>();
final TupleTag<String> orderTag = new TupleTag<String>();
// Merge collection values into a CoGbkResult collection.
PCollection<KV<String, CoGbkResult>> joinedCollection =
  KeyedPCollectionTuple.of(addressTag, userAddress)
                       .and(orderTag, userOrder)
                       .apply(CoGroupByKey.<String>create());
joinedCollection.apply(...);

原文参考链接:https://beam.apache.org/documentation/pipelines/design-your-pipeline/
Beam API DOC: https://beam.apache.org/releases/javadoc/2.9.0/

相关文章

  • Apache Beam Pipeline设计.docx

    本文将帮助你理解如何设计pipeline。它包含关于如何确定pipeline结构、如何选择将哪些transform...

  • Apache Beam 处理文件

    今天我们介绍了如何使用pipeline在 Apache Beam 中的文件中读取、写入数据,其中“Employee...

  • Apache Beam

    Apache Beam基本架构 Apache Beam主要由Beam SDK和Beam Runner组成,Beam...

  • 数据处理的内容、地点、时间和方式

    为了让您了解实际情况,我使用Apache Beam代码片段,并结合延时图来提供可视化的表示。Apache Beam...

  • Apache Beam介绍

    Apache Beam提供了统一的大数据编程抽象,提供了不同的执行引擎支持,比如Spark/Flink/Storm...

  • Apache Beam SQL

    Beam不仅支持java,python还支持SQL分析,非常类似于Spark SQL;Beam SQL 现在只支持...

  • 让Apache Beam在GCP Cloud Dataflow上

    简介 在文章《Apache Beam入门及Java SDK开发初体验[https://www.pkslow.com...

  • Apache Beam入门学习一

    一、Beam编程基本概念 PCollection:数据集,可能是有界数据集(数据量有限)和无界数据集(数据量无限)...

  • apache beam 简介和安装

    1.Apache beam 是google和其合作伙伴开源的新的流式大数据分析模式,目前支持如下的引擎: 2.执行...

  • Apache Beam研究报告

    概述 本文不是一篇Beam的入门文档,不会介绍Beam的基本概念;而会主要探讨Beam的表达力,Beam的性能,以...

网友评论

      本文标题:Apache Beam Pipeline设计.docx

      本文链接:https://www.haomeiwen.com/subject/uwwqbqtx.html