美文网首页spark程序员
Spark版本升级二三事

Spark版本升级二三事

作者: 伍柒大人的三言两语 | 来源:发表于2018-02-16 18:15 被阅读521次

    起因:部门准备将数据仓库开发工具从Hive SQL大规模迁移至Spark SQL。此前集群已经自带了Spark-1.5.2,系HDP-2.3.4自带的Spark组件,现在需要将之升级到目前的最新版本(2.2.1)。作为一个提供给第三方使用的开发工具,应该避免第三方过度浪费时间于工具本身的使用(为SQL任务调试合理的资源分配),故需要引入spark的DRA机制(Dynamic Resource Allocation),实现spark任务的资源动态分配。

    集群环境:HDP-2.3.4, Hadoop-2.7.1,Hive-1.2.1

    一. 创建spark目录

    从官网下载spark-2.2.1版本:http://spark.apache.org/downloads.html
    这里需要额外说明,spark-2.2.1编译依赖的Hive版本为1.2.1,如果与集群所使用版本不符,需要自己根据Hive版本重新编译。

    1. 拷贝至部署服务器指定目录:/usr/custom/,解压并重命名为:spark-2.2.1;
    2. 在/usr/custom/目录下创建软链接:spark -> /usr/custom/spark-2.2.1
    ln -s /usr/custom/spark-2.2.1 /usr/custom/spark
    

    备注:第三方开发工具引用地址:/usr/custom/spark,后续spark版本更新迭代与第三方开发工具无感知。

    二. 修改配置文件

    1. 进入目录 /usr/custom/spark-2.2.1/conf/,修改spark-env.sh:
    mv  spark-env.sh.template  spark-env.sh
    vim spark-env.sh
    

    添加如下内容(HDP安装目录:/usr/hdp/current/):

    export HADOOP_HOME=/usr/hdp/current/hadoop-client
    export HADOOP_CONF_DIR=/usr/hdp/current/hadoop-client/conf
    export YARN_CONF_DIR=/usr/hdp/current/hadoop-yarn-client/etc/hadoop
    export HADOOP_YARN_HOME=/usr/hdp/current/hadoop-yarn-client
    export HIVE_CONF_DIR=/usr/hdp/current/hive-client/conf
    export HIVE_HOME=/usr/hdp/current/hive-client
    
    export JAVA_HOME=/usr/local/jdk1.8.0_111
    export SPARK_LOG_DIR=/var/log/spark2
    export SPARK_CONF_DIR=/usr/custom/spark/conf
    
    1. 进入目录 /usr/custom/spark-2.2.1/conf/,修改spark-defaults.conf:
    mv spark-defaults.conf.template spark-defaults.conf
    vim spark-defaults.conf
    

    添加如下内容:

    spark.eventLog.enabled   true
    spark.eventLog.dir       hdfs://${clusterName}/tmp/spark-events
    spark.eventLog.compress  true
    
    spark.ui.enabled         true
    spark.ui.killEnabled     false
    spark.ui.port            18080
    
    spark.history.ui.port              18080
    spark.history.fs.cleaner.enabled   true
    spark.history.fs.logDirectory      hdfs://${clusterName}/tmp/spark-events
    spark.history.fs.cleaner.interval  1d
    spark.history.fs.cleaner.maxAge    3d
    
    spark.yarn.appMasterEnv.JAVA_HOME=/usr/local/jdk1.8.0_111
    spark.executorEnv.JAVA_HOME=/usr/local/jdk1.8.0_111
    spark.debug.maxToStringFields=50
    

    【填坑一】Ranger依赖
    如果Hive依托于Ranger进行权限管理,那么这里就需要对spark-defaults.conf额外添加:

    spark.yarn.dist.files=/usr/custom/spark/conf/hive-site.xml,/usr/custom/spark/conf/ranger-hdfs-audit.xml,/usr/custom/spark/conf/ranger-hdfs-security.xml
    

    从hive-client/conf下拷贝hive-site.xml到/usr/custom/spark-2.2.1/conf,并删除其中关于ranger的相关配置,ranger不支持spark的权限控制。然而,spark SQL不可避免需要读取Hive metastore元数据库,这一步仍然需要Ranger的验证,故需要拷贝ranger-hdfs-audit.xml和ranger-hdfs-security.xml到/usr/custom/spark-2.2.1/conf目录下。设置这三个配置文件spark启动时自动加载(spark.yarn.dist.files)。

    3.创建目录: /var/log/spark2,并开放所有用户可读,yarn history server日志保存路径;创建hdfs目录:/tmp/spark-events,并开放所有用户可读可写, spark application日志保存路径;

    4.在部署服务器上启动:/usr/custom/spark/sbin/start-history-server.sh,登陆http://${hostname}:18080/验证。

    三. 添加Spark DRA机制底层依赖

    引入DRA机制需要yarn和hadoop底层支持,故需要增加以下配置:
    a. 进入ambari管理页面:Yarn -> Configs -> Advanced
    Node Manager

    yarn.nodemanager.aux-services : mapreduce_shuffle,spark_shuffle
    

    Custom yarn-site:

    yarn.nodemanager.aux-services.spark_shuffle.class : org.apache.spark.network.yarn.YarnShuffleService
    

    b. 添加spark-shuffle.jar支持:
    shuffle jar路径:/usr/custom/spark/yarn/spark-2.2.1-yarn-shuffle.jar

    在集群每一台NodeManager节点上做如下配置:

    1. 将shuffle的jar拷贝到/usr/cbas/spark2/lib/目录下,如果新加机器没有该目录,需要手动创建;

    2. /usr/cbas/spark2/lib/目录下创建软连接:spark-yarn-shuffle.jar -> /usr/cbas/spark2/lib/spark-2.2.1-yarn-shuffle.jar

    ln -s /usr/cbas/spark2/lib/spark-2.2.1-yarn-shuffle.jar spark-yarn-shuffle.jar
    
    1. /usr/hdp/2.3.4.7-4/hadoop-yarn/lib/目录下创建软连接:spark-yarn-shuffle.jar -> /usr/cbas/spark2/lib/spark-yarn-shuffle.jar
    ln -s /usr/cbas/spark2/lib/spark-yarn-shuffle.jar spark-yarn-shuffle.jar
    

    (两重软连接是为了以后如果需要升级spark,更换spark-yarn-shuffle.jar不需要重启NodeManager)

    1. 重启所有的NodeManager服务;

    四. 配置Spark资源动态分配

    在/usr/custom/spark/conf/spark-defaults.conf文件中添加如下配置:

    
    spark.driver.extraLibraryPath=/ur/hdp/current/hadoop-client/lib/native
    spark.executor.extraLibraryPath=/usr/hdp/current/hadoop-client/lib/native
    
    spark.dynamicAllocation.enabled=true                          //开启动态资源分配机制;
    spark.shuffle.service.enabled=true                            //开启外部shuffle service服务,防止正在shuffle文件的executor被回收;
    spark.dynamicAllocation.schedulerBacklogTimeout=3s            //当spark application因没有足够资源挂起时长达到(3s)时,开始动态资源分配;
    spark.dynamicAllocation.sustainedSchedulerBacklogTimeout=3s   //每隔(3s)申请一次,直达申请到足够资源,每次申请的executor个数呈指数增长(1,2,4,8...)
    spark.dynamicAllocation.executorIdleTimeout=30s               //当executor空闲时间超过(30s)时,executor会被回收;
    #spark.dynamicAllocation.cachedExecutorIdleTimeout=infinity   //默认值,如果executor有缓存数据则不会被回收;
    spark.dynamicAllocation.initialExecutors=3                    //spark application初始化时分配的executor个数;
    spark.dynamicAllocation.maxExecutors=200                      //单个spark application允许分配的最大executor个数;
    spark.dynamicAllocation.minExecutors=0                        //spark application空闲时允许减少到最小executor个数;
    
    spark.yarn.driver.memoryOverhead=1024                         //driver上允许的最大堆外内存(1024M);
    spark.yarn.executor.memoryOverhead=1024                       //executor上允许的最大堆外内存(1024M);
    
    spark.driver.memory=3g                                        //driver占用内存,yarn容器允许最小资源单位(3G);
    spark.executor.memory=3g                                      //每个executor占用内存,yarn容器允许最小资源单位(3G);
    spark.executor.cores=2                                        //每个exexutor占用CPU core数;
    spark.serializer=org.apache.spark.serializer.KryoSerializer   //修改序列化方式,默认为JavaSerializer;
    

    以上配置,spark application提交一个任务时:

    初始分配资源: driver.memory (3g)+ driver.memoryOverhead (1g) + ( executor.memory (3g) + executor.memoryOverhead (1g) ) * 3  = 16g; 
    允许分配的最大资源: driver.memory (3g) + driver.memoryOverhead (1g) + ( executor.memory (3g) + executor.memoryOverhead (1g)) * 200  = 804g;
    

    spark配置优先级:SparkConf core代码 > spark-submit --选项 > spark-defaults.conf配置 > spark-env.sh配置 > 默认值
    因此,spark-defaults.conf里面设置的配置参数可以被(SparkConf core代码 && spark-submit --选项 )覆盖,对于逻辑复杂的大任务可以指定不开启动态资源分配(spark.dynamicAllocation.enabled=false)

    【填坑二】 jersey版本冲突之一
    报错信息类似如下:

    Exception in thread "main" java.lang.NoClassDefFoundError: com/sun/jersey/api/client/config/ClientConfig
            at org.apache.hadoop.yarn.client.api.TimelineClient.createTimelineClient(TimelineClient.java:45)
            at org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.serviceInit(YarnClientImpl.java:163)
            at org.apache.hadoop.service.AbstractService.init(AbstractService.java:163)
            at org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:150)
            at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:56)
            at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:149)
            at org.apache.spark.SparkContext.<init>(SparkContext.scala:500)
            at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2256)
            at org.apache.spark.sql.SparkSession$Builder$$anonfun$8.apply(SparkSession.scala:831)
            at org.apache.spark.sql.SparkSession$Builder$$anonfun$8.apply(SparkSession.scala:823)
            at scala.Option.getOrElse(Option.scala:121)
            at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:823)
            at org.apache.spark.sql.hive.thriftserver.SparkSQLEnv$.init(SparkSQLEnv.scala:57)
            at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.<init>(SparkSQLCLIDriver.scala:288)
            at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:137)
            at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:606)
            at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:729)
            at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
            at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
            at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
            at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
    Caused by: java.lang.ClassNotFoundException: com.sun.jersey.api.client.config.ClientConfig
    ......
    

    原因:Hadoop-2.x依赖jersey-1.9,而Spark-2.x依赖jersey-2.22,两个版本发生过重大重构,代码包路径已发生变更。
    jersey-1.9: com.sun.jersey
    jersey-2.22:org.glassfish.jersey

    解决方案:将jersey-core-1.9.jar 和 jersey-client-1.9.jar拷贝至/usr/custom/spark/jars/目录下,再重命名避免加载器加载版本冲突问题。

    【填坑三】 jersey版本冲突之二
    报错信息类似如下:

    java.lang.NoSuchMethodError: javax.ws.rs.core.Application.getProperties()Ljava/util/Map;
        org.glassfish.jersey.server.ApplicationHandler.<init>(ApplicationHandler.java:331)
        org.glassfish.jersey.servlet.WebComponent.<init>(WebComponent.java:392)
        org.glassfish.jersey.servlet.ServletContainer.init(ServletContainer.java:177)
        org.glassfish.jersey.servlet.ServletContainer.init(ServletContainer.java:369)
    

    原因:当用户在Spark history server上查看application的运行情况时,发现history server无法读取日志,报错信息如上,其本质上也是jersey版本冲突导致的。

    在jersey-core-1.9.jar的源代码里,自己集成了javax.ws.rs相关的代码。而在jersey-2.x版本,已经将该部分代码剔除,转而依赖javax.ws.rs-api-2.0.1.jar (javax.ws.rs)。在jersey-core-1.9.jar里是没有javax.ws.rs.core.Application.getProperties()这个方法。

    解决方法一:在github上下载jersey-core-1.9.jar的源码,自行添加javax.ws.rs.core.Application.getProperties(),并重新编译打包;
    解决方法二:在github上下载jersey-core-1.9.jar的源码,将所有javax.ws.rs相关代码剔除,转而依赖javax.ws.rs-api-2.0.1.jar;

    【填坑四】 Spark SQL无法insert/overwrite无分区表
    报错信息类似如下(找不到对应参数的loadTable方法):

    NoSuchMethodException:org.apche.hadoop.,hive.ql.metada.Hive. loadTable(org.apche.hadoop.fs.Path,java.lang.String,boolean,boolean)
    

    原因
    hive SQL自带loadTable方法:

    public void loadTable(Path loadPath, String tableName, LoadFileType loadFileType, boolean isSrcLocal,
       boolean isSkewedStoreAsSubdir, boolean isAcid, boolean hasFollowingStatsTask,
       Long txnId, int stmtId, boolean isMnTable) throws HiveException {
    

    spark SQL自带loadTable方法:

    public void loadTable(Path loadPath, String tableName, boolean replace, boolean holdDDLTime, boolean isSrcLocal,
       boolean isSkewedStoreAsSubdir, boolean isAcid) throws HiveException {
          List<Path> newFiles = new ArrayList<Path>();
          Table tbl = this.getTable(tableName);
          HiveConf setssionConf = SessionState.getSessionConf(); 
    

    spark SQL 读取 hive表的元数据时,需要与 hive metastore 进行交互。spark实例化HiveMetastoreClient时需要依赖spark.sql.hive.metastore.jars参数指向的jars目录;
    之前的做法是将spark.sql.hive.metastore.jars直接指向/usr/hdp/current/hive-client/lib/*, 用hive自带的jars依赖来连接hive metastore,导致spark进行insert/overwrite操作时报错方法不存在;

    解决方案:将spark2连接hive需要加载的hive相关的jars(hive-*.spark2.jar,/usr/custom/spark/jars目录下) ,与/usr/hdp/current/hive-client/lib/目录下的jars,剔除hive-client中跟spark自带hive-*系列冲突的jars,多次测试解决spark自带hive-*系列需要依赖的其它jars,重新整合成一份新的jars目录(假设路径存在于:/usr/cbas/spark2/hive-jars/)。
    在spark-defaults.conf中添加如下配置:

    spark.sql.hive.metastore.jars=/usr/cbas/spark2/hive-jars/*
    

    【填坑五】 LZO压缩

    Caused by: java.lang.IllegalArgumentException: Compression codec com.hadoop.compression.lzo.LzoCodec not found.
            at org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(CompressionCodecFactory.java:135)
            at org.apache.hadoop.io.compress.CompressionCodecFactory.<init>(CompressionCodecFactory.java:175)
            at org.apache.hadoop.mapred.TextInputFormat.configure(TextInputFormat.java:45)
    

    原因:LZO压缩算法的jar不存在
    解决方法:拷贝Hadoop自带的lzo jar(hadoop-lzo-0.6.0.2.3.4.7-4.jar)到/usr/custom/spark/jars/目录下。

    【填坑六】 spark-sql无法加载Hive UDF的jar

    /usr/custom/spark/bin/spark-sql  --deploy-mode client
    add jar hdfs://${clusterName}/user/hive/udf/udf.jar
    

    报错信息如下:

    java.lang.ExceptionInInitializerError
    at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:662)
    at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:889)
    at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:947)
    at java.io.DataInputStream.read(DataInputStream.java:100)
    ......
    Caused by: java.lang.NullPointerException
    at org.apache.hadoop.hdfs.BlockReaderFactory.getRemoteBlockReaderFromTcp(BlockReaderFactory.java:746)
    at org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:376)
    at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:662)
    at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:889)
    at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:947)
    ......
    

    解决方法
    在/usr/custom/spark/conf/spark-defaults.conf文件中添加如下配置:spark.jars=hdfs://cbasNA/user/hive/udf/udf.jar

    结束语:spark SQL处理多shuffle任务的性能,较于Hive的高效毋庸置疑。然而在正式生产环境想要大规模的应用,仍然需要对底层源码的大量重构,比如:

    1. spark SQL目前只支持client模式;
    2. 如果输入是大量小于一个逻辑块的小文件,会导致RDD partition过多(默认TextInputFormat,输入时无法对小文件合并); 参考:https://issues.apache.org/jira/browse/SPARK-13664
    3. 当任务逻辑复杂需要加大spark.sql.shuffle.partitions,而结果数据普遍较小时,会产生大量的小文件;
    修改源代码,在spark SQL执行计划后面添加重分区操作:
    spark.sql.result.repartitions.enabled=true   =>  启用用户自定义输出文件个数功能,默认不启用
    spark.sql.result.repartitions.num=10          =>  设置输出文件个数,默认为3
    

    博客主页:https://www.jianshu.com/u/e97bb429f278

    相关文章

      网友评论

      • cc16dec181f4:博主整理的很完整,我也遇到了以上问题,想借博主文章整理下笔记,如有不适,请回复。给个大大的赞。
      • Jonathan丶Wei:博主【填坑三】 jersey版本冲突之二有编译好的包么?我这边编译失败。我的邮箱是252637867@qq.com,谢谢。有的话发我一份。
        伍柒大人的三言两语:@Jonathan丶Wei https://github.com/jersey/jersey-1.x-old/tree/master/jersey
        Jonathan丶Wei:@伍柒大人_HQQ 1.9的源码地址有么?我查了貌似最低就到1.10,没有1.9的。我用1.10修改jersery-core的pom文件里面的两个javax.ws.rs版本修改为2.0.1编译是报错的
        伍柒大人的三言两语:@Jonathan丶Wei 公司内网编译,无法拷出来

      本文标题:Spark版本升级二三事

      本文链接:https://www.haomeiwen.com/subject/zyjptftx.html