1 Join-library
join -library提供了内部连接、外部左连接和外部右连接函数。其目的是简化连接到简单函数调用的最常见情况。
这些函数是泛型的,支持任何bean支持的类型的连接。连接函数的输入是键/值的PCollections。左右PCollections都需要相同类型的键。所有连接函数都返回键/值,其中键为连接键,值为键/值,其中键为左值,右为值。类似于 SQL 中的a join b on …(key)。
注:
对于外部连接,用户必须提供一个表示null的值,因为null不能序列化。
Example:
PCollection<KV<String, String>> leftPcollection = ...
PCollection<KV<String, Long>> rightPcollection = ...
PCollection<KV<String, KV<String, Long>>> joinedPcollection = Join.innerJoin(leftPcollection,rightPcollection);
目前在2.10.0 release中支持:
1.png
2 Sorter
模块提供SortValues算子,对输入的PCollection<KV<K, Iterable<KV<K2, V>>>> 返回一个对于每个主键K,匹配一个按照副键(K2)的字节编码进行排序的集合,PCollection<KV<K, Iterable<KV<K2, V>>>>。它是一种高效的、可伸缩的迭代排序器,即使它们很大(不适合内存)。
说明:
- 这个转换只执行值排序;每个键所附带的iterable是排序的,但是不同键之间没有关系,因为Beam不支持PCollection中不同元素之间定义的任何关系;
- 每一个集合排序都通过使用本地内存和磁盘的任务完成。这意味着SortValues在不同管道中使用时可能会存在性能和/或可伸缩性的瓶颈。因此,不鼓励用户在单个元素的PCollection上使用SortValues对大型PCollection进行全局排序。(粗略)估计分类溢出到磁盘时使用的磁盘空间字节数公式如下:
numRecords * (numSecondaryKeyBytesPerRecord + numValueBytesPerRecord + 16) * 3
Options
如果排序导致内存耗尽或溢出,用户可以通过创建一个自定义的临时位置。在调用SortValues.create()时候可以传入一个自定义的实例BufferedExternalSorter.Options。
Example:
PCollection<KV<String, KV<String, Integer>>> input = ...
// Group by primary key, bringing <SecondaryKey, Value> pairs for the same key together.
PCollection<KV<String, Iterable<KV<String, Integer>>>> grouped =
input.apply(GroupByKey.<String, KV<String, Integer>>create());
// For every primary key, sort the iterable of <SecondaryKey, Value> pairs by secondary key.
PCollection<KV<String, Iterable<KV<String, Integer>>>> groupedAndSorted =
grouped.apply(
SortValues.<String, String, Integer>create(BufferedExternalSorter.options()));
原文参考链接:https://beam.apache.org/documentation/sdks/java-extensions/
Beam API DOC: https://beam.apache.org/releases/javadoc/2.9.0/
Apache Beam: 一个高级且统一的编程模型
让批处理和流数据处理的作业在任何执行引擎上都可以运行.
网友评论