Submitting Spark Jobs

Author: Jorvi | Published 2019-05-07 16:18

    1. Code

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
             http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>org.learn.example</groupId>
        <artifactId>spark-example</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <dependencies>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.11</artifactId>
                <version>2.2.0</version>
                <scope>provided</scope>
            </dependency>
    
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_2.11</artifactId>
                <version>2.2.0</version>
                <scope>provided</scope>
            </dependency>
    
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming_2.11</artifactId>
                <version>2.2.0</version>
                <scope>provided</scope>
            </dependency>
    
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
                <version>2.2.0</version>
                <scope>provided</scope>
            </dependency>
    
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-lang3</artifactId>
                <version>3.1</version>
                <scope>provided</scope>
            </dependency>
    
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>1.1.0</version>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.7.0</version>
                    <configuration>
                        <!-- Spark 2.2.x requires Java 8 -->
                        <source>1.8</source>
                        <target>1.8</target>
                        <encoding>UTF-8</encoding>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    
    </project>
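
    Assuming a standard Maven setup, the application jar referenced by the submit scripts below comes out of an ordinary package build; a minimal sketch:

    # Spark/Hadoop deps are 'provided', so they are not bundled into the jar
    mvn clean package
    ls target/spark-example-1.0-SNAPSHOT.jar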
    
    package org.learn.example;

    import java.io.Serializable;
    import java.lang.reflect.Method;

    import org.apache.spark.sql.SparkSession;

    public class SparkJob implements Serializable {

        public static void main(String[] args) {
            // The first program argument selects the job, e.g. "WordCount".
            String appName = args[0];

            SparkSession session = SparkSession.builder().appName(appName).getOrCreate();

            try {
                // Resolve the job class by naming convention and invoke run(SparkSession).
                String fullClassName = "org.learn.example.jobs." + appName;
                Class<?> clazz = Class.forName(fullClassName);
                Method method = clazz.getDeclaredMethod("run", SparkSession.class);
                method.invoke(clazz.newInstance(), session);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    
    
    
    import java.io.Serializable;
    import java.text.SimpleDateFormat;
    import org.apache.spark.sql.SparkSession;

    public interface ISparkJob extends Serializable {
        // Only used once per job on the driver (SimpleDateFormat is not thread-safe).
        SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss");

        void run(SparkSession session);
    }
    
    
    
    package org.learn.example.jobs;

    import java.util.Arrays;
    import java.util.Date;
    import java.util.Iterator;

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.SparkSession;

    import scala.Tuple2;

    public class WordCount implements ISparkJob {

        @Override
        public void run(SparkSession session) {
            // Build a small in-memory dataset and switch to the RDD API.
            JavaRDD<String> javaRDD = session.createDataset(Arrays.asList("aaa bbb", "bbb ccc", "aaa"), Encoders.STRING()).javaRDD();

            javaRDD.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public Iterator<String> call(String line) throws Exception {
                    // Split each line into words.
                    return Arrays.asList(line.split(" ")).iterator();
                }
            }).mapToPair(new PairFunction<String, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(String word) throws Exception {
                    // Pair each word with an initial count of 1.
                    return new Tuple2<>(word, 1);
                }
            }).reduceByKey(new Function2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer v1, Integer v2) throws Exception {
                    // Sum the counts for each word.
                    return v1 + v2;
                }
            }).saveAsTextFile("/home/mr/example/result/" + DATE_FORMAT.format(new Date()));
        }
    }
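
    For reference, the three input lines yield counts of aaa=2, bbb=2 and ccc=1. Assuming the cluster's default filesystem is HDFS, the job output (the timestamp directory is illustrative) would look roughly like:

    # Line order may vary across partitions
    hdfs dfs -cat /home/mr/example/result/20190507161800/part-*
    (aaa,2)
    (bbb,2)
    (ccc,1)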
    

    2. Client mode

    submit-client.sh

    #!/bin/bash
    
    /home/bin/spark-submit \
        --master spark://bd01:7077,bd02:7077 \
        --executor-memory 1G \
        --total-executor-cores 2 \
        --class org.learn.example.SparkJob \
        ../spark-example-1.0-SNAPSHOT.jar ${1}
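
    The job name passed as ${1} is handed to SparkJob, which loads org.learn.example.jobs.<name> by reflection, so the WordCount job above is submitted as:

    ./submit-client.sh WordCount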
    
    

    Explanation

    • If no deploy mode is specified, client mode is the default.
    • bd01 and bd02 must be configured in /etc/hosts:
    11.11.11.11 bd01
    22.22.22.22 bd02
    127.0.0.1 localhost
    
    • A local jar must be used.
      Testing (on version 2.0.2) showed that client mode cannot load a jar from HDFS; it fails with:
    Warning: Skip remote jar hdfs:/home/mr/example/spark-example-1.0-SNAPSHOT.jar.
    java.lang.ClassNotFoundException: org.learn.example.SparkJob
            at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
            at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
            at java.security.AccessController.doPrivileged(Native Method)
            at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
            at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
            at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
            at java.lang.Class.forName0(Native Method)
            at java.lang.Class.forName(Class.java:270)
            at org.apache.spark.util.Utils$.classForName(Utils.scala:228)
            at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:693)
            at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
            at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
            at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
            at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
    
    • When building the jar, Spark and Hadoop must be excluded, i.e. declared as provided in Maven (a quick verification sketch follows the quote below):

    When creating assembly jars, list Spark and Hadoop as provided dependencies; these need not be bundled since they are provided by the cluster manager at runtime.
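
    A quick way to verify the exclusion is to list the jar's contents; a sketch (the grep pattern is only illustrative):

    # Should print nothing: Spark classes come from the cluster at runtime
    jar tf target/spark-example-1.0-SNAPSHOT.jar | grep 'org/apache/spark'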


    3. Cluster mode

    submit-cluster.sh

    #!/bin/bash
    
    /home/bin/spark-submit \
        --master spark://bd01:7077,bd02:7077 \
        --deploy-mode cluster \
        --executor-memory 1G \
        --total-executor-cores 2 \
        --class org.learn.example.SparkJob \
        hdfs:/home/mr/example/spark-example-1.0-SNAPSHOT.jar ${1}
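
    Before this script can run, the jar must already be at the HDFS path it references; a minimal upload sketch (local jar path assumed from the Maven build):

    hdfs dfs -mkdir -p /home/mr/example
    hdfs dfs -put -f target/spark-example-1.0-SNAPSHOT.jar /home/mr/example/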
    
    

    Explanation:

    • In cluster mode, the jar must be placed somewhere every node in the cluster can access; if it is kept on local disk, a copy must be placed on every node

    The URL must be globally visible inside of your cluster, for instance, an hdfs:// path or a file:// path that is present on all nodes.

    Note: if a local jar is used but not every node has a copy, submission can fail with:

    ERROR org.apache.spark.deploy.ClientEndpoint: Exception from cluster was: java.nio.file.NoSuchFileException: /home/mr/example/spark-example-1.0-SNAPSHOT.jar
    java.nio.file.NoSuchFileException: /home/mr/example/spark-example-1.0-SNAPSHOT.jar
            at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
            at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
            at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
            at sun.nio.fs.UnixCopyFile.copy(UnixCopyFile.java:520)
            at sun.nio.fs.UnixFileSystemProvider.copy(UnixFileSystemProvider.java:253)
            at java.nio.file.Files.copy(Files.java:1225)
            at org.apache.spark.util.Utils$.org$apache$spark$util$Utils$$copyRecursive(Utils.scala:607)
            at org.apache.spark.util.Utils$.copyFile(Utils.scala:578)
            at org.apache.spark.util.Utils$.doFetchFile(Utils.scala:663)
            at org.apache.spark.util.Utils$.fetchFile(Utils.scala:462)
            at org.apache.spark.deploy.worker.DriverRunner.org$apache$spark$deploy$worker$DriverRunner$$downloadUserJar(DriverRunner.scala:154)
            at org.apache.spark.deploy.worker.DriverRunner$$anon$1.run(DriverRunner.scala:83)
    

    Symptom
    Suppose there are three nodes, bd01, bd02 and bd03, each running a worker process.

    bd01 and bd02 both hold the jar; bd03 does not.

    When the job is submitted in cluster mode, it runs correctly if the driver process happens to be launched on bd01 or bd02; if the driver is launched on bd03, the error above is thrown.

    Cause
    In cluster mode, the driver process is launched by a worker process.

    To launch the driver, the worker needs the jar in order to load the code that starts the SparkContext.

    If the jar path configured in the spark-submit script points to HDFS, the worker downloads the jar from HDFS into the driver's subdirectory of its local work directory (the directory configured by SPARK_WORKER_DIR, which also holds the worker logs), then launches the driver from that copy.

    If the jar path points to a local directory, the worker looks for the jar in that directory, copies it into the driver's subdirectory of the same work directory, and launches the driver from that copy.
    When the configured jar path contains no jar on that node, the NoSuchFileException above is thrown.
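
    The work directory is configured per worker in conf/spark-env.sh; judging from the /data/work paths in the log below, this cluster presumably uses something like:

    # conf/spark-env.sh on each worker (assumed value, inferred from the log paths)
    export SPARK_WORKER_DIR=/data/work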

    The driver launch command recorded in the log reflects this:

    "org.apache.spark.deploy.worker.DriverWrapper" "spark://Worker@bd01:35201" "/data/work/driver-20190413101025-2532/spark-example-1.0-SNAPSHOT.jar" "org.learn.example.SparkJob" "WordCount"
    

    Inspecting the /data/work/driver-20190413101025-2532 directory on bd01:
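
    Based on how the worker stages the jar, this directory would typically contain the copied jar alongside the driver's stdout/stderr logs; an illustrative listing:

    # On bd01 (contents are illustrative)
    ls /data/work/driver-20190413101025-2532/
    spark-example-1.0-SNAPSHOT.jar  stderr  stdout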
