Apache Beam SQL

Author: HelloWide | Published: 2019-03-28 16:23

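    Besides its Java and Python SDKs, Beam also supports SQL-based analysis, much like Spark SQL.
    At the time of writing, Beam SQL is available only from Java. Underneath it sits Apache Calcite, a dynamic data management framework used for big data processing and streaming enhancements that lets you build customized database functionality. Hive, for example, uses Calcite for query optimization, and Flink uses it for SQL parsing and streaming SQL. Beam adds its own extensions on top, so that queries can easily take advantage of Beam's unified batch/streaming model and its support for complex data types.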


    [Figure: flow diagram]

    Simplified, the flow is as follows:

    [Figure: simplified flow]
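    • Given a PCollection and a query as input, the input PCollection is first registered as a table in the schema repository;
    • The SQL query is parsed according to the grammar, producing a SQL abstract syntax tree;
    • The query is validated against the table schema, producing a logical plan expressed in relational algebra;
    • Relational rules are applied to convert the logical plan into a physical plan expressed as Beam components. An optimizer can optionally be applied to update the plan;
    • Finally, the Beam physical plan is compiled into a composite PTransform.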
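
The stages above can be pictured with a toy model in plain Java. This is only an illustrative sketch, not Beam's or Calcite's actual implementation: the class `ToySqlPlanner`, its nested types and methods, and the tiny grammar it accepts (`select <cols> from <table> where <col> > <int>`) are all hypothetical names invented for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Toy model of the planning stages. NOT Beam's or Calcite's real code. */
class ToySqlPlanner {

  /** A registered input: column names plus rows of values (hypothetical type). */
  static final class Table {
    final List<String> columns;
    final List<List<Object>> rows;

    Table(List<String> columns, List<List<Object>> rows) {
      this.columns = columns;
      this.rows = rows;
    }
  }

  /** Parsed query; stands in for the abstract syntax tree. */
  static final class Query {
    final List<String> select;
    final String from;
    final String whereColumn;
    final int lowerBound;

    Query(List<String> select, String from, String whereColumn, int lowerBound) {
      this.select = select;
      this.from = from;
      this.whereColumn = whereColumn;
      this.lowerBound = lowerBound;
    }
  }

  private static final Pattern GRAMMAR = Pattern.compile(
      "select\\s+(.+?)\\s+from\\s+(\\w+)\\s+where\\s+(\\w+)\\s*>\\s*(\\d+)",
      Pattern.CASE_INSENSITIVE);

  // Stage 1: the "schema repository" maps table names to registered inputs.
  private final Map<String, Table> catalog = new HashMap<>();

  void register(String name, Table table) {
    catalog.put(name, table);
  }

  // Stage 2: parse the SQL text according to the (tiny) grammar.
  static Query parse(String sql) {
    Matcher m = GRAMMAR.matcher(sql.trim());
    if (!m.matches()) {
      throw new IllegalArgumentException("Unsupported query: " + sql);
    }
    List<String> select = Arrays.asList(m.group(1).trim().split("\\s*,\\s*"));
    return new Query(select, m.group(2), m.group(3), Integer.parseInt(m.group(4)));
  }

  // Stage 3: validate the query against the registered table's columns.
  Table validate(Query q) {
    Table table = catalog.get(q.from);
    if (table == null) {
      throw new IllegalArgumentException("Unknown table: " + q.from);
    }
    List<String> referenced = new ArrayList<>(q.select);
    referenced.add(q.whereColumn);
    for (String column : referenced) {
      if (!table.columns.contains(column)) {
        throw new IllegalArgumentException("Unknown column: " + column);
      }
    }
    return table;
  }

  // Stages 4-5: turn the validated query into one executable function,
  // standing in for the physical plan wrapped as a composite transform.
  Function<Table, List<List<Object>>> compile(Query q) {
    validate(q);
    return table -> {
      int whereIndex = table.columns.indexOf(q.whereColumn);
      List<List<Object>> out = new ArrayList<>();
      for (List<Object> row : table.rows) {
        if (((Number) row.get(whereIndex)).doubleValue() > q.lowerBound) {
          List<Object> projected = new ArrayList<>();
          for (String column : q.select) {
            projected.add(row.get(table.columns.indexOf(column)));
          }
          out.add(projected);
        }
      }
      return out;
    };
  }

  /** Runs all stages on three sample rows. */
  static List<List<Object>> demo() {
    ToySqlPlanner planner = new ToySqlPlanner();
    Table input = new Table(Arrays.asList("c1", "c2", "c3"), Arrays.asList(
        Arrays.<Object>asList(1, "row", 1.0),
        Arrays.<Object>asList(2, "row", 2.0),
        Arrays.<Object>asList(3, "row", 3.0)));
    planner.register("PCOLLECTION", input);
    Query query = parse("select c1, c2, c3 from PCOLLECTION where c1 > 1");
    return planner.compile(query).apply(input);
  }

  public static void main(String[] args) {
    System.out.println(demo()); // prints [[2, row, 2.0], [3, row, 3.0]]
  }
}
```

In the real system each of these stand-ins is far richer (Calcite handles parsing, validation and planning, and the result is a Beam PTransform rather than a `Function`), but the order of the stages is the same.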

    Test Code

    Note: all of these examples can be found in the Beam source code.

    /*
     * Licensed to the Apache Software Foundation (ASF) under one
     * or more contributor license agreements.  See the NOTICE file
     * distributed with this work for additional information
     * regarding copyright ownership.  The ASF licenses this file
     * to you under the Apache License, Version 2.0 (the
     * "License"); you may not use this file except in compliance
     * with the License.  You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    package com.xxx.sqlbeam;
    
    import org.apache.beam.runners.direct.DirectRunner;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.extensions.sql.SqlTransform;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.schemas.Schema;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.transforms.SerializableFunctions;
    import org.apache.beam.sdk.transforms.SimpleFunction;
    import org.apache.beam.sdk.values.*;
    
    import javax.annotation.Nullable;
    
    /**
     * This is a quick example, which uses Beam SQL DSL to create a data pipeline.
     *
     * <p>Run the example from the Beam source root with
     *
     * <pre>
     *   ./gradlew :beam-sdks-java-extensions-sql:runBasicExample
     * </pre>
     *
     * <p>The above command executes the example locally using the direct runner. Running the pipeline
     * on other runners requires additional setup and is out of scope of the SQL examples. Please
     * consult the Beam documentation on how to run pipelines.
     */
    class BeamSqlExample {
      public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).as(PipelineOptions.class);
        options.setRunner(DirectRunner.class);
        Pipeline p = Pipeline.create(options);
    
    
        //define the input row format
        Schema type =
            Schema.builder().addInt32Field("c1").addStringField("c2").addDoubleField("c3").build();
    
        Row row1 = Row.withSchema(type).addValues(1, "row", 1.0).build();
        Row row2 = Row.withSchema(type).addValues(2, "row", 2.0).build();
        Row row3 = Row.withSchema(type).addValues(3, "row", 3.0).build();
    
        //create a source PCollection with Create.of();
        PCollection<Row> inputTable =
            PBegin.in(p)
                .apply(
                    Create.of(row1, row2, row3)
                        .withSchema(
                            type, SerializableFunctions.identity(), SerializableFunctions.identity()));
    
        //Case 1. run a simple SQL query over the input PCollection with SqlTransform.query;
        PCollection<Row> outputStream =
            inputTable.apply(SqlTransform.query("select c1, c2, c3 from PCOLLECTION where c1 > 1"));
    
        // print the output record of case 1;
        outputStream.apply(
            "log_result",
            MapElements.via(
                new SimpleFunction<Row, Void>() {
                  @Override
                  public @Nullable Void apply(Row input) {
                    // expect output (element order is not guaranteed):
                    //  PCOLLECTION: [3, row, 3.0]
                    //  PCOLLECTION: [2, row, 2.0]
                    System.out.println("PCOLLECTION: " + input.getValues());
                    return null;
                  }
                }));
    
        // Case 2. run the query with SqlTransform.query over result PCollection of case 1.
        PCollection<Row> outputStream2 =
            PCollectionTuple.of(new TupleTag<>("CASE1_RESULT"), outputStream)
                .apply(SqlTransform.query("select c2, sum(c3) from CASE1_RESULT group by c2"));
    
        // print the output record of case 2;
        outputStream2.apply(
            "log_result_case2",
            MapElements.via(
                new SimpleFunction<Row, Void>() {
                  @Override
                  public @Nullable Void apply(Row input) {
                    // expect output:
                    //  CASE1_RESULT: [row, 5.0]
                    System.out.println("CASE1_RESULT: " + input.getValues());
                    return null;
                  }
                }));
    
        p.run().waitUntilFinish();
      }
    }
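
As a sanity check on the expected output noted in the comments, the two queries can be reproduced without Beam using plain Java collections. This sketch is not part of the Beam example: the class `ExpectedOutputCheck`, its `InputRow` type and its helper methods are hypothetical, and it mirrors only the semantics of these two specific queries.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java cross-check of the two queries; does not use Beam at all. */
class ExpectedOutputCheck {

  /** Stand-in for one input row with fields c1, c2, c3 (hypothetical type). */
  static final class InputRow {
    final int c1;
    final String c2;
    final double c3;

    InputRow(int c1, String c2, double c3) {
      this.c1 = c1;
      this.c2 = c2;
      this.c3 = c3;
    }
  }

  /** The same three rows the pipeline creates with Create.of(row1, row2, row3). */
  static List<InputRow> input() {
    return Arrays.asList(
        new InputRow(1, "row", 1.0),
        new InputRow(2, "row", 2.0),
        new InputRow(3, "row", 3.0));
  }

  // Case 1: select c1, c2, c3 from PCOLLECTION where c1 > 1
  static List<InputRow> case1(List<InputRow> rows) {
    List<InputRow> out = new ArrayList<>();
    for (InputRow row : rows) {
      if (row.c1 > 1) {
        out.add(row);
      }
    }
    return out;
  }

  // Case 2: select c2, sum(c3) from CASE1_RESULT group by c2
  static Map<String, Double> case2(List<InputRow> rows) {
    Map<String, Double> sums = new LinkedHashMap<>();
    for (InputRow row : rows) {
      sums.merge(row.c2, row.c3, Double::sum);
    }
    return sums;
  }

  public static void main(String[] args) {
    List<InputRow> filtered = case1(input());
    System.out.println(filtered.size()); // prints 2
    System.out.println(case2(filtered)); // prints {row=5.0}
  }
}
```

The two surviving rows have c3 values 2.0 and 3.0, which is where the `[row, 5.0]` in case 2 comes from. Unlike this list-based sketch, a PCollection has no guaranteed element order, so the two rows of case 1 may be printed in either order.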
    
    

    Source code & demo: https://github.com/apache/beam/tree/master
    Reference: https://beam.apache.org/documentation/dsls/sql/data-types/
    Apache Beam: an advanced, unified programming model
    that lets batch and streaming data processing jobs run on any execution engine.
