美文网首页
spark从入门到放弃二十八:Spark Sql (1)Data

spark从入门到放弃二十八:Spark Sql (1)Data

作者: 意浅离殇 | 来源:发表于2018-04-01 22:22 被阅读0次

    文章地址:http://www.haha174.top/article/details/257834
    项目源码:https://github.com/haha174/spark.git
    1.简介


    Spark Sql 是Spark 中的一个模块,主要是用于进行结构化数据处理。它提供的最核心的编程抽象,就是DataFrame。同时Spark Sql 还可以作为分布式sql 查询引擎。Saprk Sql 最重要的功能之一,就是从hive 中查询数据。

    Data Frame 可以理解为 以列的形式组织的,分布式的数据集合。它其实和关系型数据库中的表非常类似,但是底层做了许多的优化。DataFrame可以通过很对源进行构建,包括结构化数据文件,hive中的表,外部的关系型数据库以及RDD.

    Dataset是从Spark 1.6开始引入的一个新的抽象,当时还是处于alpha版本;然而在Spark 2.0,它已经变成了稳定版了.DataFrame是特殊的Dataset,它在编译时不会对模式进行检测

    2.SQLContext

    首先要使用spark sql 首先就得创建一个SQLContext 对象,或者是他的子类对象或者是HiveContext 的对象。
    2.1 创建DataSet(这里没有使用SparkSession 的方式)
    使用SQLContext,可以从RDD,hive 表或者其他的数据源,来创建一个DataSet,以下是JSON文件创建DataSet的例子:
    SQLContext sqlContext=new SQLContext(sc);
    DataSet df=sqlContext.read().json("hdfs://spark1:9000/students.json");
    df.show()
    下面给出一个java 示例:

    public class DataFrameStudy {
    
        public static void main(String[] args) {
            SparkConf conf=new SparkConf().setAppName("DataFrameStudy");
            JavaSparkContext sc=new JavaSparkContext(conf);
            SQLContext sqlContext=new SQLContext(sc);
            DataFrameReader reader=sqlContext.read();
            Dataset ds= reader.json("hdfs://hadoop:8020/data/students.json");
            ds.show();
        }
    
    }
    

    students.json 长这个样子

    {"id":1,"name":"java","age":18}
    {"id":2,"name":"spark","age":19}
    {"id":3,"name":"scala","age":20}
    

    将打成的jar提交到spark 集群上面去运行下面给出submit 脚本

    /apps/soft/spark-2.2.1-bin-hadoop2.7/bin/spark-submit \
     --class com.wen.saprk.sql.core.DataFrameStudy  \
    --num-executors 3  \
    --driver-memory 1g   \
     --executor-memory 1g  \  
    --executor-cores 1  \
     --master spark://192.168.1.22:7077 /data/spark-submit/spl/dataframe/spark-study-sql-java-1.0-SNAPSHOT-jar-with-dependencies.jar
    

    赋予执行权限

    chmod  777 submit-sql.sh
    

    执行即可看到


    这里写图片描述

    下面给出scala 示例

    object DataFrameStudy {
      def main(args: Array[String]): Unit = {
          val conf=new SparkConf().setAppName("DataFrameStudy")
          val  sc = new SparkContext(conf)
          val sqlContext = new SQLContext(sc)
         val reader = sqlContext.read
          val ds = reader.json("hdfs://hadoop:8020/data/students.json")
          ds.show()
      }
    }
    
    

    结果和脚本和上面一样就不再演示了
    3.HiveContext


    除了基本的SQLContext 以外,还可以使用它的子类--HiveContext.HiveContext的功能除了包含SqlContext 提供的所有功能之外,还包含了额外的专门针对于Hive的一些功能。这些额外的功能包括:使用HiveQL 语法来编写和执行SQL,使用Hive中的UDF函数,从Hive表中读取数据。
    要使用HiveContext,就必须预先安装好Hive,SQLContext支持的数据源,HiveContext也同样支持。对于Spark 1.3.x 以上的版本都推荐使用HiveContext,因为他的功能更加的丰富和完善。
    Spark Sql 还支持用spark.sql.dialect 参数设置Sql的方言,使用SqlContext的setConf()既可以进行设置。对于SqlContext ,它支持sql 一种方言。对于HiveContext 默认的方言是hiveql.

    4.Data Set 常用操作

    下面给出java 示例:

    public class DataFrameOperation {
        public static void main(String[] args) {
            SparkConf conf=new SparkConf().setAppName("DataFrameOperation");
            JavaSparkContext sc=new JavaSparkContext(conf);
            SQLContext sqlContext=new SQLContext(sc);
            DataFrameReader reader=sqlContext.read();
            //创建出dataset  可以理解为一张表
            Dataset ds= reader.json("hdfs://hadoop:8020/data/students.json");
            //打印dataFrame中的所有数据
            ds.show();
            ////打印dataFrame的元数据
            ds.printSchema();
            //查询某列所有的数据
            ds.select("name").show();
            //查询某几列所有的数据并对列进行计算
            ds.select(ds.col("name"),ds.col("age").plus(1)).show();
            //根据某一列的值进行过滤
            ds.filter(ds.col("age").gt("18")).show();
            //根据某一列进行分组聚合
            ds.groupBy(ds.col("age")).count().show();
            sc.close();
        }
    }
    

    提交脚本运行即可

    /apps/soft/spark-2.2.1-bin-hadoop2.7/bin/spark-submit  --class com.wen.spark.sql.core.DataFrameOperation  --num-executors 1  --driver-memory 1g   --executor-memory 1g  --executor-cores 1   --master spark://192.168.1.22:7077 /data/spark-submit/spl/dataframe/java/spark-study-sql-java-1.0-SNAPSHOT-jar-with-dependencies.jar
    

    下面给出scala 示例

    
      object DataFrameOperation {
        def main(args: Array[String]): Unit = {
          val conf = new SparkConf().setAppName("DataFrameOperation")
          val sc = new JavaSparkContext(conf)
          val sqlContext = new SQLContext(sc)
          val reader = sqlContext.read
          //创建出dataset  可以理解为一张表
          val ds = reader.json("hdfs://hadoop:8020/data/students.json")
          //打印dataFrame中的所有数据
          ds.show()
          ////打印dataFrame的元数据
          ds.printSchema()
          //查询某列所有的数据
          ds.select("name").show()
          //查询某几列所有的数据并对列进行计算
          ds.select(ds.col("name"), ds.col("age").plus(1)).show()
          //根据某一列的值进行过滤
          ds.filter(ds.col("age").gt("18")).show()
          //根据某一列进行分组聚合
          ds.groupBy(ds.col("age")).count.show()
          sc.close()
        }
    }
    

    欢迎关注,更多福利

    这里写图片描述

    相关文章

      网友评论

          本文标题:spark从入门到放弃二十八:Spark Sql (1)Data

          本文链接:https://www.haomeiwen.com/subject/mutzcftx.html