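Besides its Java and Python SDKs, Beam also supports SQL-based analysis, much like Spark SQL.
At the time of writing, Beam SQL is available only from Java. It is built on Apache Calcite, a dynamic data management framework for big data processing with some streaming extensions, which lets you customize database functionality. Hive, for example, uses Calcite for query optimization, and Flink uses it for SQL parsing and streaming SQL processing. Beam adds its own extensions on top, making it easy to use Beam's unified batch/streaming model and its support for complex data types.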
Flow diagram
In simplified form:
Simplified flow
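- Given a PCollection and a query as input, the input PCollection is first registered as a table in the schema repository;
- The SQL query is parsed according to the grammar, producing a SQL abstract syntax tree;
- The table structure is validated, and a logical plan expressed in relational algebra is produced;
- Relational rules are applied to convert the logical plan into a physical plan expressed as Beam components. An optional optimizer can then update the plan;
- Finally, the Beam physical plan is compiled into a composite PTransform.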
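The steps above can be sketched with a toy planner in plain Java. This is only an illustration under strong assumptions: it handles a single query shape (SELECT ... FROM ... WHERE col > n), and every name in it (ToySqlPlanner, Table, Query, register, parse, validate, compile) is hypothetical and not part of the Beam or Calcite APIs. The real implementation delegates parsing, validation and planning to Calcite.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Toy model of the planning steps. None of these names are Beam or Calcite APIs. */
public class ToySqlPlanner {
  /** A registered input: column names plus rows stored as column-to-value maps. */
  public static final class Table {
    final List<String> columns;
    final List<Map<String, Object>> rows;

    public Table(List<String> columns, List<Map<String, Object>> rows) {
      this.columns = columns;
      this.rows = rows;
    }
  }

  /** Stand-in for the syntax tree of "SELECT cols FROM table WHERE col > n". */
  public static final class Query {
    final List<String> projection;
    final String table;
    final String filterColumn;
    final double threshold;

    Query(List<String> projection, String table, String filterColumn, double threshold) {
      this.projection = projection;
      this.table = table;
      this.filterColumn = filterColumn;
      this.threshold = threshold;
    }
  }

  private static final Pattern GRAMMAR = Pattern.compile(
      "(?i)select\\s+(.+?)\\s+from\\s+(\\w+)\\s+where\\s+(\\w+)\\s*>\\s*(\\d+)");

  // Step 1: the "schema repository", mapping table names to registered inputs.
  private final Map<String, Table> catalog = new HashMap<>();

  public void register(String name, Table table) {
    catalog.put(name, table);
  }

  // Step 2: parse the SQL text into a (very small) syntax tree.
  public static Query parse(String sql) {
    Matcher m = GRAMMAR.matcher(sql.trim());
    if (!m.matches()) {
      throw new IllegalArgumentException("Unsupported query: " + sql);
    }
    List<String> projection = Arrays.asList(m.group(1).trim().split("\\s*,\\s*"));
    return new Query(projection, m.group(2), m.group(3), Double.parseDouble(m.group(4)));
  }

  // Step 3: validate the referenced table and columns against the catalog.
  public Table validate(Query q) {
    Table table = catalog.get(q.table);
    if (table == null) {
      throw new IllegalArgumentException("Unknown table: " + q.table);
    }
    List<String> referenced = new ArrayList<>(q.projection);
    referenced.add(q.filterColumn);
    for (String column : referenced) {
      if (!table.columns.contains(column)) {
        throw new IllegalArgumentException("Unknown column: " + column);
      }
    }
    return table;
  }

  // Steps 4 and 5: map the plan (Filter, then Project) to executable stages and
  // compose them into one function, loosely analogous to a composite PTransform.
  public static Function<List<Map<String, Object>>, List<Map<String, Object>>> compile(Query q) {
    Function<List<Map<String, Object>>, List<Map<String, Object>>> filter = rows -> {
      List<Map<String, Object>> kept = new ArrayList<>();
      for (Map<String, Object> r : rows) {
        if (((Number) r.get(q.filterColumn)).doubleValue() > q.threshold) {
          kept.add(r);
        }
      }
      return kept;
    };
    Function<List<Map<String, Object>>, List<Map<String, Object>>> project = rows -> {
      List<Map<String, Object>> projected = new ArrayList<>();
      for (Map<String, Object> r : rows) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (String column : q.projection) {
          out.put(column, r.get(column));
        }
        projected.add(out);
      }
      return projected;
    };
    return filter.andThen(project);
  }

  public List<Map<String, Object>> run(String sql) {
    Query q = parse(sql);
    Table table = validate(q);
    return compile(q).apply(table.rows);
  }

  public static Map<String, Object> row(int c1, String c2, double c3) {
    Map<String, Object> r = new LinkedHashMap<>();
    r.put("c1", c1);
    r.put("c2", c2);
    r.put("c3", c3);
    return r;
  }

  public static List<Map<String, Object>> demo() {
    ToySqlPlanner planner = new ToySqlPlanner();
    planner.register("PCOLLECTION", new Table(Arrays.asList("c1", "c2", "c3"),
        Arrays.asList(row(1, "row", 1.0), row(2, "row", 2.0), row(3, "row", 3.0))));
    return planner.run("select c1, c3 from PCOLLECTION where c1 > 1");
  }

  public static void main(String[] args) {
    System.out.println(demo()); // prints [{c1=2, c3=2.0}, {c1=3, c3=3.0}]
  }
}
```

The point of the sketch is the separation of stages: the query text never touches the data until it has been parsed, validated against the registered table, and turned into a composed function.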
Test Code
Note: all of these examples are available in the Beam source code.
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.xxx.sqlbeam;
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SerializableFunctions;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.*;
import javax.annotation.Nullable;
/**
* This is a quick example, which uses Beam SQL DSL to create a data pipeline.
*
* <p>Run the example from the Beam source root with
*
* <pre>
* ./gradlew :beam-sdks-java-extensions-sql:runBasicExample
* </pre>
*
* <p>The above command executes the example locally using direct runner. Running the pipeline in
* other runners require additional setup and are out of scope of the SQL examples. Please consult
* Beam documentation on how to run pipelines.
*/
class BeamSqlExample {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).as(PipelineOptions.class);
    options.setRunner(DirectRunner.class);
    Pipeline p = Pipeline.create(options);

    // define the input row format
    Schema type =
        Schema.builder().addInt32Field("c1").addStringField("c2").addDoubleField("c3").build();

    Row row1 = Row.withSchema(type).addValues(1, "row", 1.0).build();
    Row row2 = Row.withSchema(type).addValues(2, "row", 2.0).build();
    Row row3 = Row.withSchema(type).addValues(3, "row", 3.0).build();

    // create a source PCollection with Create.of();
    PCollection<Row> inputTable =
        PBegin.in(p)
            .apply(
                Create.of(row1, row2, row3)
                    .withSchema(
                        type, SerializableFunctions.identity(), SerializableFunctions.identity()));

    // Case 1. run a simple SQL query over the input PCollection with SqlTransform.query;
    // a single input PCollection is exposed to the query as the table PCOLLECTION.
    PCollection<Row> outputStream =
        inputTable.apply(SqlTransform.query("select c1, c2, c3 from PCOLLECTION where c1 > 1"));

    // print the output record of case 1;
    outputStream.apply(
        "log_result",
        MapElements.via(
            new SimpleFunction<Row, Void>() {
              @Override
              public @Nullable Void apply(Row input) {
                // expect output:
                //  PCOLLECTION: [3, row, 3.0]
                //  PCOLLECTION: [2, row, 2.0]
                System.out.println("PCOLLECTION: " + input.getValues());
                return null;
              }
            }));

    // Case 2. run the query with SqlTransform.query over result PCollection of case 1.
    // The TupleTag id becomes the table name used in the query.
    PCollection<Row> outputStream2 =
        PCollectionTuple.of(new TupleTag<>("CASE1_RESULT"), outputStream)
            .apply(SqlTransform.query("select c2, sum(c3) from CASE1_RESULT group by c2"));

    // print the output record of case 2;
    outputStream2.apply(
        "log_result",
        MapElements.via(
            new SimpleFunction<Row, Void>() {
              @Override
              public @Nullable Void apply(Row input) {
                // expect output:
                //  CASE1_RESULT: [row, 5.0]
                System.out.println("CASE1_RESULT: " + input.getValues());
                return null;
              }
            }));

    p.run().waitUntilFinish();
  }
}
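To see where the expected output in the comments comes from, the two queries can be reproduced without Beam using plain Java streams. This is only a sanity check on the arithmetic, not how Beam executes the queries; the class and method names (ExpectedOutputCheck, Rec, case1, case2) are made up for this sketch.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

/** Plain-Java check of the results expected from the two queries; it does not use Beam. */
public class ExpectedOutputCheck {
  /** Mirrors the c1/c2/c3 schema of the example rows. */
  public static final class Rec {
    public final int c1;
    public final String c2;
    public final double c3;

    public Rec(int c1, String c2, double c3) {
      this.c1 = c1;
      this.c2 = c2;
      this.c3 = c3;
    }
  }

  public static final List<Rec> INPUT =
      Arrays.asList(new Rec(1, "row", 1.0), new Rec(2, "row", 2.0), new Rec(3, "row", 3.0));

  // Case 1: select c1, c2, c3 from PCOLLECTION where c1 > 1
  public static List<Rec> case1(List<Rec> rows) {
    return rows.stream().filter(r -> r.c1 > 1).collect(Collectors.toList());
  }

  // Case 2: select c2, sum(c3) from CASE1_RESULT group by c2
  public static Map<String, Double> case2(List<Rec> rows) {
    return rows.stream()
        .collect(Collectors.groupingBy(r -> r.c2, TreeMap::new, Collectors.summingDouble(r -> r.c3)));
  }

  public static void main(String[] args) {
    List<Rec> filtered = case1(INPUT);
    System.out.println(filtered.size()); // prints 2 (the rows with c1 = 2 and c1 = 3)
    System.out.println(case2(filtered)); // prints {row=5.0}
  }
}
```

Only the rows with c1 = 2 and c1 = 3 survive the filter, and since both share c2 = "row", the grouped sum is 2.0 + 3.0 = 5.0. In the Beam pipeline the order in which the case 1 rows are printed is not guaranteed, because elements of a PCollection are unordered.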
Source code & demo: https://github.com/apache/beam/tree/master
Reference: https://beam.apache.org/documentation/dsls/sql/data-types/
Apache Beam: an advanced, unified programming model
that lets batch and streaming data processing jobs run on any execution engine.