美文网首页
SparkSQl的使用

SparkSQl的使用

作者: 不愿透露姓名的李某某 | 来源:发表于2019-07-13 17:17 被阅读0次

    SparkSql现在有两个版本,方式如下:

    方式一:使用sql版本

    //提交的这个程序可以连接到spark集群中

        val conf =new SparkConf().setAppName("SaparkDemo1").setMaster("local[*]")

    //创建sparksql的连接(程序执行的入口)

        val sc=new SparkContext(conf)

    //sparkContext不能创建特殊的RDD

    //将sparkContext包装增强

        val sqlcontext =new SQLContext(sc)

    //创建特殊的RDD(DataFrame),就是有schema的RDD

    //先有一个普通的RDD,然后在关联schema,进而转成DataFrame

    //在集群如有如下数据 1,laoduan,35,29

        val lines =sc.textFile("hdfs://pro01:9000/person")

    val boyRDd:RDD[Boy] = lines.map(line => {

    val fields = line.split(",")

    val id = fields(0).toLong

    val name = fields(1)

    val age = fields(2).toInt

    val fv = fields(3).toDouble

    Boy(id,name,age,fv)

    })

    //该RDD装的是Boy类型的数据,有个shcma信息,还是一个RDD

    //将RDD转换成DataFrame

    //导入隐式转换

        import  sqlcontext.implicits._

    val bdf = boyRDd.toDF

    //变成DF后可以使用两种API进行编程

    //把DataFrame注册临时表

        bdf.registerTempTable("t_boy")

    //书写sql

        val result=sqlcontext.sql("SELECT * FROM t_boy ORDER BY fv desc ,age asc")

    //查看结果(触发Action)

        result.show()

    sc.stop()

    }

    }

    case  class  Boy (id:Long,name:String,age:Int,fv:Double)

    方式一的扩展:

    //提交的这个程序可以连接到spark集群中

      val conf =new SparkConf().setAppName("SaparkDemo1").setMaster("local[*]")

    //创建sparksql的连接(程序执行的入口)

      val sc=new SparkContext(conf)

    //sparkContext不能创建特殊的RDD

    //将sparkContext包装增强

      val sqlcontext =new SQLContext(sc)

    //创建特殊的RDD(DataFrame),就是有schema的RDD

    //先有一个普通的RDD,然后在关联schema,进而转成DataFrame

    //在集群如有如下数据 1,laoduan,35,29

      val lines =sc.textFile("hdfs://pro01:9000/person")

    val rowRDD:RDD[Row] = lines.map(line => {

    val fields = line.split(",")

    val id = fields(0).toLong

    val name = fields(1)

    val age = fields(2).toInt

    val fv = fields(3).toDouble

    Row(id,name,age ,fv)

    })

    val sch =StructType(List(

    StructField("id", LongType,true),

    StructField("name", StringType,true),

    StructField("age", IntegerType,true),

    StructField("fv", DoubleType,true)

    ))

    //结果类型,表头,用于描述DataFram

      val bdf = sqlcontext.createDataFrame(rowRDD,sch)

    //该RDD装的是Boy类型的数据,有个shcma信息,还是一个RDD

    //将RDD转换成DataFrame

    //变成DF后可以使用两种API进行编程

    //把DataFrame注册临时表

      bdf.registerTempTable("t_boy")

    //书写sql

      val result=sqlcontext.sql("SELECT * FROM t_boy ORDER BY fv desc ,age asc")

    //查看结果(触发Action)

      result.show()

    sc.stop()

    另一种写法:将写sql的方式使用方法来调用

    //提交的这个程序可以连接到spark集群中

      val conf =new SparkConf().setAppName("SaparkDemo1").setMaster("local[*]")

    //创建sparksql的连接(程序执行的入口)

      val sc=new SparkContext(conf)

    //sparkContext不能创建特殊的RDD

    //将sparkContext包装增强

      val sqlcontext =new SQLContext(sc)

    //创建特殊的RDD(DataFrame),就是有schema的RDD

    //先有一个普通的RDD,然后在关联schema,进而转成DataFrame

    //在集群如有如下数据 1,laoduan,35,29

      val lines =sc.textFile("hdfs://pro01:9000/person")

    val rowRDD:RDD[Row] = lines.map(line => {

    val fields = line.split(",")

    val id = fields(0).toLong

    val name = fields(1)

    val age = fields(2).toInt

    val fv = fields(3).toDouble

    Row(id,name,age,fv)

    })

    val sch =StructType(List(

    StructField("id", LongType,true),

    StructField("name", StringType,true),

    StructField("age", IntegerType,true),

    StructField("fv", DoubleType,true)

    ))

    //结果类型,表头,用于描述DataFram

      val bdf = sqlcontext.createDataFrame(rowRDD,sch)

    //不使用SQl的方式不用注册临时表

      import  sqlcontext.implicits._

    val df1 = bdf.select("name","age","fv")

    val df2 = df1.orderBy($"fv" desc,$"age" asc  )

    sc.stop()

    方式二:

    //Spark2.x

    val session = SparkSession.builder()

    .appName("SqlText1")

    .master("local[*]")

    .getOrCreate()

    //创建RDD

    val lines = session.sparkContext.textFile("hdfs://pro01:9000/person")

    val rowRDD: RDD[Row] = lines.map(line => {

    val fields = line.split(",")

    val id = fields(0).toLong

    val name = fields(1)

    val age = fields(2).toInt

    val fv = fields(3).toDouble

    Row(id, name,age, fv)

    })

    val sch =StructType(List(

    StructField("id", LongType,true),

    StructField("name", StringType,true),

    StructField("age", IntegerType,true),

    StructField("fv", DoubleType,true)

    ))

    //创建DataFrame

    val df = session.createDataFrame(rowRDD, sch)

    import session.implicits._

    val df2 = df.where($"fv" >98).orderBy($"fv" desc,$"age" asc)

    session.stop()

    另一种写法:创建视图

    //创建SparkSession

      val sparke = SparkSession.builder()

    .appName("SqlWordcount")

    .master("local[*]")

    .getOrCreate()

    //指定读数据

    //Dataset分布式数据集,是对RDD的进一步分装,更加智能

    //Dateset默认只有一列,是value

      val lines: Dataset[String] = sparke.read.textFile("hdfs://pro01:9000/person")

    //整理数据

      import  sparke.implicits._

    val words: Dataset[String] = lines.flatMap(_.split(" "))

    //注册试图

      words.createGlobalTempView("v_wc")

    val result:DataFrame =sparke.sql("SELECT value,COUNT(*) counts FROM v_wc GROUP BY value ORDER BY counts DESC")

    result.show()

    另一种写法:将sql使用方法来调用

    //创建SparkSession

    val sparke = SparkSession.builder()

    .appName("DataSerWordcount")

    .master("local[*]")

    .getOrCreate()

    //指定读数据

    //Dataset分布式数据集,是对RDD的进一步分装,更加智能

    //Dateset默认只有一列,是value

    val lines: Dataset[String] = sparke.read.textFile("hdfs://pro01:9000/person")

    //整理数据

    import sparke.implicits._

    val words: Dataset[String] = lines.flatMap(_.split(" "))

    //使用DataSet的API(DSL)

    //val cou = words.groupBy($"value" as "word" ).count().sort("").sort($"count" desc)

    //导入聚合函数

    import org.apache.spark.sql.functions._

    val counts = words.groupBy($"value" as"word").agg(count("*") as"counts").orderBy($"counts " desc)

    counts.show()

    sparke.stop()

    相关文章

      网友评论

          本文标题:SparkSQl的使用

          本文链接:https://www.haomeiwen.com/subject/axymkctx.html