美文网首页Greenplum
比pgload更快更方便写入大数据量至Greenplum的Gre

比pgload更快更方便写入大数据量至Greenplum的Gre

作者: 秣码一盏 | 来源:发表于2020-03-08 17:28 被阅读0次

    前序

    Greenplum是目前比较优秀的mpp数据库,其官方推荐了几种将外部数据写入Greenplum方式,包含:通用的Jdbc,pgcopy和pgload以及Pivotal Greenplum-Spark Connector等。

    • Jdbc:Jdbc方式,写大数据量会很慢。
    • pgcopy:其中pgcopy是及其不推荐的一种,因为其写数据必须经过Greenplum的master,因此也只建议小数据量使用。
    • pgload:适合写大数据量数据,能并行写入。但其缺点是需要安装客户端,包括gpfdist等依赖,安装起来很麻烦。需要了解可以参考pgload
    • Greenplum-Spark Connector:基于Spark并行处理,并行写入Greenplum,并提供了并行读取的接口。也是接下来该文重点介绍的部分。

    2. Greenplum-Spark Connector读数据架构

    一个Spark application,是由Driver和Executor节点构成。当Spark application使用Greenplum-Spark Connector加载Greenplum数据时,其Driver端会通过JDBC的方式请求Greenplum的master节点获取相关的元数据信息。Connector将会根据这些元数据信息去决定Spark的Executor去怎样去并行的读取该表的数据。

    Greenplum数据库存储数据是按segment组织的,Greenplum-Spark Connector在加载Greenplum数据时,需要指定Greenplum表的一个字段作为Spark的partition字段,Connector会使用这个字段的值来计算,该Greenplum表的某个segment该被哪一个或多个Spark partition读取。

    其读取过程如下:

    1. Spark Driver通过Jdbc的方式连接Greenplum master,并读取指定表的相关元数据信息。然后根据指定的分区字段以及分区个数去决定segment怎么分配。
    2. Spark Executor端会通过Jdbc的方式连接Greenplum master,创建Greenplum外部表。
    3. 然后Spark Executor通过Http方式连接Greenplum的数据节点,获取指定的segment的数据。该获取数据的操作在Spark Executor并行执行。

    其示意流程图如下:


    Greenplum-Spark Connector

    3. Greenplum-Spark Connector写数据流程

    1. GSC在Spark Executor端通过Jetty启动一个Http服务,将该服务封装为支持Greenplum的gpfdist协议。
    2. GSC在Spark Executor端通过Jdbc方式连接Greenplum master,创建Greenplum外部表,该外部表文件地址指向该Executor所启动的gpfdist协议地址。SQL示例如下:
    CREATE READABLE EXTERNAL TABLE
    "public"."spark_9dc823a6fa48df60_3d9d854163f8f07a_1_42" (LIKE "public"."rank_a1")
    LOCATION ('gpfdist://10.0.8.145:44772/spark_9dc823a6fa48df60_3d9d854163f8f07a_1_42')
    FORMAT 'CSV'
    (DELIMITER AS '|'
     NULL AS '')
    ENCODING 'UTF-8'
    
    1. GSC在Spark Executor端通过Jdbc方式连接Greenplum master,然后执行insert语句至真实的表中,数据来源于这张外部表。SQL示例如下:
    INSERT INTO "public"."rank_a1"
    SELECT *
    FROM "public"."spark_9dc823a6fa48df60_3d9d854163f8f07a_1_42"
    

    至于这张外部表的数据,是否落地当前Executor服务器,不清楚。猜测不会落地,而是直接通过Http直接传递给了Greenplum对应的Segment。

    1. GSC监听onApplicationEnd事件,在Spark application结束后,删除创建的外部表。

    4. Greenplum-Spark Connector使用

    1. 下载GSC Jar包。
      下载地址:Pivotal Network
      可直接下载最新版本的GSC即1.6.2,支持Greenplum5.0之后的版本。greenplum-spark_<spark-version>-<gsc-version>.jar,如:
    greenplum-spark_2.11-1.6.2.jar
    
    
    1. maven中引入:
            <dependency>
                <groupId>io.pivotal.greenplum.spark</groupId>
                <artifactId>greenplum-spark_2.11</artifactId>
                <version>1.6.2</version>
            </dependency>
    
    1. spark提交引入:
    • spark-shell或spark-submit时候,通过--jars加入greenplum-spark_2.11-1.6.2.jar。
    • 将greenplum-spark_2.11-1.6.2.jar与Spark application包打成 uber jar 提交。

    5. Greenplum-Spark Connector参数

    参数名 参数描述 作用域
    url Jdbc连接的url。 读,写
    dbschema Greenplum数据库的schema,GSC创建的临时外部表也在该schema下,默认值为public。 读,写
    dbtable Greenplum数据库的表名,GSC在读取时,会读取dbschema下的表。GSC在写数据时,如果该表不存在会自动创建。 读,写
    driver Jdbc driver全类名,非必填,在GSC Jar包中已经包含了driver包。 读,写
    user 用户名 读,写
    password 密码 读,写
    partitionColumn Greenplum数据表的字段,该字段将作为Spark分区的字段,支持integer, bigint, serial, bigserial4中类型,该字段名需小写。该字段为必填,且必须是Greenplum表建表时 DISTRIBUTED BY (<column>)语句中的字段。
    partitions Spark分区数,非必填,其默认值为Greenplum的primary segments数量。
    truncate 当在Spark中指定了输出模式为SaveMode.Overwrite时候,写的目标表存在的时候的策略,非必填。默认为false,即GSC将会先删除然后重新创建目标表,然后在写数据。当为true时,GSC将会先truncates目标表,然后在写入数据。
    iteratorOptimization 指定写数据时内存模式,非必填。默认指为true,GSC将会使用 Iterator 方式。当为false时,GSC将会在写数据时将数据存储在内存中。
    server.port 指定在Spark Worker端启动gpfdist服务的端口号,非必填。默认情况下会使用随机的端口号。 读,写
    server.useHostname 指定是否使用Spark Worker节点的host name为gpfdis服务的地址,非必填。默认为false。 读,写
    pool.maxSize GSC连接Greenplum的连接池的最大连接数,默认为64。 读,写
    pool.timeoutMs 非活动连接被认为是空闲连接的时间,毫秒值。默认为10000(10秒)。 读,写
    pool.minIdle GSC连接Greenplum的连接池的最小空闲连接数,默认为0。 读,写

    6. 从Greenplum读取数据

    1. DataFrameReader.load()方式:
    val gscReadOptionMap = Map(
          "url" -> "jdbc:postgresql://gpdb-master:5432/testdb",
          "user" -> "bill",
          "password" -> "changeme",
          "dbschema" -> "myschema",
          "dbtable" -> "table1",
          "partitionColumn" -> "id"
    )
    
    val gpdf = spark.read.format("greenplum")
          .options(gscReadOptionMap)
          .load()
    
    1. spark.read.greenplum()方式:
    val url = "jdbc:postgresql://gpmaster.domain:15432/tutorial"
    val tblname = "avgdelay"
    val jprops = new Properties()
    jprops.put("user", "user2")
    jprops.put("password", "changeme")
    jprops.put("partitionColumn", "airlineid")
    val gpdf = spark.read.greenplum(url, tblname, jprops)
    

    然鹅,这种方式必然需要引入一个隐式转换,官网也没介绍。

    7. 写数据至Greenplum

    7.1. 写数据示例:

    val gscWriteOptionMap = Map(
          "url" -> "jdbc:postgresql://gpdb-master:5432/testdb",
          "user" -> "bill",
          "password" -> "changeme",
          "dbschema" -> "myschema",
          "dbtable" -> "table2",
    )
    
    dfToWrite.write.format("greenplum")
          .options(gscWriteOptionMap)
          .save()
    

    在通过GSC写到Greenplum表时,如果表已经存在或表中已经存在数据,可通过DataFrameWriter.mode(SaveMode savemode)方式指定其输出模式。相关模式行为如下:

    SaveMode 行为
    ErrorIfExists 如果Greenplum数据表已经存在则GSC直接返回错误,该策略为默认策略。
    Append 直接将Spark中数据追加至表中。
    Ignore 如果Greenplum数据表已经存在,GSC将不会写数据至表中也不会去修改已经存在的数据。
    Overwrite 如果Greenplum数据表已经存在,则truncate参数将会生效。默认为false,即GSC将会先删除然后重新创建目标表,然后在写数据。当为true时,GSC将会先truncates目标表,然后在写入数据。

    7.2. GSC自动建表:

    1. 创建的Greenplum表将不会有distribution列,如下为GSC生成的建表语句:
    CREATE TABLE "public"."rank_a1" 
    ("id" INTEGER NOT NULL, "rank" TEXT, "year" INTEGER NOT NULL, "gender" INTEGER NOT NULL, "count" INTEGER NOT NULL);
    
    1. 创建的Greenplum表的字段名将会使用Spark DataFrame中的字段名。
    2. 在GSC自动建表时,将会为字段名加上双引号,这将使Greenplum区分大小写。
    3. 当Spark DataFrame的字段不为nullable时,GSC自动建表的字段将是 NOT NULL。
    4. 将会对应的Spark DataFrame字段类型映射为Greenplum的字段类型。参考,字段类型映射表

    7.3. 提前手动建表:

    1. 将Spark DataFrame的字段名的数据写至Greenplum表的对应的字段中。值得注意的是,GSC在做映射的时候,是严格区分大小写的。
    2. 写至Greenplum的字段的数据类型,与对应的Spark DataFrame一致,具体参见字段类型映射
    3. 如果Spark数据中某列包含空数据,需确保对应的Greenplum表的列没有被指定为NOT NULL。
    4. Greenplum表中建表时其字段顺序可以与Spark DataFrame中不一致。但Greenplum表中不能出现不存在在Spark DataFrame中的字段。如下例子:
    // Greenplum 中的字段
    CREATE TABLE public.rank_a1 (
        id int4 NOT NULL,
        "rank" text NULL,
        "year" int4 NOT NULL,
        gender int4 NOT NULL,
        count int4 NOT NULL
    )
    DISTRIBUTED BY (id);
    
    // Spark DataFrame中的字段
    var df = Seq((2, "a|b", 2, 2, 2),(3, "a|b", 3, 3, 3)).toDF("id", "rank", "year", "gender")
    
    // 在写数据至public.rank_a1表时,将会报错如下
    Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: The number of columns doesn't match.
    Old column names (5): _1, _2, _3, _4, _5
    New column names (4): id, rank, year, gender
        at scala.Predef$.require(Predef.scala:224)
        at org.apache.spark.sql.Dataset.toDF(Dataset.scala:435)
        at org.apache.spark.sql.DatasetHolder.toDF(DatasetHolder.scala:44)
        at com.lt.spark.greenplum.GreenplumWrite$.main(GreenplumWrite.scala:14)
        at com.lt.spark.greenplum.GreenplumWrite.main(GreenplumWrite.scala)
    
    
    1. 确保指定的用户对于该表有读写的权限,自动建表,需要有建表的权限。

    8. Troubleshooting

    8.1. 端口相关问题

    错误信息 原因 解决办法
    java.lang.RuntimeException:<port-number> is not a valid port number. 通过server.port所指定的端口无效,比如1024以内,为系统使用端口 指定端口在[1024-65535]之间
    java.lang.RuntimeException:Unable to start GpfdistService on any of ports=<list-of-port-numbers> 通过server.port指定的端口已经被占用 从新指定一个未被占用的端口,或不指定该参数

    8.2. Greenplum连接数问题

    当连接Greenplum的连接数接近Greenplum数据库配置的最大连接数(max_connections)时。Spark application将会抛出 connection limit exceeded 错误。

    排查过程:

    1. 查询Greenplum数据的最大连接数:
    postgres=# show max_connections;
     max_connections
    -----------------
     250
    (1 row)
    
    1. 查询当前连接Greenplum数据库的连接数:
    postgres=# SELECT count(*) FROM pg_stat_activity;
    
    1. 查询指定的用户连接Greenplum数据的连接数:
    postgres=# SELECT count(*) FROM pg_stat_activity WHERE datname='tutorial';
    postgres=# SELECT count(*) FROM pg_stat_activity WHERE usename='user1';
    
    1. 查询Greenplum数据库空闲和活动的连接数:
    postgres=# SELECT count(*) FROM pg_stat_activity WHERE current_query='<IDLE>';
    postgres=# SELECT count(*) FROM pg_stat_activity WHERE current_query!='<IDLE>';
    
    1. 查询连接Greenplum数据库名,用户名,客户端地址,客户端ip,当前查询语句:
    postgres=# SELECT datname, usename, client_addr, client_port, current_query FROM pg_stat_activity;
    

    如果确认是Spark application使用连接数过多,则配置JDBC Connection Pooling相关参数,减少连接数。

    8.3. Greenplum Database Data Length Errors

    在使用Greenplum 4.x或5.x的时候,可能会报出“data line too long”错误。这是因为在Greenplum数据库中参数项“gp_max_csv_line_length”默认值是1M。需要登陆Greenplum master修改这个参数值,示例如下,通过gpconfig修改该参数的值为5M:

    gpadmin@gpmaster$ gpconfig -c gp_max_csv_line_length -v 5242880
    gpadmin@gpmaster$ gpstop -u
    

    9. 类型映射表

    9.1. Greenplum to Spark

    Greenplum Data Type Spark Data Type
    bigint LongType
    bigSerial LongType
    boolean BooleanType
    char StringType
    date DateType
    decimal DecimalType
    float4 FloatType
    float8 DoubleType
    int IntegerType
    serial IntegerType
    smallInt ShortType
    text StringType
    time TimeStampType
    timestamp TimeStampType
    timestamptz TimeStampType
    timetz TimeStampType
    varchar StringType

    9.2. Spark to Greenplum

    Spark Data Type Greenplum Data Type
    BinaryType bytea
    BooleanType boolean
    DateType date
    DecimalType numeric
    DoubleType float8
    FloatType float4
    IntegerType int
    LongType bigint
    ShortType smallInt
    StringType text
    TimeStampType timestamp

    10. 参考

    1. Greenplum-Spark Connector官方文档
    2. Greenplum建表语句文档
    3. Greenplum参数配置官方文档

    相关文章

      网友评论

        本文标题:比pgload更快更方便写入大数据量至Greenplum的Gre

        本文链接:https://www.haomeiwen.com/subject/hchjdhtx.html