美文网首页
Spark on Hive 和 Hive on Spark 区别

Spark on Hive 和 Hive on Spark 区别

作者: alexlee666 | 来源:发表于2019-10-15 18:56 被阅读0次

    一、背景

    1.1 为什么引入Hive?

    最初提出Hive的主要目的在于:降低使用MapReduce完成查询任务的技术门槛
    在RDBMS中,开发人员或者用户通过执行SQL语句进行查询,SQL语言是开发人员大都熟悉的语言。在大数据发展的初期,大数据查询的计算任务需要通过MapReduce来完成,然而编写MapReduce的程序是件复杂繁琐的事情。Hive 可以实现将大家熟悉的SQL语句翻译成复杂的MapReduce程序,利用Hive非MapReduce开发人员也能够快速上手使用MapReduce完成查询任务。因此,大家经常会说Hive使用的是一种类SQL的HQL语言。

    Hive查询原理示意图

    1.2 为什么引入Spark?

    Hive底层计算使用的是Hadoop的MapReduce,由于需要繁的磁盘IO,其计算性能只适合于大文件的非实时的批处理操作。Spark基于内存计算,凭借着DAG和RDD特性(保证中间数据如果丢失可以重新计算恢复),可以将计算的中间结果以RDD的形式保存在内存中,而不需要频繁的磁盘IO,非常适合于交互式迭代计算。Spark的计算性能远高于Hadoop的MapReduce。

    1.3 Hive的内部表、外部表以及元数据

    • 未被external修饰的是内部表(managed table);
    • 被external修饰的为外部表(external table);

    Hive表的元数据:

    • Hive表的元数据metadata包括:表名、表的类型(内部表还是外部表)、表的owner、字段类型、数据存储位置等信息。
    • Hive的元数据都存储在metastore中,metastore的数据使用JPOX(Java Persistent Objects)对象关系映射解决方案进行持久化,所以任何被JPOX支持的存储都可以被Hive使用,包括大多数商业RDBMS和许多开源的数据存储。Hive支持三种不同的元存储服务器,分别为:内嵌式元存储(即Derby)、本地元存储(最常见的MYSQL)、远程元存储,每种存储方式使用不同的配置参数。关于Hive的metastore可以参考这篇博客:https://blog.csdn.net/skywalker_only/article/details/26219619
    • 在安装Hive时需要对Hive的metastore进行配置,关于使用MYSQL作为Hive的metastore的方法可以参考博客:https://www.jianshu.com/p/ce4c5826a078

    内部表 V.S. 外部表:

    名称 内部表 外部表
    表数据由谁管理 Hive自身管理 HDFS管理
    表数据存储位置 配置项hive.metastore.warehouse.dir(默认:/user/hive/warehouse) 自己制定
    删除表带来的影响 直接删除元数据(metadata)和存储的数据 仅删除元数据(metadata)
    修改表结构带来的影响 将修改同步给元数据(metadata) 需要修复外部表 (MSCK REPAIR TABLE table_name;)

    了解了这些背景知识后,接下来比较下Spark on Hive 和 Hive on Spark 区别。


    二、Spark on Hive 和 Hive on Spark 区别

    2.1 Spark on Hive

    顾名思义,即将Spark构建在Hive之上,Spark需要用到Hive,具体表现为:

    • 就是通过Spark SQL,加载Hive的配置文件,获取到Hive的metastore信息,进而获得metadata,但底层运行的还是 Spark RDD;
    • Spark SQL获取到metadata之后就可以去访问Hive的所有表的数据;
    • 接下来就可以通过Spark SQL来操作Hive表中存储的数据。

    总之,Spark使用Hive来提供表的metadata信息

    2.2 Hive on Spark

    顾名思义,即将Hive构建在Spark之上(Hive的底层默认计算引擎为Hadoop的MapReduce),Hive需要用到Spark,具体表现为:

    • Hive 的底层默认计算引擎从MapReduce改为Spark;
    • 通过修改hive-site.xml配置项hive.execution.engine的值来修改执行引擎(默认为mapreduce,即mr):
    <property>
        <name>hive.execution.engine</name>
        <value>spark</value>
        <description>
          Expects one of [mr, tez, spark].
          Chooses execution engine. Options are: mr (Map reduce, default), tez, spark. While MR
          remains the default engine for historical reasons, it is itself a historical engine
          and is deprecated in Hive 2 line. It may be removed without further warning.
        </description>
      </property>
    
    
    hive> insert into tbl1 values(2,'a', 'f', 2);
    
    Query ID = ......
    Total jobs = 1
    Launching Job 1 out of 1
    In order to change the average load for a reducer (in bytes):
      set hive.exec.reducers.bytes.per.reducer=<number>
    In order to limit the maximum number of reducers:
      set hive.exec.reducers.max=<number>
    In order to set a constant number of reducers:
      set mapreduce.job.reduces=<number>
    java.lang.NoClassDefFoundError: io/netty/channel/EventLoopGroup
        at org.apache.hive.spark.client.SparkClientFactory.initialize(SparkClientFactory.java:56)
        at org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl.setup(SparkSessionManagerImpl.java:83)
        at org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl.getSession(SparkSessionManagerImpl.java:102)
        at org.apache.hadoop.hive.ql.exec.spark.SparkUtilities.getSparkSession(SparkUtilities.java:125)
        .....
    Caused by: java.lang.ClassNotFoundException: io.netty.channel.EventLoopGroup
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        ... 24 more
    FAILED: Execution Error, return code -101 from org.apache.hadoop.hive.ql.exec.spark.SparkTask. io/netty/channel/EventLoopGroup
    
    

    开发过程中常采取Spark on Hive 方案,接下来给出 Spark on Hive 创建外部查询表的方法。


    三、 Spark on Hive 创建外部查询表

    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.spark.launcher.SparkAppHandle;
    import org.apache.spark.launcher.SparkLauncher;
    
    
    public class HiveUtil {
    
      private static final Logger logger = LoggerFactory.getLogger(HiveUtil.class);
    
      public void createSparkDatabase(String instanceName, String schemaName, String tableName)
              throws Exception {
        String databaseName = getDatabaseName(instanceName, schemaName);
        String ddl = String.format("CREATE DATABASE IF NOT EXISTS %s", databaseName+"___s");
        launchSparkTask(instanceName, schemaName, tableName, ddl);
      }
    
      public void createSparkTable(String instanceName, String schemaName,
                                  String tableName, String tablePrefix) throws Exception {
        String sourcePK = metaConfigDao.getSoucePk(instanceName, schemaName, tableName);
        dropSparkTable(instanceName, schemaName, tableName);
        List<Pair<String, String>> columnList = dbHelper.getColumnList(instanceName,
                schemaName, tableName);
        StringBuffer dbColumns = new StringBuffer();
        for (int i=0; i<columnList.size(); i++) {
          Pair<String, String> p = columnList.get(i);
          if (i == columnList.size() -1 ) {
            dbColumns.append(p.getLeft()+" "+p.getRight());
          } else {
            dbColumns.append(p.getLeft()+" "+p.getRight()+",");
          }
        }
    
        String databaseName = ;
        String tenantName = ;
    
        String ddl = String.format("CREATE TABLE if not exists %s.%s (%s) "
                        + " using org.apache.spark.sql.execution.datasources.parquet"
                        + " options(\"tenant\" \'%s\',"
                        + "\"instance\" \"%s\", "
                        + "\"schema\" \"%s\", "
                        + "\"table\" \"%s\", "
                        + "\"prefix\" \"%s\", "
                        + "\"sourcepk\" \"%s\") ",
                databaseName+"___s", tableName, dbColumns, tenantName, instanceName,
                schemaName, tableName,tablePrefix, sourcePK);
    
        logger.info("Execute spark ddl, ddl is: [{}]", ddl);
        launchSparkTask(instanceName, schemaName, tableName, ddl);
        logger.info("Spark table created [{}.{}]", schemaName, tableName);
      }
    
      public void dropSparkTable(String instanceName, String schemaName, String tableName)
              throws Exception {
        String databaseName = getDatabaseName(instanceName, schemaName);
        String ddl = String.format("DROP TABLE IF EXISTS %s.%s", databaseName+"___s", tableName);
        launchSparkTask(instanceName, schemaName, tableName, ddl);
        logger.info("Dropped spark table [{}.{}]", databaseName+"___s", tableName);
      }
    
    
    //launch spark task to execute spark jobs
      public void launchSparkTask(String instanceName, String schemaName,
                                  String tableName, String sqlddl) throws Exception{
        String sparkTaskJarName = handler.getParquetJarName().trim();
        String createSparkTableClassPath = handler.getCreateSparkTableClassPath().trim();
        String sparkMaster = handler.getSparkMaster().trim();
        System.out.println("sparkTaskJarName: " + sparkTaskJarName);
        String keytabPath = (kerberosUtil.getKeytabPath() == null) ?
                "" : kerberosUtil.getKeytabPath();
        String principal = (kerberosUtil.getPrincipal() == null) ?
                "" : kerberosUtil.getPrincipal();
        SparkLauncher launcher = new SparkLauncher();
        // To be launched spark-application jar
        launcher.setAppResource(sparkTaskJarName);
        launcher.setMainClass(createSparkTableClassPath);
        launcher.addAppArgs(sqlddl, keytabPath, principal );
        // master could be yarn or local[*]
        //launcher.setMaster("local[*]");
        launcher.setMaster(sparkMaster);
        SparkAppHandle handle = launcher.startApplication();
        int retrytimes = 0;
        while (handle.getState() != SparkAppHandle.State.FINISHED) {
          retrytimes ++;
          Thread.sleep(5000L);
          System.out.println("applicationId is: " + handle.getAppId());
          System.out.println("current state: " + handle.getState());
          boolean mark = (handle.getAppId() == null
                  && handle.getState() == SparkAppHandle.State.FAILED )
                  && retrytimes > 8;
          if (mark) {
            logger.info("can not start spark job for creating spark table. Creating spark table failed. ");
            metaConfigDao.updateMetaFlag(instanceName, schemaName, tableName, Config.META_FLAG_TASK_FAILED);
            failedflag = true;
            break;
          }
        }
        System.out.println("Launcher over");
      }
    }
    
    

    相关文章

      网友评论

          本文标题:Spark on Hive 和 Hive on Spark 区别

          本文链接:https://www.haomeiwen.com/subject/uepsmctx.html