A Case Study Comparing Two Ways Spark SQL Operates on RDDs

By MichaelFly | Published 2016-12-03 20:34

    Preface

    GitHub repository: https://github.com/guofei1219

    Background

    Count the number of incoming applications from new channels.

    The two ways Spark SQL can operate on RDDs, compared

    1. Inferring the schema via reflection; see the excerpt from the official documentation below.

    The Scala interface for Spark SQL supports automatically converting an RDD containing case classes to a DataFrame. The case class defines the schema of the table. The names of the arguments to the case class are read using reflection and become the names of the columns. Case classes can also be nested or contain complex types such as Sequences or Arrays. This RDD can be implicitly converted to a DataFrame and then be registered as a table. Tables can be used in subsequent SQL statements.

    case class blb_intpc_info(chnl_code:String,id_num:String)
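
    For example, a minimal, self-contained sketch of the reflection path might look like the following (the in-memory sample record and the local[*] master are illustrative assumptions, not part of the original job):

    import org.apache.spark.sql.SQLContext
    import org.apache.spark.{SparkConf, SparkContext}

    object ReflectionSketch {
      // The case-class field names become the DataFrame column names via reflection.
      case class blb_intpc_info(chnl_code: String, id_num: String)

      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("ReflectionSketch").setMaster("local[*]"))
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._ // enables .toDF() on RDDs of case classes

        val df = sc.parallelize(Seq(blb_intpc_info("306DC4246", "411324199209142831"))).toDF()
        df.printSchema() // chnl_code: string, id_num: string
      }
    }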
    

    2. Specifying the schema programmatically; see the excerpt from the official documentation below.

    When case classes cannot be defined ahead of time (for example, the structure of records is encoded in a string, or a text dataset will be parsed and fields will be projected differently for different users), a DataFrame can be created programmatically with three steps.

    1. Create an RDD of Rows from the original RDD;
    2. Create the schema represented by a StructType matching the structure of Rows in the RDD created in Step 1;
    3. Apply the schema to the RDD of Rows via the createDataFrame method provided by SQLContext.

    A minimal end-to-end sketch of all three steps follows the StructType definition below.
    
    val structTypes = StructType(Array(
      StructField("chnl_code", StringType, true),
      StructField("id_num", StringType, true)
    ))
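
    Putting the three steps together, a minimal sketch (with two made-up sample rows) might look like this:

    import org.apache.spark.sql.{Row, SQLContext}
    import org.apache.spark.sql.types.{StringType, StructField, StructType}
    import org.apache.spark.{SparkConf, SparkContext}

    object StructTypeSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("StructTypeSketch").setMaster("local[*]"))
        val sqlContext = new SQLContext(sc)

        // Step 1: an RDD of Rows
        val rowRDD = sc.parallelize(Seq(
          Row("306DC4246", "411324199209142831"),
          Row("306DC423A", "360124199011241838")))

        // Step 2: a StructType matching the structure of the Rows
        val schema = StructType(Array(
          StructField("chnl_code", StringType, true),
          StructField("id_num", StringType, true)))

        // Step 3: apply the schema via createDataFrame
        sqlContext.createDataFrame(rowRDD, schema).show()
      }
    }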
    
    Comparison summary

    1. The case-class approach is easier to read at a glance.
    2. Case classes are limited to 22 constructor parameters (in Scala 2.10), so they cannot be used for tables with many fields.
    3. The programmatic approach is a better fit for day-to-day development.

    Implementation

    Source data format

    The first column is the channel code and the second column is the application ID.

    306DC4246 411324199209142831
    306DC423A 360124199011241838
    306DC423D 440802198010290019
    306DC4226 612328197403120016
    306DC4201 452629199104050312
    306DC4201 350212198505025514

    Reflection approach
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
      * Count incoming applications per channel
      * Created by Michael on 2016/11/29.
      */
    object Custmer_Statistics_CaseClass {
    
      /**
        * Case class describing the table schema; the parameter names become the column names
        * @param chnl_code
        * @param id_num
        */
      case class blb_intpc_info(chnl_code:String,id_num:String)
    
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("Custmer_Statistics_CaseClass").setMaster("local[2]")
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)
        // Implicit conversion from RDD to DataFrame
        import sqlContext.implicits._
        // Read the local text file
        val blb_intpc_infoDF = sc.textFile("C:/work/ideabench/SparkSQL/data/channel/blb_intpc_info_10000_2.txt")
          .map(_.split("\\t"))
          .map(d => blb_intpc_info(d(0), d(1))).toDF()
    
        // Register the DataFrame as a temp table
        blb_intpc_infoDF.registerTempTable("blb_intpc_info")
    
        /**
          * Count applications per channel and sort by the count in descending order
          */
        sqlContext.sql("" +
          "select chnl_code,count(*) as intpc_sum " +
          "from blb_intpc_info " +
          "group by chnl_code").toDF().sort($"intpc_sum".desc).show()
      }
    
    }
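
    The same aggregation can also be written with the DataFrame API instead of SQL; this is only a stylistic alternative to the query above (a sketch, reusing the blb_intpc_infoDF defined in the program):

    // Equivalent aggregation using the DataFrame API
    blb_intpc_infoDF
      .groupBy("chnl_code")
      .count()
      .withColumnRenamed("count", "intpc_sum")
      .sort($"intpc_sum".desc)
      .show()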
    
    Output
    +---------+---------+
    |chnl_code|intpc_sum|
    +---------+---------+
    |306DC421E|      631|
    |306DC4201|      603|
    |306DC422B|      472|
    |306DC4221|      326|
    |306DC425E|      280|
    |306DC4237|      277|
    |306DC4210|      238|
    |306DC4246|      236|
    |306DC4229|      223|
    |306DC4257|      202|
    |306DC420E|      197|
    |306DC4215|      183|
    |306DC421F|      176|
    |306DC425A|      156|
    |306DC4251|      140|
    |306DC4202|      131|
    |306DC424D|      125|
    |306DC4226|      122|
    |306DC422A|      112|
    |306DC422D|      108|
    +---------+---------+
    
    Programmatic approach

    Query the Hive metastore to get the column information for a given Hive table.
    Note: if you are not familiar with the structure of the Hive metastore tables, search for a few posts on the topic or see the reference articles at the end of this post.
    No further explanation; straight to the code.

    public static String getHiveMetaData(String hiveTableName) {
        Connection conn = getConn();
    String sql = "SELECT\n" +
            "  # TBLS.`TBL_NAME`,  # table name (not selected)\n" +
            "  COLUMNS_V2.`COLUMN_NAME`\n" +
            "  # column name\n" +
            "  # COLUMNS_V2.`TYPE_NAME`  # column type (not selected)\n" +
            "FROM\n" +
            "  TBLS  # table metadata\n" +
            "  LEFT JOIN SDS  # storage descriptor metadata\n" +
            "    ON TBLS.SD_ID = SDS.SD_ID\n" +
            "  LEFT JOIN CDS\n" +
            "    ON SDS.CD_ID = CDS.CD_ID\n" +
            "  LEFT JOIN COLUMNS_V2  # column metadata\n" +
            "    ON CDS.CD_ID = COLUMNS_V2.CD_ID\n" +
            "WHERE TBLS.`TBL_NAME` = ?";  // parameterized so the method actually uses hiveTableName
        PreparedStatement pstmt;
        String result="";
        try {
        pstmt = (PreparedStatement)conn.prepareStatement(sql);
        pstmt.setString(1, hiveTableName);  // bind the requested table name
        ResultSet rs = pstmt.executeQuery();
            int col = rs.getMetaData().getColumnCount();
    
            while (rs.next()) {
                for (int i = 1; i <= col; i++) {
                    result = result + rs.getString(i) + "\t";
                }
            }
    
        } catch (SQLException e) {
            e.printStackTrace();
        }
        return result;
    }
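
    The getConn() helper used above is not shown in the post; presumably it opens a JDBC connection to the MySQL database that backs the Hive metastore. A minimal sketch of such a helper, written here in Scala to match the other examples and using placeholder connection details, could look like:

    import java.sql.{Connection, DriverManager}

    object MetastoreConn {
      // Placeholder URL, user, and password for the MySQL database backing the Hive metastore.
      def getConn(): Connection =
        DriverManager.getConnection("jdbc:mysql://metastore-host:3306/hive", "hive_user", "hive_password")
    }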
    
    import org.apache.spark.sql.{Row, SQLContext}
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
    import org.apache.spark.{SparkConf, SparkContext}
    import utils.DataUtils
    
    /**
      * Count incoming applications per channel
      * Created by Michael on 2016/11/29.
      */
    object Custmer_Statistics_StructType {
    
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("Custmer_Statistics_StructType").setMaster("local[2]")
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)
        // Implicits for the $"..." column syntax used below
        import sqlContext.implicits._
        // Read the local text file
        val blb_intpc_infoRow = sc.textFile("C:/work/ideabench/SparkSQL/data/channel/blb_intpc_info_10000_2.txt")
          .map(_.split("\\t"))
          .map(d => {
            Row(d(0),d(1))
          })
    
        // Column names for the Hive table, fetched from the metastore
        val schemaString = DataUtils.getHiveMetaData("blb_intpc_info")
        val schema =StructType(schemaString.split("\\t")
          .map(fieldName => StructField(fieldName, StringType, true)))
    
        val blb_intpc_infoDF = sqlContext.createDataFrame(blb_intpc_infoRow,schema)
        // Register the DataFrame as a temp table
        blb_intpc_infoDF.registerTempTable("blb_intpc_info")
    
        /**
          * Count applications per channel and sort by the count in descending order
          */
        sqlContext.sql("" +
          "select chnl_code,count(*) as intpc_sum " +
          "from blb_intpc_info " +
          "group by chnl_code").toDF().sort($"intpc_sum".desc).show()
      }
    
    }
    
    References

    1. Hive metastore tables: http://blog.csdn.net/wf1982/article/details/6644258
    2. Spark SQL programming guide (1.6.0): http://spark.apache.org/docs/1.6.0/sql-programming-guide.html
