美文网首页
spark_SQL 学习

spark_SQL 学习

作者: BitGuo | 来源:发表于2019-11-13 19:04 被阅读0次

    历经版本迭代更新,spark sql中原本带有模式信息的RDD即SchemaRDD,在spark1.3之后变成了新的数据结构 DataFrame
    RDD是风不是的java对象的集合,RDD无法知道RDD内部存储的数据结构的详细模式信息。
    DataFrame是以RDD为基础的分布式数据集,也就是分布式的Row对象的集合(每个Row对象代表一行记录),提供详细的结构信息,也就是我们常说的数据库模式信息。Spark SQL可以清除的知道该数据集中包含哪些列,每列的名称和类型。

    从RDD转换得到DataFrame

    1.利用反射机制,即toDF()方法

    正常的创建了一个RDD对象后,对其调用toDF()方法

    peopleDF = sc.textFile("file:///usr/local/spark/examples/src/main/resources/people.txt").map(lambda line : line.split(',')).map(lambda x: Row(**f(x))).toDF()
    #接着必须将其注册为临时表才可以被查询
    peopleDF.createOrReplaceTempView("people") 
    personsDF = spark.sql("select * from people")
    personsDF.rdd.map(lambda t : "Name:"+t[0]+","+"Age:"+t[1]).foreach(print)
    
    2.利用编程方式
    >>>  from pyspark.sql.types import Row
    >>>  from pyspark.sql.types import StructType
    >>> from pyspark.sql.types import StructField
    >>> from pyspark.sql.types import StringType
     
    //生成 RDD
    >>> peopleRDD = sc.textFile("file:///usr/local/spark/examples/src/main/resources/people.txt")
     
    //定义一个模式字符串
    >>> schemaString = "name age"
     
    //根据模式字符串生成模式
    >>> fields = list(map( lambda fieldName : StructField(fieldName, StringType(), nullable = True), schemaString.split(" ")))
    >>> schema = StructType(fields)
    //从上面信息可以看出,schema描述了模式信息,模式中包含name和age两个字段
     
     
    >>> rowRDD = peopleRDD.map(lambda line : line.split(',')).map(lambda attributes : Row(attributes[0], attributes[1]))
     
    >>> peopleDF = spark.createDataFrame(rowRDD, schema)
     
    //必须注册为临时表才能供下面查询使用
    scala> peopleDF.createOrReplaceTempView("people")
     
    >>> results = spark.sql("SELECT * FROM people")
    >>> results.rdd.map( lambda attributes : "name: " + attributes[0]+","+"age:"+attributes[1]).foreach(print)
     
    name: Michael,age: 29
    name: Andy,age: 30
    name: Justin,age: 19
    

    相关文章

      网友评论

          本文标题:spark_SQL 学习

          本文链接:https://www.haomeiwen.com/subject/qaskictx.html