Apache Beam SQL

Author: HelloWide | Published: 2019-03-28 16:23

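Beam supports Java and Python, and it also supports SQL analysis, much like Spark SQL.
At the time of writing, Beam SQL is only available in the Java SDK. Under the hood it builds on Apache Calcite, a dynamic data management framework used in big data processing that has some streaming extensions. Calcite provides the SQL parser and planner and lets you build custom database functionality. For example, Hive uses Calcite for query optimization, and Flink uses it for SQL parsing and streaming SQL processing. Beam adds its own extensions on top of Calcite so that queries can easily take advantage of Beam's unified batch/streaming model and its support for complex data types.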
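The unified batch/streaming point can be made concrete with a windowed aggregation. This is a minimal sketch, not code from the Beam repository.

- `TUMBLE` in the `GROUP BY` clause assigns rows to fixed event-time windows based on a timestamp column.
- `TUMBLE_START` returns the start of each window.
- The same query text is meant to run on both bounded and unbounded PCollections. This sketch only exercises the bounded case.

Assumptions:

- A recent Beam 2.x release, for `Create.Values#withRowSchema` and `@Element` DoFn parameters.
- `beam-sdks-java-extensions-sql` and `beam-runners-direct-java` are on the classpath.
- The class name, the `page`/`ts` fields and the event times are hypothetical.
- Collecting output into a static queue is a test-only shortcut. It only works with the in-process DirectRunner.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.joda.time.Instant;

/** Hypothetical example: per-hour event counts with a TUMBLE window in Beam SQL. */
public class BeamSqlTumbleSketch {
  // Test-only shortcut: a static queue shared with the DoFn; works only with the in-process DirectRunner.
  static final ConcurrentLinkedQueue<String> RESULTS = new ConcurrentLinkedQueue<>();

  static class CollectFn extends DoFn<Row, Void> {
    @ProcessElement
    public void processElement(@Element Row row) {
      // Encoded as "page@windowStartMillis=count".
      RESULTS.add(
          row.getString("page")
              + "@"
              + row.getDateTime("w_start").getMillis()
              + "="
              + row.getInt64("cnt"));
    }
  }

  public static List<String> run() {
    RESULTS.clear();
    PipelineOptions options = PipelineOptionsFactory.create();
    options.setRunner(DirectRunner.class);
    Pipeline p = Pipeline.create(options);

    Schema schema = Schema.builder().addStringField("page").addDateTimeField("ts").build();

    // Two events in the first hour after the epoch, one in the second hour.
    PCollection<Row> events =
        p.apply(
            Create.of(
                    Row.withSchema(schema).addValues("home", new Instant(5 * 60_000L)).build(),
                    Row.withSchema(schema).addValues("home", new Instant(40 * 60_000L)).build(),
                    Row.withSchema(schema).addValues("home", new Instant(75 * 60_000L)).build())
                .withRowSchema(schema));

    // TUMBLE in GROUP BY puts rows into fixed one-hour event-time windows based on ts.
    PCollection<Row> counts =
        events.apply(
            SqlTransform.query(
                "SELECT page, COUNT(*) AS cnt, TUMBLE_START(ts, INTERVAL '1' HOUR) AS w_start "
                    + "FROM PCOLLECTION "
                    + "GROUP BY page, TUMBLE(ts, INTERVAL '1' HOUR)"));

    counts.apply(ParDo.of(new CollectFn()));
    p.run().waitUntilFinish();

    List<String> out = new ArrayList<>(RESULTS);
    Collections.sort(out);
    return out;
  }

  public static void main(String[] args) {
    System.out.println(run());
  }
}
```

Calling `run()` should give one count per (page, window) pair. Two of the events fall in the first hour after the epoch and one falls in the second.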


[Figure: Beam SQL flowchart]

Simplified, the flow is as follows:

[Figure: simplified flow]
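  • Given a PCollection and a query as input, first register the input PCollection as a table in the schema repository;
  • Parse the SQL query according to the grammar, producing an SQL abstract syntax tree;
  • Validate the table structure and output a logical plan expressed in relational algebra;
  • Apply relational rules to convert the logical plan into a physical plan expressed as Beam components; an optional optimizer can further update the plan;
  • Finally, compile the Beam physical plan into a composite PTransform.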
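From the user's side, only the first step is directly visible:

- A single input PCollection is registered as a table named PCOLLECTION.
- A PCollectionTuple registers each TupleTag id as its own table name.
- As far as I can tell, parsing, validation, planning and compilation into the composite PTransform all happen inside SqlTransform when it is applied during pipeline construction.

The sketch below registers two PCollections as named tables and joins them. Assumptions:

- A recent Beam 2.x release, for `Create.Values#withRowSchema` and `@Element` DoFn parameters.
- `beam-sdks-java-extensions-sql` and `beam-runners-direct-java` are on the classpath.
- The class name, table names and fields are hypothetical.
- The static result queue is a test-only shortcut that only works with the in-process DirectRunner.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;

/** Hypothetical example: two PCollections registered as named tables and joined in SQL. */
public class BeamSqlJoinSketch {
  // Test-only shortcut: results collected in a static queue; works only with the in-process DirectRunner.
  static final ConcurrentLinkedQueue<String> RESULTS = new ConcurrentLinkedQueue<>();

  static class CollectFn extends DoFn<Row, Void> {
    @ProcessElement
    public void processElement(@Element Row row) {
      RESULTS.add(row.getString("cust_name") + ":" + row.getDouble("amount"));
    }
  }

  public static List<String> run() {
    RESULTS.clear();
    PipelineOptions options = PipelineOptionsFactory.create();
    options.setRunner(DirectRunner.class);
    Pipeline p = Pipeline.create(options);

    Schema customerSchema =
        Schema.builder().addInt32Field("id").addStringField("cust_name").build();
    Schema orderSchema =
        Schema.builder().addInt32Field("cust_id").addDoubleField("amount").build();

    PCollection<Row> customers =
        p.apply(
            "customers",
            Create.of(
                    Row.withSchema(customerSchema).addValues(1, "alice").build(),
                    Row.withSchema(customerSchema).addValues(2, "bob").build())
                .withRowSchema(customerSchema));
    PCollection<Row> orders =
        p.apply(
            "orders",
            Create.of(
                    Row.withSchema(orderSchema).addValues(1, 10.0).build(),
                    Row.withSchema(orderSchema).addValues(1, 5.0).build(),
                    Row.withSchema(orderSchema).addValues(3, 7.0).build())
                .withRowSchema(orderSchema));

    // Step 1 of the flow: each TupleTag id becomes a table name the query can reference.
    PCollection<Row> joined =
        PCollectionTuple.of(new TupleTag<Row>("CUSTOMERS"), customers)
            .and(new TupleTag<Row>("ORDERS"), orders)
            .apply(
                SqlTransform.query(
                    "SELECT c.cust_name, o.amount "
                        + "FROM ORDERS o JOIN CUSTOMERS c ON o.cust_id = c.id"));

    joined.apply(ParDo.of(new CollectFn()));
    p.run().waitUntilFinish();

    List<String> out = new ArrayList<>(RESULTS);
    Collections.sort(out);
    return out;
  }

  public static void main(String[] args) {
    System.out.println(run());
  }
}
```

This is an inner join, so unmatched rows on either side are dropped. That covers the order with `cust_id` 3 and the customer `bob`.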

Test Code

Note: all of these examples are available in the Beam source code.

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.xxx.sqlbeam;

import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SerializableFunctions;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.*;

import javax.annotation.Nullable;

/**
 * This is a quick example, which uses Beam SQL DSL to create a data pipeline.
 *
 * <p>Run the example from the Beam source root with
 *
 * <pre>
 *   ./gradlew :beam-sdks-java-extensions-sql:runBasicExample
 * </pre>
 *
 * <p>The above command executes the example locally using the direct runner. Running the pipeline
 * on other runners requires additional setup and is out of scope of the SQL examples. Please consult
 * the Beam documentation on how to run pipelines.
 */
class BeamSqlExample {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).as(PipelineOptions.class);
    options.setRunner(DirectRunner.class);
    Pipeline p = Pipeline.create(options);


    //define the input row format
    Schema type =
        Schema.builder().addInt32Field("c1").addStringField("c2").addDoubleField("c3").build();

    Row row1 = Row.withSchema(type).addValues(1, "row", 1.0).build();
    Row row2 = Row.withSchema(type).addValues(2, "row", 2.0).build();
    Row row3 = Row.withSchema(type).addValues(3, "row", 3.0).build();

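    // Create a source PCollection with Create.of() and attach the row schema,
    // so that SqlTransform can query it as a table.
    // withRowSchema(...) is available in recent Beam releases; older releases used
    // withSchema(type, SerializableFunctions.identity(), SerializableFunctions.identity()).
    PCollection<Row> inputTable =
        PBegin.in(p).apply(Create.of(row1, row2, row3).withRowSchema(type));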

    // Case 1. Run a simple SQL query over the input PCollection with SqlTransform.query.
    // A single input PCollection is exposed to the query as the table PCOLLECTION.
    PCollection<Row> outputStream =
        inputTable.apply(SqlTransform.query("select c1, c2, c3 from PCOLLECTION where c1 > 1"));

    // print the output record of case 1;
    outputStream.apply(
        "log_result",
        MapElements.via(
            new SimpleFunction<Row, Void>() {
              @Override
              public @Nullable Void apply(Row input) {
                // expected output (element order may vary):
                //  PCOLLECTION: [3, row, 3.0]
                //  PCOLLECTION: [2, row, 2.0]
                System.out.println("PCOLLECTION: " + input.getValues());
                return null;
              }
            }));

    // Case 2. Run a query with SqlTransform.query over the result PCollection of case 1.
    // The TupleTag id ("CASE1_RESULT") is the table name referenced in the query.
    PCollection<Row> outputStream2 =
        PCollectionTuple.of(new TupleTag<>("CASE1_RESULT"), outputStream)
            .apply(SqlTransform.query("select c2, sum(c3) from CASE1_RESULT group by c2"));

    // print the output record of case 2;
    outputStream2.apply(
        "log_result_case2",
        MapElements.via(
            new SimpleFunction<Row, Void>() {
              @Override
              public @Nullable Void apply(Row input) {
                // expect output:
                //  CASE1_RESULT: [row, 5.0]
                System.out.println("CASE1_RESULT: " + input.getValues());
                return null;
              }
            }));

    p.run().waitUntilFinish();
  }
}
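In Case 2 the aggregate `sum(c3)` has no alias, so its output field gets a name generated by the planner, which is awkward to read by name. The sketch below is a small variation on the example above and uses the same input rows. It folds the filter of Case 1 and the aggregation of Case 2 into one query, aliases the sum as `total`, and reads both the output schema and the values by field name.

Assumptions:

- A recent Beam 2.x release, for `Create.Values#withRowSchema` and `@Element` DoFn parameters.
- `beam-sdks-java-extensions-sql` and `beam-runners-direct-java` are on the classpath.
- The class name and the `Result` holder are hypothetical.
- The static queue used to collect rows is a test-only shortcut that only works with the in-process DirectRunner.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

/** Hypothetical variation: Case 1 and Case 2 in one query, with a named aggregate column. */
public class BeamSqlAliasSketch {
  // Test-only shortcut: a static queue shared with the DoFn; works only with the in-process DirectRunner.
  static final ConcurrentLinkedQueue<String> RESULTS = new ConcurrentLinkedQueue<>();

  /** Output field names (known at construction time) plus the collected rows. */
  public static final class Result {
    public final List<String> fieldNames;
    public final List<String> rows;

    Result(List<String> fieldNames, List<String> rows) {
      this.fieldNames = fieldNames;
      this.rows = rows;
    }
  }

  static class CollectFn extends DoFn<Row, Void> {
    @ProcessElement
    public void processElement(@Element Row row) {
      // "total" is readable by name because the query aliases SUM(c3) AS total.
      RESULTS.add(row.getString("c2") + "=" + row.getDouble("total"));
    }
  }

  public static Result run() {
    RESULTS.clear();
    PipelineOptions options = PipelineOptionsFactory.create();
    options.setRunner(DirectRunner.class);
    Pipeline p = Pipeline.create(options);

    Schema type =
        Schema.builder().addInt32Field("c1").addStringField("c2").addDoubleField("c3").build();
    PCollection<Row> input =
        p.apply(
            Create.of(
                    Row.withSchema(type).addValues(1, "row", 1.0).build(),
                    Row.withSchema(type).addValues(2, "row", 2.0).build(),
                    Row.withSchema(type).addValues(3, "row", 3.0).build())
                .withRowSchema(type));

    // Filter (Case 1) and aggregation (Case 2) in a single query.
    PCollection<Row> result =
        input.apply(
            SqlTransform.query(
                "SELECT c2, SUM(c3) AS total FROM PCOLLECTION WHERE c1 > 1 GROUP BY c2"));

    // The output schema is already known before the pipeline runs.
    List<String> fieldNames = new ArrayList<>(result.getSchema().getFieldNames());

    result.apply(ParDo.of(new CollectFn()));
    p.run().waitUntilFinish();

    List<String> rows = new ArrayList<>(RESULTS);
    Collections.sort(rows);
    return new Result(fieldNames, rows);
  }

  public static void main(String[] args) {
    Result r = run();
    System.out.println(r.fieldNames + " " + r.rows);
  }
}
```

The summed value should match Case 2 above (2.0 + 3.0 = 5.0). The difference is that it now sits in a field called `total` instead of one with a generated name.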


Source code & demo: https://github.com/apache/beam/tree/master
Reference: https://beam.apache.org/documentation/dsls/sql/data-types/
Apache Beam: an advanced, unified programming model that lets batch and streaming data processing jobs run on any execution engine.

Article link: https://www.haomeiwen.com/subject/uitcbqtx.html