美文网首页一步一步学习Spark我爱编程
基于Yarn集群Spark Streaming 提交任务解惑

基于Yarn集群Spark Streaming 提交任务解惑

作者: 分裂四人组 | 来源:发表于2018-05-17 10:54 被阅读13次

    参考项目: https://github.com/LiShuMing/spark-demos

    疑惑一、Spark提交任务依赖包问题?

    使用Spark打jar包是个比较头疼的问题:

    • 不能包冗余的依赖(比如hadoop/hbase)放到jar包里,有可能导致运行环境污染;
    • 不能太少:如果缺少必要的jar包,则会抛NoClassFoundException;

    所以,在使用场景中,如何编译出符合要求的最少依赖的提交jar呢?

    其实这里面有一个需要注意的地方(同时也是一个很诡异的地方),你在打包的时候需要清楚:哪些包你是不需要的,哪些包你是必须的。
    这个对用户小白来说是一件需要重复试验的工作,体验不好;

    解决思路:

    • 将所依赖的jar打进一个jar中;
    • 将所依赖的jar领出来,基于spark-submit --jars参数上传必须依赖的jar,供executor端使用;

    方案一、 基于assembly编译完整jar

    注意: 此种方案会将所有依赖jar编译至一个jar包中,比较危险,不推荐;
    TODO: 是否还有其他更优化的方案;

                 <plugin>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <configuration>
                        <descriptorRefs>
                            <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                    </configuration>
                    <executions>
                        <execution>
                            <id>assemble</id>
                            <phase>package</phase>
                            <goals>
                                <goal>single</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
    

    方案二、基于assembly集成spark所依赖的jar

    在maven中添加assembly和dependency插件,并将dependency插件设置<excludeScope>provided</excludeScope>,这样可以将scope为provided级别的依赖不包含在最终的lib中:

                <plugin>
                    <artifactId>maven-dependency-plugin</artifactId>
                    <executions>
                        <execution>
                            <phase>process-sources</phase>
    
                            <goals>
                                <goal>copy-dependencies</goal>
                            </goals>
    
                            <configuration>
                                <excludeScope>provided</excludeScope>
                                <outputDirectory>${project.build.directory}/lib</outputDirectory>
                            </configuration>
    
                        </execution>
                    </executions>
                </plugin>
                <!-- Assembly Plug-in -->
                <plugin>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <configuration>
                        <finalName>spark-demos-${project.version}</finalName>
                        <descriptors>
                            <descriptor>src/assembly/assembly.xml</descriptor>
                        </descriptors>
                        <tarLongFileMode>gnu</tarLongFileMode>
                    </configuration>
    
                    <executions>
                        <execution>
                            <id>assemble</id>
                            <phase>package</phase>
                            <goals>
                                <goal>single</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
    
    <assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
              xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
              xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
        <id>distribution</id>
        <formats>
            <format>dir</format>
            <format>tar.gz</format>
        </formats>
    
        <fileSets>
            <fileSet>
                <directory>${project.basedir}/src/main/resources</directory>
                <outputDirectory>/conf</outputDirectory>
            </fileSet>
    
            <fileSet>
                <directory>${project.build.directory}/lib</directory>
                <outputDirectory>/lib</outputDirectory>
                <includes>
                    <include>*.*</include>
                </includes>
            </fileSet>
    
            <fileSet>
                <directory>${project.build.directory}/target/spark-demo-0.1.0.jar</directory>
                <outputDirectory>/</outputDirectory>
                <includes>
                    <include>*.*</include>
                </includes>
            </fileSet>
        </fileSets>
    </assembly>
    

    最终编译生成的路径如下,这个你会发现还是有很多冗余的jar(需要开发者在pom.xml中仔细排查,设置每个依赖,注意其引入的jar,如果有冲突设置exclude将其排除),可以手动地调整删除不必要的jar:

    ├── conf
    │   ├── conf.properties
    │   └── kafka_jaas.conf
    └── lib
        ├── commons-cli-1.2.jar
        ├── commons-codec-1.9.jar
        ├── commons-collections-3.2.2.jar
        ├── commons-httpclient-3.1.jar
        ├── commons-io-2.4.jar
        ├── commons-lang-2.6.jar
        ├── commons-lang3-3.3.2.jar
        ├── commons-logging-1.2.jar
        ├── commons-math-2.2.jar
        ├── disruptor-3.3.0.jar
        ├── findbugs-annotations-1.3.9-1.jar
        ├── guava-12.0.1.jar
        ├── hamcrest-core-1.3.jar
        ├── hbase-annotations-1.2.6.jar
        ├── hbase-client-1.2.6.jar
        ├── hbase-common-1.2.6-tests.jar
        ├── hbase-common-1.2.6.jar
        ├── hbase-hadoop-compat-1.2.6.jar
        ├── hbase-hadoop2-compat-1.2.6.jar
        ├── hbase-prefix-tree-1.2.6.jar
        ├── hbase-procedure-1.2.6.jar
        ├── hbase-protocol-1.2.6.jar
        ├── hbase-server-1.2.6.jar
        ├── htrace-core-3.1.0-incubating.jar
        ├── jackson-core-asl-1.9.13.jar
        ├── jackson-jaxrs-1.9.13.jar
        ├── jackson-mapper-asl-1.9.13.jar
        ├── jcodings-1.0.8.jar
        ├── jdk.tools-1.8.jar
        ├── jetty-util-6.1.26.jar
        ├── jline-0.9.94.jar
        ├── joni-2.1.2.jar
        ├── junit-4.12.jar
        ├── kafka-clients-0.10.0.1.jar
        ├── log4j-1.2.17.jar
        ├── lz4-1.3.0.jar
        ├── metrics-core-2.2.0.jar
        ├── netty-3.8.0.Final.jar
        ├── netty-all-4.0.29.Final.jar
        ├── protobuf-java-2.5.0.jar
        ├── slf4j-api-1.7.7.jar
        ├── slf4j-log4j12-1.7.7.jar
        ├── snappy-java-1.1.2.6.jar
        ├── spark-streaming-kafka-0-10_2.11-2.0.2.jar
        ├── unused-1.0.0.jar
        └── zookeeper-3.4.6.jar
    

    上述打包问题已经差不多了,后续会逐渐补充,具体使用后面会阐述;

    疑惑二、Spark任务提交Yarn队列之正确方式?

    这个问题是许多Spark用户都比较纠结的问题,原因在于Spark繁杂的配置项,如果对其理解不透,则在使用的时候,只能一遍遍地试用了。

    现对spark-sumbit中几个比较重要的配置,做一个说明:

    • --files : 必须用','相隔,文件会上传至executor的工作路径,默认并没有加载至classpath中,一般使用在配置文件相关;

    • --jars : 必须用','相隔,文件会上传至executor/driver(cluster模式下)的工作路径,默认会加载至classpath中,一般使用在所依赖的jar相关;

    • --class : 加载主类名;

    • --master yarn : yarn集群的方式提交

    • --queue : 提交yarn队列的名称;

    • --driver-memory : driver申请内存;

    • --executor-memory : executor申请内存;

    • --executor-cores : 每个executor中使用的cores数量,建议2~5个;

    • --conf :spark-submit启动spark任务时配置项内容,其中又包含如下几个比较重要的(示例):
      • --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./kafka_client_jaas.conf" : executor启动是的jvm配置项,一般kerberos系统配置会使用到;
      • --conf spark.yarn.keytab=/etc/security/keytabs/hbase.service.keytab : spark-submit 依赖的keytab配置;
      • --conf spark.yarn.principal=hbase/hzadg-mammut-platform1.server.163.org@BDMS.163.COM : spark-submit 启动依赖的principle配置;
      • --conf spark.driver.extraClassPath=./spark-demos-0.1.0/lib/* : driver启动时添加jvm的classpath,加载必要的jar;
    • --driver-java-options : driver 启动是的jvm配置项,一般kerberos系统配置会使用到;

    所以基于上述的配置项,如果运行KafkaToHBase项目,首先

    • 将项目依赖的配置文件加载通过--files保障executor配置项是同步的;
    • 将kerberos认证相关内容、相关配置复制到项目路径下(./kafka_client_jaas.conf,./kafka.service.keytab,./hbase-site.xml );
    • 将项目依赖的(Spark/Yarn环境没有提供的jar)通过--jars上传至executor工作路径中;

    其中注意,由于--files/--jars针对多个文件都是用','分割的,所以可以使用下面这个命令生成凭借字符串(注意变更必要参数):

    r='';for i in ls ./lib/;do r=${r},"./lib/$i";done ; echo $r

    针对https://github.com/LiShuMing/spark-demos项目,启动如下:

    • 编译完毕后,将target/spark-demos-0.1.0-distribution.tar.gz编译文件mv到工作环境,解压;
    • 将依赖的kafka_client_jaas.conf kafka.service.keytab复制到项目路径下;
    • 基于r='';for i in ls ./lib/;do r=${r},"./lib/$i";done ; echo $r生成--jars必要拼接串;

    最终运行命令如下(具体使用需要调整):

    /usr/ndp/current/spark2_client/bin/spark-submit \
    --files ./kafka_client_jaas.conf,./kafka.service.keytab,./hbase-site.xml \
    --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./kafka_client_jaas.conf" \
    --driver-java-options "-Djava.security.auth.login.config=./kafka_client_jaas.conf" \
    --conf spark.yarn.keytab=/etc/security/keytabs/hbase.service.keytab \
    --conf spark.yarn.principal=hbase/hzadg-mammut-platform1.server.163.org@BDMS.163.COM \
    --conf spark.driver.extraClassPath=./spark-demos-0.1.0/lib/* \
    --jars ./lib/commons-cli-1.2.jar,./lib/commons-codec-1.9.jar,./lib/commons-collections-3.2.2.jar,./lib/commons-httpclient-3.1.jar,./lib/commons-io-2.4.jar,./lib/commons-lang-2.6.jar,./lib/commons-lang3-3.3.2.jar,./lib/commons-logging-1.2.jar,./lib/commons-math-2.2.jar,./lib/disruptor-3.3.0.jar,./lib/findbugs-annotations-1.3.9-1.jar,./lib/guava-12.0.1.jar,./lib/hamcrest-core-1.3.jar,./lib/hbase-annotations-1.2.6.jar,./lib/hbase-client-1.2.6.jar,./lib/hbase-common-1.2.6.jar,./lib/hbase-common-1.2.6-tests.jar,./lib/hbase-hadoop2-compat-1.2.6.jar,./lib/hbase-hadoop-compat-1.2.6.jar,./lib/hbase-prefix-tree-1.2.6.jar,./lib/hbase-procedure-1.2.6.jar,./lib/hbase-protocol-1.2.6.jar,./lib/hbase-server-1.2.6.jar,./lib/htrace-core-3.1.0-incubating.jar,./lib/jackson-core-asl-1.9.13.jar,./lib/jackson-jaxrs-1.9.13.jar,./lib/jackson-mapper-asl-1.9.13.jar,./lib/jcodings-1.0.8.jar,./lib/jdk.tools-1.8.jar,./lib/jetty-util-6.1.26.jar,./lib/jline-0.9.94.jar,./lib/joni-2.1.2.jar,./lib/junit-4.12.jar,./lib/kafka-clients-0.10.0.1.jar,./lib/log4j-1.2.17.jar,./lib/lz4-1.3.0.jar,./lib/metrics-core-2.2.0.jar,./lib/netty-3.8.0.Final.jar,./lib/netty-all-4.0.29.Final.jar,./lib/protobuf-java-2.5.0.jar,./lib/slf4j-api-1.7.7.jar,./lib/slf4j-log4j12-1.7.7.jar,./lib/snappy-java-1.1.2.6.jar,./lib/spark-demo-0.1.0.jar,./lib/spark-streaming-kafka-0-10_2.11-2.0.2.jar,./lib/unused-1.0.0.jar,./lib/zookeeper-3.4.6.jar \
    --master yarn  \
    --class com.netease.spark.streaming.hbase.JavaKafkaToHBaseKerberos \
    --executor-memory 1g \
    --driver-memory 2g \
    --executor-cores 1 \
    --queue default \
    --deploy-mode client ./lib/spark-demo-0.1.0.jar
    

    相关文章

      网友评论

        本文标题:基于Yarn集群Spark Streaming 提交任务解惑

        本文链接:https://www.haomeiwen.com/subject/ctfbdftx.html