美文网首页
DataFrame与RDD区别

DataFrame与RDD区别

作者: 林桉 | 来源:发表于2019-12-16 16:47 被阅读0次
    image.png

    DataFrame是分布式的Row对象的集合。DataFrame除了提供了比RDD更丰富的算子以外,更重要的特点是提升执行效率、减少数据读取以及执行计划的优化,比如filter下推、裁剪等。

    • 压缩
      而对于DataFrame来说,它不仅可以知道里面的数据,而且它还可以知道里面的schema信息。
      因此能做的优化肯定也是更多的,举个例子:
      因为每一列的数据类型是一样的,因此可以采用更好的压缩,这样的话整个DF存储所占用的东西必然是比RDD要少很多的。
    • 执行效率
      RDD API是函数式的,强调不变性,在大部分场景下倾向于创建新对象而不是修改老对象。这一特点虽然带来了干净整洁的API,却也使得Spark应用程序在运行期倾向于创建大量临时对象,对GC造成压力。
    • 剪枝
      当统计信息表名某一数据段肯定不包括符合查询条件的目标数据时,该数据段就可以直接跳过(例如某整数列a某段的最大值为100,而查询条件要求a > 200)。
    • 逻辑执行计划


      image.png

    Java/Scala 操作RDD的底层是跑在JVM上的
    Python 操作RDD的底层不跑在JVM上,它有Python Execution
    因此使用RDD编程带来一个很大的问题:
    由于使用不同语言操作RDD,底层所运行的环境不同(使用Java/Scala 与 Python 所运行的效率完全是不一样的,Python是会慢一些的)
    DataFrame逻辑层隔离
    DF不是直接到运行环境的,中间还有一层是logicplan,统统先转换成逻辑执行计划之后,再去进行运行的;所以现在DF不管采用什么语言,它的执行效率都是一样的

    • 环境配置
      Spark Core不再依赖,只需添加spark-sql
    import java.io.FileInputStream
    import java.util.Properties
    
    import org.apache.spark.sql.hive.HiveContext
    import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
      * @author Administrator
      *         2018/10/16-14:35
      *
      */
    object TestSaveFile {
      var hdfsPath: String = ""
      var proPath: String = ""
      var DATE: String = ""
    
      val sparkConf: SparkConf = new SparkConf().setAppName(getClass.getSimpleName)
      val sc: SparkContext = new SparkContext(sparkConf)
      val sqlContext: SQLContext = new HiveContext(sc)
    
      def main(args: Array[String]): Unit = {
        hdfsPath = args(0)
        proPath = args(1)
        //不过滤读取
        val dim_sys_city_dict: DataFrame = readMysqlTable(sqlContext, "TestMysqlTble1", proPath)
        saveAsFileAbsPath(dim_sys_city_dict, hdfsPath + "TestSaveFile", "|", SaveMode.Overwrite)
      }
    
      /**
        * 获取 Mysql 表的数据
        *
        * @param sqlContext
        * @param tableName 读取Mysql表的名字
        * @param proPath   配置文件的路径
        * @return 返回 Mysql 表的 DataFrame
        */
      def readMysqlTable(sqlContext: SQLContext, tableName: String, proPath: String): DataFrame = {
        val properties: Properties = getProPerties(proPath)
        sqlContext
          .read
          .format("jdbc")
          .option("url", properties.getProperty("mysql.url"))
          .option("driver", properties.getProperty("mysql.driver"))
          .option("user", properties.getProperty("mysql.username"))
          .option("password", properties.getProperty("mysql.password"))
          .option("dbtable", tableName)
          .load()
      }
    
      /**
        * 将 DataFrame 保存为 hdfs 文件 同时指定保存绝对路径 与 分隔符
        *
        * @param dataFrame  需要保存的 DataFrame
        * @param absSaveDir 保存保存的路径 (据对路径)
        * @param splitRex   指定分割分隔符
        * @param saveMode   保存的模式:Append、Overwrite、ErrorIfExists、Ignore
        */
      def saveAsFileAbsPath(dataFrame: DataFrame, absSaveDir: String, splitRex: String, saveMode: SaveMode): Unit = {
        dataFrame.sqlContext.sparkContext.hadoopConfiguration.set("mapred.output.compress", "false")
        //为了方便观看结果去掉压缩格式
        val allClumnName: String = dataFrame.columns.mkString(",")
        val result: DataFrame = dataFrame.selectExpr(s"concat_ws('$splitRex',$allClumnName) as allclumn")
        result.write.mode(saveMode).text(absSaveDir)
      }
    
      /**
        * 获取配置文件
        *
        * @param proPath
        * @return
        */
      def getProPerties(proPath: String): Properties = {
        val properties: Properties = new Properties()
        properties.load(new FileInputStream(proPath))
        properties
      }
    }
    

    小白学习 无关利益

    相关文章

      网友评论

          本文标题:DataFrame与RDD区别

          本文链接:https://www.haomeiwen.com/subject/rdednctx.html