CarbonData 1.2.0 Integration with Spark 2.1.0: An Evaluation

Author: 分裂四人组 | Published: 2017-11-01 19:56

    Build

    CarbonData 1.2 already supports Hive and Presto, so the CarbonData ecosystem is largely complete.

    Check out branch-1.2 with git and build with the following script:

    #!/bin/bash
    
    mvn -DskipTests clean package
    

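    Note: although the documentation says JDK 1.7 and 1.8 are both supported, testing showed that the code calls Map[String, String].getOrDefault(), which only exists from JDK 1.8 onward, so the current branch compiles only with JDK 1.8. You could of course patch this by hand and submit a PR to the community.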
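
    Because the branch only compiles with JDK 1.8, a small guard in front of the Maven call can fail fast on other JDKs. The sketch below is one way to do that; the function names java_major_version and require_jdk8 are made up for illustration and are not part of CarbonData.

```shell
#!/bin/bash
# Hypothetical helper: extract the Java major version from a version string
# such as "1.8.0_151" (legacy scheme) or "11.0.2" (newer scheme).
java_major_version() {
    local version="$1"
    case "$version" in
        1.*) version="${version#1.}" ;;   # legacy "1.x" scheme: drop the leading "1."
    esac
    echo "${version%%[._-]*}"             # keep the text before the first separator
}

# Hypothetical guard: succeed only when the given version string is JDK 1.8.
require_jdk8() {
    [ "$(java_major_version "$1")" -eq 8 ]
}

# Usage (assumes java and mvn are on PATH):
#   version=$(java -version 2>&1 | awk -F'"' '/version/ {print $2; exit}')
#   require_jdk8 "$version" && mvn -DskipTests clean package
```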

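    Testing

    Deploy CarbonData following the configuration notes at https://carbondata.apache.org/installation-guide.html:

    • CarbonData supports only the Spark 2.1.0 patch release. Integrating with Spark 2.1.3 fails with a missing CatalystConf class, because that class was refactored in releases after 2.1.0, so the build must depend on Spark 2.1.0 exactly;
    • As the documentation describes, package the libraries with tar zcvf carbondata.tar.gz and reference the archive in spark.yarn.dist.archives and spark.executor.extraClassPath, otherwise CarbonData classes will not be found (note that carbondata_xxx.jar in carbonlib must not be a symlink);
    • Configure Spark: the compiled carbondata_xxx.jar must be added to Spark's dependencies;
    • Configure CarbonData: copy the reference template carbon.properties.template from the CarbonData source tree to SPARK_HOME/conf as carbon.properties and adjust the main paths (CarbonData has many tuning parameters, to be adjusted gradually later);
    • Use spark-shell to check whether the integration works; the test SQL is given below.

    Here is a fairly complete spark-defaults.conf. Some paths are hard-coded and need to be changed: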
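
    The packaging step and the symlink caveat from the list above can be combined into one script. This is a minimal sketch, assuming the jars live in a directory named carbonlib under SPARK_HOME; list_symlinked_jars and package_carbonlib are hypothetical helper names, not CarbonData tooling.

```shell
#!/bin/bash
# Hypothetical helper: print every symlinked jar found directly under the
# given library directory (for example $SPARK_HOME/carbonlib).
list_symlinked_jars() {
    find "$1" -maxdepth 1 -type l -name '*.jar'
}

# Hypothetical helper: build carbondata.tar.gz from the library directory and
# place it inside that directory, refusing to run while any jar is a symlink.
package_carbonlib() {
    local libdir="$1"
    local parent name
    parent="$(dirname "$libdir")"
    name="$(basename "$libdir")"
    if [ -n "$(list_symlinked_jars "$libdir")" ]; then
        echo "symlinked jars found in $libdir; replace them with real files" >&2
        return 1
    fi
    # Entries in the archive start with "$name/", which is what a class path
    # such as carbondata.tar.gz/carbonlib/* expects. An archive left over
    # from a previous run is excluded so it is not packed into itself.
    tar -zcf "$parent/carbondata.tar.gz" --exclude='carbondata.tar.gz' -C "$parent" "$name" &&
        mv "$parent/carbondata.tar.gz" "$libdir/"
}

# Usage: package_carbonlib "$SPARK_HOME/carbonlib"
```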

    ## Driver/AM Settings ##
    spark.yarn.am.waitTime                                100s
    spark.yarn.am.cores                                   1
    spark.yarn.am.memory                                  4g
    spark.yarn.am.memoryOverhead                          2048
    spark.yarn.am.extraJavaOptions                        -XX:PermSize=1024m -XX:MaxPermSize=2048m -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintTenuringDistribution
    spark.driver.maxResultSize                            1g
    spark.driver.memory                                   4g
    
    
    ## Executor Settings ##
    spark.executor.instances                              0
    spark.executor.cores                                  4
    spark.executor.memory                                 4g
    spark.yarn.executor.memoryOverhead                    2048
    spark.executor.extraJavaOptions                       -XX:PermSize=1024m -XX:MaxPermSize=1024m -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintTenuringDistribution -Dcarbon.properties.filepath=carbon.properties
    
    
    ## Dynamic Allocation Settings ##
    spark.shuffle.service.enabled                         true
    spark.dynamicAllocation.enabled                       true
    spark.dynamicAllocation.initialExecutors              0
    spark.dynamicAllocation.minExecutors                  0
    spark.dynamicAllocation.maxExecutors                  10
    spark.dynamicAllocation.executorIdleTimeout           60s
    #spark.dynamicAllocation.cachedExecutorIdleTimeout    10s
    
    ## SQL Configurations ##
    spark.sql.autoBroadcastJoinThreshold                  104857600
    spark.sql.warehouse.dir                               /user/warehouse
    #spark.sql.warehouse.dir                               /user/hadoop/warehouse
    # spark.sql.hive.convertCTAS                          true
    # spark.sql.sources.default                           parquet
    spark.sql.shuffle.partitions                          100
    
    spark.driver.extraJavaOptions                        -Dcarbon.properties.filepath=/home/hadoop/work/spark/conf/carbon.properties
    spark.driver.extraClassPath                          /home/hadoop/work/spark/carbonlib/*
    
    spark.executor.extraClassPath                        carbondata.tar.gz/carbonlib/*
    
    spark.yarn.dist.files                                /home/hadoop/work/spark/conf/carbon.properties
    spark.yarn.dist.archives                             /home/hadoop/work/spark/carbonlib/carbondata.tar.gz
    

    Configure carbon.properties:

    #Mandatory. Carbon Store path
    carbon.storelocation=hdfs://hzadg-mammut-platform2.server.163.org:8020/Carbon/CarbonStore
    #Base directory for Data files
    carbon.ddl.base.hdfs.url=hdfs://hzadg-mammut-platform2.server.163.org:8020/Carbon/data
    #Path where the bad records are stored
    carbon.badRecords.location=hdfs://hzadg-mammut-platform2.server.163.org:8020/Carbon/badrecords
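
    The three entries above share one HDFS prefix, so when moving to another cluster it can be convenient to generate them from a single base URL. The helper below is only a sketch; render_carbon_properties is a hypothetical name, and the directory layout (CarbonStore, data, badrecords) simply mirrors the values shown above.

```shell
#!/bin/bash
# Hypothetical helper: print the three carbon.properties entries shown above
# for a given HDFS base URL (for example hdfs://namenode:8020/Carbon).
render_carbon_properties() {
    local base="${1%/}"   # drop one trailing slash, if present
    cat <<EOF
#Mandatory. Carbon Store path
carbon.storelocation=${base}/CarbonStore
#Base directory for Data files
carbon.ddl.base.hdfs.url=${base}/data
#Path where the bad records are stored
carbon.badRecords.location=${base}/badrecords
EOF
}

# Usage: render_carbon_properties hdfs://namenode:8020/Carbon > "$SPARK_HOME/conf/carbon.properties"
```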
    

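    In the test SQL, note that getOrCreateCarbonSession() takes two HDFS paths that must already exist and be accessible to the current user. The first is storePath, the default location for data written by load/overwrite; the second is metaStorePath (which appeared to have no effect). For example: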

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.CarbonSession._
    // First argument: storePath; second argument: metaStorePath. Both HDFS paths must already exist.
    val carbon = SparkSession.builder().config(sc.getConf).getOrCreateCarbonSession("hdfs://hzadg-mammut-platform2.server.163.org:8020/Carbon/data", "hdfs://hzadg-mammut-platform2.server.163.org:8020/Carbon/carbon.metastore")
    carbon.sql("CREATE TABLE IF NOT EXISTS test_table2(id string, name string, city string, age Int) STORED BY 'carbondata'")
    // IF EXISTS avoids an error when test_table was never created
    carbon.sql("DROP TABLE IF EXISTS test_table")
    carbon.sql("show tables").show()
    // Load the sample CSV, then run a full scan and a simple aggregation
    carbon.sql("LOAD DATA INPATH '/tmp/carbon/sample.csv' INTO TABLE test_table2")
    carbon.sql("SELECT * FROM test_table2").show()
    carbon.sql("SELECT city, avg(age), sum(age) FROM test_table2 GROUP BY city").show()
    

    CarbonData ThriftServer

    The approach above is suitable only for testing; a real production environment should go through the ThriftServer. The startup script for the ThriftServer:

    #!/bin/bash
    
    export SPARK_HOME=/home/hadoop/work/spark
    
    nohup ./bin/spark-submit \
            --conf spark.sql.hive.thriftServer.singleSession=true \
            --class org.apache.carbondata.spark.thriftserver.CarbonThriftServer  \
            $SPARK_HOME/carbonlib/carbondata_2.11-1.2.1-SNAPSHOT-shade-hadoop2.2.0.jar \
            hdfs://hzadg-mammut-platform2.server.163.org:8020/Carbon/CarbonStore >logs/carbondata-thrift-server.log 2>&1 &
    

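    TPC-DS Testing

    The project does not provide a reliable test dataset. I happened to have TPC-DS data from earlier tests, so I adapted its table-creation statements to CarbonData syntax, for example: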

    use tpcds;
    
    set hive.exec.dynamic.partition=true;
    set hive.exec.dynamic.partition.mode=nonstrict;
    
    set hive.exec.max.dynamic.partitions.pernode=2000;
    set hive.exec.max.dynamic.partitions=2000;
    
    drop table if exists cb_store_sales;
    create table cb_store_sales
    (
      ss_sold_time_sk bigint,
      ss_item_sk bigint,
      ss_customer_sk bigint,
      ss_cdemo_sk bigint,
      ss_hdemo_sk bigint,
      ss_addr_sk bigint,
      ss_store_sk bigint,
      ss_promo_sk bigint,
      ss_ticket_number bigint,
      ss_quantity int,
      ss_wholesale_cost decimal(7,2),
      ss_list_price decimal(7,2),
      ss_sales_price decimal(7,2),
      ss_ext_discount_amt decimal(7,2),
      ss_ext_sales_price decimal(7,2),
      ss_ext_wholesale_cost decimal(7,2),
      ss_ext_list_price decimal(7,2),
      ss_ext_tax decimal(7,2),
      ss_coupon_amt decimal(7,2),
      ss_net_paid decimal(7,2),
      ss_net_paid_inc_tax decimal(7,2),
      ss_net_profit decimal(7,2)
    ) partitioned by (ss_sold_date_sk bigint)
    stored by 'carbondata'
    tblproperties('partition_type'='Hash','num_partitions'='31');
    
    insert overwrite table cb_store_sales partition(ss_sold_date_sk) select ss_sold_time_sk,ss_item_sk,ss_customer_sk,ss_cdemo_sk,ss_hdemo_sk,ss_addr_sk,ss_store_sk,ss_promo_sk,ss_ticket_number,ss_quantity,ss_wholesale_cost,ss_list_price,ss_sales_price,ss_ext_discount_amt,ss_ext_sales_price,ss_ext_wholesale_cost,ss_ext_list_price,ss_ext_tax,ss_coupon_amt,ss_net_paid,ss_net_paid_inc_tax,ss_net_profit,ss_sold_date_sk from et_store_sales distribute by ss_sold_date_sk;
    

    Run it against the port exposed by the ThriftServer above, using this script:

    export SPARK_BEELINE_HOME=/home/hadoop/work/spark
    ${SPARK_BEELINE_HOME}/bin/beeline -u "jdbc:hive2://hzadg-mammut-platform1.server.163.org:10010/tpcds;hive.server2.proxy.user=hadoop" -f "$bin/create-table-sql/create-load-carbondata-partition-fact.sql"
    

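    Points to note:

    • In Spark SQL, partitioned by does not require a partition type or a partition count, but CarbonData SQL requires both. The official documentation is also full of errors; even num_partitions appears to be documented incorrectly;
    • The overwrite is slow, probably because of CarbonData's design: it has to build a global dictionary index and write it to HDFS, so the run took more than 30 minutes.

    Once it finishes, the resulting data can be found in HDFS. For example, the metadata looks like this: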

    hadoop@hzadg-mammut-platform1:~/work/spark/conf$ hdfs dfs -ls /Carbon/CarbonStore/tpcds/cb_store_sales/Fact/Part0/Segment_0
    Found 64 items
    -rw-r-----   3 hadoop hdfs      21284 2017-10-31 20:25 /Carbon/CarbonStore/tpcds/cb_store_sales/Fact/Part0/Segment_0/0_batchno0-0-1509450294012.carbonindex
    -rw-r-----   3 hadoop hdfs      10170 2017-10-31 20:31 /Carbon/CarbonStore/tpcds/cb_store_sales/Fact/Part0/Segment_0/10_batchno0-0-1509450294012.carbonindex
    -rw-r-----   3 hadoop hdfs      10170 2017-10-31 20:31 /Carbon/CarbonStore/tpcds/cb_store_sales/Fact/Part0/Segment_0/11_batchno0-0-1509450294012.carbonindex
    -rw-r-----   3 hadoop hdfs      10170 2017-10-31 20:17 /Carbon/CarbonStore/tpcds/cb_store_sales/Fact/Part0/Segment_0/12_batchno0-0-1509450294012.carbonindex
    ...
    
    
    hadoop@hzadg-mammut-platform1:~/work/spark/conf$ hdfs dfs -ls /Carbon/CarbonStore/tpcds/cb_store_sales
    Found 2 items
    drwxr-x---   - hadoop hdfs          0 2017-10-31 19:44 /Carbon/CarbonStore/tpcds/cb_store_sales/Fact
    drwxr-x---   - hadoop hdfs          0 2017-10-31 20:32 /Carbon/CarbonStore/tpcds/cb_store_sales/Metadata
    hadoop@hzadg-mammut-platform1:~/work/spark/conf$ hdfs dfs -ls /Carbon/CarbonStore/tpcds/cb_store_sales/Metadata
    Found 2 items
    -rw-r-----   3 hadoop hdfs       2796 2017-10-31 19:44 /Carbon/CarbonStore/tpcds/cb_store_sales/Metadata/schema
    -rw-r-----   3 hadoop hdfs        268 2017-10-31 20:32 /Carbon/CarbonStore/tpcds/cb_store_sales/Metadata/tablestatus
    

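    Verifying TPC-DS

    CarbonData still has many rough edges. For example, after creating cb_store_sales through beeline, querying it through beeline with hive.server2.proxy.user=hadoop set fails as shown below. The cause is that the hive.server2.proxy.user=hadoop setting did not take effect; tables created earlier can still be queried without problems: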

    hadoop@hzadg-mammut-platform1:~/work/spark$ ./bin/beeline -u jdbc:hive2://hzadg-mammut-platform1.server.163.org:10010;hive.server2.proxy.user=hadoop
    
    0: jdbc:hive2://hzadg-mammut-platform1.server> select * from cb_store_sales limit 10;
    Error: org.apache.hadoop.security.AccessControlException: Permission denied: user=anonymous, access=EXECUTE, inode="/Carbon/CarbonStore/modifiedTime.mdt":hadoop:hdfs:drwxr-x---
            at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:319)
            at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkTraverse(FSPermissionChecker.java:259)
            at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:205)
            at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:190)
            at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1722)
            at org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.getFileInfo(FSDirStatAndListingOp.java:108)
            at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getFileInfo(FSNamesystem.java:3863)
            at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getFileInfo(NameNodeRpcServer.java:1012)
            at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getFileInfo(ClientNamenodeProtocolServerSideTranslatorPB.java:843)
            at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
            at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
            at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
            at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
            at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
            at java.security.AccessController.doPrivileged(Native Method)
            at javax.security.auth.Subject.doAs(Subject.java:415)
            at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
            at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043) (state=,code=0)
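
    One thing worth checking in the session above, although the original notes do not confirm it as the cause: the beeline URL is passed without quotes, and an unquoted ; ends a shell command. In that case beeline would receive only the part of the URL before the ;, without the proxy-user parameter, whereas the earlier create-table script quoted the whole URL. The sketch below reproduces the splitting with a stand-in function (show_args is hypothetical and only echoes its arguments; example-host is a placeholder):

```shell
#!/bin/bash
# Hypothetical stand-in for beeline: print each received argument in brackets.
show_args() {
    printf '[%s]' "$@"
    echo
}

# Quoted URL: the proxy-user parameter stays part of the single URL argument.
quoted=$(show_args -u "jdbc:hive2://example-host:10010;hive.server2.proxy.user=hadoop")

# Unquoted URL: the shell ends the command at ';', so show_args sees only the
# text before it. The remainder runs as a separate command, which fails as
# "command not found"; its error output is discarded here.
unquoted=$(show_args -u jdbc:hive2://example-host:10010;hive.server2.proxy.user=hadoop 2>/dev/null) || true

echo "quoted:   $quoted"
echo "unquoted: $unquoted"
```

    If this is what happened, wrapping the URL in double quotes on the beeline command line would be the first thing to try.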
    

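    For now, queries work when run through carbon-spark-sql under CARBON_HOME/bin. I tested one TPC-DS dataset stored in both CarbonData and Parquet formats; for simple operations (count, count(distinct), sum(), limit 10), CarbonData was noticeably faster than Parquet in every case.

    Next Steps

    • Performance comparison across the full set of TPC-DS queries;
    • Investigation of how CarbonData's optimizations work.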
