美文网首页spark||flink||scala
本地测试Spark任务

本地测试Spark任务

作者: halfempty | 来源:发表于2018-10-16 15:02 被阅读381次

    1. 背景

    在Linux下安装Ambari或者CDH并不复杂,但考虑到环境的维护、组件(尤其是Spark)版本的变更,以及测试数据的污染等因素,希望有一种解决方案能减弱这些困扰。

    之所以选择本地执行:

    • 环境独享,不被他人干扰
    • 使用Jmockit,实现局部自定义改造
    • 结合Testng,方便单元测试用例编写与执行
    • 甚至可以通过Intellij IDEA实现代码调试

    2. 环境搭建

    2.1 POM文件

    在Intellij IDEA创建新的Maven Project,并配置pom.xml

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.jmockit</groupId>
            <artifactId>jmockit</artifactId>
            <version>1.40</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.testng</groupId>
            <artifactId>testng</artifactId>
            <version>6.14.3</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
    

    这里使用了Spark 2.1.0版本,如果开发组件版本变更,将Maven源调整成对应版本即可。

    2.2 调试

    先来创建一下Sparksession

    SparkSession session = SparkSession.builder()
                    .appName("my local spark")
                    .master("local[*]")
                    .enableHiveSupport()
                    .getOrCreate();
    

    运行发现报错

    18/10/15 15:43:30 ERROR util.Shell: Failed to locate the winutils binary in the hadoop binary path
    java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
    at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:378)
    at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:393)
    at org.apache.hadoop.util.Shell.<clinit>(Shell.java:386)
    at org.apache.hadoop.hive.conf.HiveConfConfVars.findHadoopBinary(HiveConf.java:2327)* ​ *at org.apache.hadoop.hive.conf.HiveConfConfVars.<clinit>(HiveConf.java:365)
    at org.apache.hadoop.hive.conf.HiveConf.<clinit>(HiveConf.java:105)

    根据错误日志的堆栈输出,将问题锁定在Shell类的getWinUtilsPath方法

    public static final String getWinUtilsPath() {
            String winUtilsPath = null;
    
            try {
                if (WINDOWS) {
                    winUtilsPath = getQualifiedBinPath("winutils.exe");
                }
            } catch (IOException var2) {
                LOG.error("Failed to locate the winutils binary in the hadoop binary path", var2);
            }
    
            return winUtilsPath;
        }
    

    可以看到,在windows环境下,需要借助额外的winutils.exe工具(用来模拟hdfs文件操作,下载地下:https://github.com/srccodes/hadoop-common-2.2.0-bin

    除此外,还需要配置hadoop.home.dir属性,指向winutils.exe工具所在目录

    System.setProperty("hadoop.home.dir", "D:\\package\\hadoop");
    

    2.3 HIVE配置

    三个相关配置项:

    • spark.sql.warehouse.dir
      • 位于SQLConf.scala,覆盖HiveConf.class下的hive.metastore.warehouse.dir,默认值为当前路径下的“spark-warehouse”
    • hive.metastore.uris
      • 如果不填,默认在使用derby在本地创建元库(可以使用jdk自带的ij工具进行连接,但只支持单会话)
      • 如果想连接远程元库,可以配置“thrift://ip:9083
    • spark.sql.shuffle.partitions
      • 官方解释:The default number of partitions to use when shuffling data for joins or aggregations,它只针对spark sql的连接和聚合操作,默认值为200。本地测试的话,可以配置为1,减少文件数,从而提高处理速度。
      • 注意与“spark.default.parallelism”的区别(Default number of partitions in RDDs returned by transformations like join, reduceByKey, and parallelize when not set by user.)

    3. SQL文件初始化

    当使用Hive时,少不了表的初始化。如果在Linux上,直接执行hive -f xxx.sql即可实现批量创建。

    然后上述方案并未提供hive指令,唯一知道可以执行sql语句的指令为SparkSession.sql方法,一次只能执行一条,且无法识别最后的;

    没办法,既然没有现成的,就只好动手自己造呢。目的很明确,就是将sql文件解析成sql集合,再遍历执行即可

    public class SqlReader {
    
        public static List<String> readSql(String sqlFile) {
            List<String> sqls = new ArrayList<String>();
            
            try {
                BufferedReader bufferedReader = new BufferedReader(
                        new InputStreamReader(
                                SqlReader.class.getClassLoader().getResourceAsStream(sqlFile), "utf-8"));
    
                StringBuilder sb = new StringBuilder();
                String line = null;
                boolean endFlag = false;
                while((line = bufferedReader.readLine()) != null) {
                    String tmp = line.trim();
                    if(tmp == "" || tmp.startsWith("--")) {
                        continue;
                    } else {
                        if (tmp.endsWith(";")) {
                            sql = tmp.substring(0, tmp.length() - 1);
                            endFlag = true;
                        }
                        sb.append(sql).append(" ");
                    }
    
                    if (endFlag) {
                        sqls.add(sb.toString());
                        sb.setLength(0);
                        endFlag = false;
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
            return sqls;
        }
    }
    

    4. 构造测试数据

    二种方案供参考:

    1. 使用load data local inpath '/path/to/data' overwrite into table xxx [partition(xx='xx')]
    2. 当存在虚表dual(oracle叫法)时,可以使用insert overwrite table xxx select filed1, filed2, filed3, 1, 'a' from dual

    小技巧:

    • 直接调整“spark.sql.warehouse.dir”下hive表的文件内容,包括修改,复制等,相当于实现update操作(虽然hive不支持update),所以对hive表数据的操作转化为本地文件的操作。
    • 如果Hive表带分区,通过上一条复制分区的操作将不被识别,因为复制的分区信息并没有写入到Hive的元库中(可以通过ij连接本地derby,查看PARTITIONS表)。如果非要这么操作也不是不可,执行msck repaire table xxx可以修复hive表的分区信息
    • 然后有一个问题,即HDFS的CRC校验,当我们修改数据文件时,校验码将不匹配,需要删除对应文件的crc文件;或者选择关闭HDFS的CRC校验,fileSystem.setVerifyChecksum(false)

    5. 结果校验

    这里不细展开,可以将结果转化成Array或者String,也可以通过Hash算法计算结果再比较

    需要注意的是,返回结果的顺序可能错乱,无法与预期结果依次比对

    相关文章

      网友评论

        本文标题:本地测试Spark任务

        本文链接:https://www.haomeiwen.com/subject/xdefzftx.html