spark-14-spark sql

作者: 西海岸虎皮猫大人 (author) | Published 2020-10-05 18:33

    1. Overview

    Shark was built on Spark and kept Hive compatibility, but it depended on Hive too heavily and was abandoned in favor of Spark SQL.
    Spark SQL is decoupled from Hive, supports native RDDs, and works with DataFrames.
    Spark on Hive:
    Hive is used only for storage; Spark SQL does the computation.

    DataFrame

    Comparable to the result set of a SQL query.
    A distributed data container; under the hood a DataFrame is an RDD.
    Carries schema information and supports nested types.

    Data sources

    Built-in: json, jdbc (mysql), hive, hdfs, parquet (better compression ratio than json)
    Additional: avro (compressed), csv, hbase, es
    Integration with es and hbase (important)

    Predicate pushdown

    When two tables are joined, filter conditions and column pruning are applied before the join (see the sketch below).
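
    A minimal sketch of what this looks like in practice (the parquet paths and the person/score schemas are hypothetical, not from the original post). The filter on age and the projection of only the needed columns are applied at the scan, below the join, which can be checked in the extended plan printed by explain():

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.SQLContext;

    public class PushdownSketch {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setMaster("local").setAppName("pushdown");
            JavaSparkContext sc = new JavaSparkContext(conf);
            SQLContext sqlContext = new SQLContext(sc);

            // Hypothetical parquet inputs: person(id, name, age) and score(id, score)
            DataFrame person = sqlContext.read().format("parquet").load("./person_parquet");
            DataFrame score = sqlContext.read().format("parquet").load("./score_parquet");
            person.registerTempTable("person");
            score.registerTempTable("score");

            DataFrame joined = sqlContext.sql(
                    "select p.name, s.score from person p join score s on p.id = s.id where p.age > 18");
            // The extended physical plan shows the age filter and the column pruning
            // pushed down to the parquet scan, i.e. evaluated before the join.
            joined.explain(true);
            sc.stop();
        }
    }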

    2 Hello Spark SQL

    /**
     * Nested JSON cannot be read this way; flatten it into a non-nested format first.
     * Reads JSON data and also shows converting the DataFrame to an RDD of Rows.
     */
    public class GDataframeTest {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf();
            conf.setMaster("local").setAppName("sqltest");
            JavaSparkContext sc = new JavaSparkContext(conf);
            SQLContext sqlContext = new SQLContext(sc);
    
            DataFrame json = sqlContext.read().format("json").load("./json");
            // Convert to an RDD of Rows
            JavaRDD<Row> javaRDD = json.javaRDD();
            javaRDD.foreach(new VoidFunction<Row>() {
                public void call(Row row) throws Exception {
    //                System.out.println(row);
    //                System.out.println(row.get(0));
    //                System.out.println(row.get(1));
                    System.out.println(row.getAs("name"));
                }
            });
            // Alternative way to load JSON
    //        DataFrame json = sqlContext.read().json("./json");
            // A DataFrame holds both the column schema and the data; columns are ordered by ASCII code
            // A SQL query displays the columns in whatever order the query specifies
    //        json.show();
    //        json.printSchema();
    
            // select name, age from xxx where age > 18
            // This DSL style is less commonly used
    /*        DataFrame df = json.select("name", "age").where(json.col("age").gt(18));
            df.show();*/
            // Number of rows to display
    //        df.show(100);
            // The temp table is just a pointer to the JSON source file; queries run as Spark jobs underneath
            json.registerTempTable("t1");
            DataFrame sql = sqlContext.sql("select * from t1 where age > 18");
            sql.show();
            sc.stop();
        }
    }
    

    3 Creating a DataFrame

    Converting a plain RDD to a DataFrame via reflection
    public class IRDDToDF {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf();
            conf.setMaster("local").setAppName("RDD");
            JavaSparkContext sc = new JavaSparkContext(conf);
            SQLContext sqlContext = new SQLContext(sc);
            JavaRDD<String> lineRDD = sc.textFile("person.txt");
            // Person must implement Serializable and its access level must be public
            // Serialization is needed because objects are shipped between nodes
            // The serialVersionUID must be consistent across driver and executors
            // Driver-side state that executors cannot see (not serialized): transient fields and static variables
            // A driver-side variable captured by the closure below (serialized and shipped to executors)
            final Person p = new Person();
            JavaRDD<Person> personRDD = lineRDD.map(new Function<String, Person>() {
                public Person call(String s) throws Exception {
                    // Person p = new Person();
                    p.setId(Integer.valueOf(s.split(",")[0]));
                    p.setName(s.split(",")[1]);
                    p.setAge(Integer.valueOf(s.split(",")[2]));
                    return p;
                }
            });
            // Convert the plain RDD to a DataFrame via reflection
            DataFrame df = sqlContext.applySchema(personRDD, Person.class);
            df.show();
            df.printSchema();
            df.registerTempTable("person");
            DataFrame sql = sqlContext.sql("select id, name, age from person where id = 2");
            // Columns are displayed in ASCII (alphabetical) order
            sql.show();
    
            JavaRDD<Row> javaRDD = df.javaRDD();
            JavaRDD<Person> map = javaRDD.map(new Function<Row, Person>() {
                public Person call(Row row) throws Exception {
                    Person p = new Person();
                    // Tedious when there are many fields
                    // Reflection-built DataFrames order columns alphabetically: age=0, id=1, name=2
                    p.setId(Integer.valueOf(row.get(1).toString()));
                    // p.setId(Integer.valueOf(row.getAs("id").toString()));
                    p.setName(row.get(2).toString());
                    p.setAge(Integer.valueOf((row.get(0).toString())));
                    return p;
                }
            });
            map.foreach(new VoidFunction<Person>() {
                public void call(Person person) throws Exception {
                    System.out.println(person.toString());
                }
            });
        }
    }
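
    The Person bean used above is not shown in the original post. A minimal sketch that satisfies the requirements listed in the comments (public, Serializable, a consistent serialVersionUID, and getters/setters for id, name, and age) might look like this:

    import java.io.Serializable;

    // Minimal Person bean assumed by the reflection example above
    public class Person implements Serializable {
        private static final long serialVersionUID = 1L;
        private int id;
        private String name;
        private int age;

        public int getId() { return id; }
        public void setId(int id) { this.id = id; }
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
        public int getAge() { return age; }
        public void setAge(int age) { this.age = age; }

        @Override
        public String toString() {
            return "Person{id=" + id + ", name=" + name + ", age=" + age + "}";
        }
    }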
    
    Creating a DataFrame with a dynamic schema
    public class JDynamicSchema {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf();
            conf.setMaster("local").setAppName("RDD");
            JavaSparkContext sc = new JavaSparkContext(conf);
            SQLContext sqlContext = new SQLContext(sc);
            JavaRDD<String> lineRDD = sc.textFile("person.txt");
            JavaRDD<Row> rowRDD = lineRDD.map(new Function<String, Row>() {
                public Row call(String s) throws Exception {
                    return RowFactory.create(
                            s.split(",")[0],
                            s.split(",")[1],
                            Integer.valueOf(s.split(",")[2]));
                }
            });
            List<StructField> asList = Arrays.asList(
                    DataTypes.createStructField("id", DataTypes.StringType, true),
                    DataTypes.createStructField("name", DataTypes.StringType, true),
                    DataTypes.createStructField("age", DataTypes.IntegerType, true)
            );
            StructType schema = DataTypes.createStructType(asList);
            DataFrame df = sqlContext.createDataFrame(rowRDD, schema);
            df.show();
        }
    }
    
    Creating a DataFrame from a Parquet file
    public class KParquest {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf();
            conf.setMaster("local").setAppName("parquet");
            JavaSparkContext sc = new JavaSparkContext(conf);
            SQLContext sqlContext = new SQLContext(sc);
            JavaRDD<String> lineRDD = sc.textFile("json");
            DataFrame df = sqlContext.read().json(lineRDD);
            df.show();
            // Write out a parquet file
            // Overwrite if it already exists
            df.write().mode(SaveMode.Overwrite).format("parquet").save("parquet");
    //        df.write().mode(SaveMode.Ignore).parquet("parquet");
            DataFrame load = sqlContext.read().format("parquet").load("parquet");
            load.show();
            sc.stop();
        }
    }
    
    Creating a DataFrame from MySQL data
    public class LDFFromMysql {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf();
            conf.setMaster("local").setAppName("mysql");
            // Defaults to 200 shuffle partitions; increase for large data volumes
            // Aggregations and joins distribute their work across the corresponding partitions
            conf.set("spark.sql.shuffle.partitions", "1");
            JavaSparkContext sc = new JavaSparkContext(conf);
            SQLContext sqlContext = new SQLContext(sc);
            Map<String, String> options = new HashMap<String, String>();
            options.put("url", "jdbc:mysql://127.0.0.1:3306/spark");
            options.put("driver", "com.mysql.jdbc.Driver");
            options.put("user", "root");
            options.put("password", "root");
            options.put("dbtable", "person");
    
            DataFrame person = sqlContext.read().format("jdbc").options(options).load();
            person.show();
            // The temp table is just a pointer to the database table
            person.registerTempTable("person");
    
            // Second approach: a DataFrameReader configured with option()
            DataFrameReader reader = sqlContext.read().format("jdbc");
            reader.option("url", "jdbc:mysql://127.0.0.1:3306/spark");
            reader.option("driver", "com.mysql.jdbc.Driver");
            reader.option("user", "root");
            reader.option("password", "root");
            reader.option("dbtable", "score");
            DataFrame score = reader.load();
            score.show();
            score.registerTempTable("score");
            DataFrame result = sqlContext.sql("select person.age, score.score from person, score where person.id = score.id");
            result.show();
    
            // Save the DataFrame result back to MySQL
            Properties properties = new Properties();
            properties.setProperty("user", "root");
            properties.setProperty("password", "root");
            result.write().mode(SaveMode.Overwrite).jdbc("jdbc:mysql://127.0.0.1:3306/spark", "result", properties);
            System.out.println("finish");
    
            sc.stop();
        }
    }
    

    4 Spark on Hive

    # Copy the hive config file to the spark-shell node (node-02)
    scp hive-site.xml node-02:/opt/spark/conf
    # HDFS-related configuration
    # /opt/spark/conf/spark-env.sh
    -----------
    export HADOOP_HOME=/opt/hadoop
    export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
    -----------
    
    # Start the hive metastore on the hive server node (node-01)
    hive --service metastore
    # Start the hive CLI on the hive client node (node-03)
    hive
    # Run a hive query for comparison
    select count(*) from jizhan;
    
    # Run spark-shell on node-02
    ./spark-shell --master spark://node-01:7877,node-02:7877
    # Import the HiveContext class
    import org.apache.spark.sql.hive.HiveContext
    # Create the hive context
    val hiveContext = new HiveContext(sc)
    # Run the hive SQL through spark
    hiveContext.sql("select count(*) from jizhan").show()
    # Comparing with the hive CLI on node-03 shows that spark executes noticeably faster than hive
    
    Submitting a Spark-on-Hive jar
    /**
     * Create a DataFrame from data stored in Hive
     */
    public class MDFFromHive {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf();
            conf.setAppName("hive");
            JavaSparkContext sc = new JavaSparkContext(conf);
            HiveContext hiveContext = new HiveContext(sc);
            hiveContext.sql("USE spark");
            hiveContext.sql("DROP TABLE IF EXISTS student_infos");
            hiveContext.sql("CREATE TABLE IF NOT EXISTS student_infos (name STRING, age INT) row format delimited fields terminated by '\t'");
            hiveContext.sql("load data local inpath '/root/test/student_infos' into table student_infos");
            hiveContext.sql("drop table if exists student_scores");
            hiveContext.sql("CREATE TABLE IF NOT EXISTS student_scores (name STRING, score INT) row format delimited fields terminated by '\t'");
            hiveContext.sql("load data local inpath '/root/test/student_scores' into table student_scores");
    
            DataFrame df = hiveContext.sql("select si.name, si.age, ss.score " +
                "from student_infos si " +
                "join student_scores ss " +
                "on si.name = ss.name " +
                "where ss.score >= 80");
            df.registerTempTable("good_student");
            DataFrame result = hiveContext.sql("select * from good_student");
            result.show();
    
            // Save the result to Hive
            hiveContext.sql("drop table if exists result");
            df.write().mode(SaveMode.Overwrite).saveAsTable("result");
            DataFrame table = hiveContext.table("result");
            Row[] rows = table.collect();
            for(Row row : rows) {
                System.out.println(row);
            }
            sc.stop();
        }
    }
    

    Export the jar (note: dependency jars do not need to be included) and upload it to the /opt/spark/lib directory on node-02.

    # Run the spark jar (--class names the main class inside the jar)
    cd /opt/spark/bin
    ./spark-submit --master spark://node-01:7877,node-02:7877 --class <main class> ../lib/test.jar
    

    5 UDF | UDAF | Window Functions

    UDF - user-defined function
    public class NUDF {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf();
            conf.setMaster("local");
            conf.setAppName("udf");
            JavaSparkContext sc = new JavaSparkContext(conf);
            SQLContext sqlContext = new SQLContext(sc);
            // Create a JavaRDD from a list
            JavaRDD<String> parallelize = sc.parallelize(Arrays.asList("zhangsan","lisi","wangwu"));
            // Create a Row RDD from the JavaRDD
            JavaRDD<Row> rowRDD = parallelize.map(new Function<String, Row>() {
                private static final long serialVersionUID = 1L;
                public Row call(String s) throws Exception {
                    return RowFactory.create(s);
                }
            });
    
            /**
             * Build the DataFrame with a dynamically created schema
             */
            List<StructField> fields = new ArrayList<StructField>();
            fields.add(DataTypes.createStructField("name", DataTypes.StringType,true));
            StructType schema = DataTypes.createStructType(fields);
    
            DataFrame df = sqlContext.createDataFrame(rowRDD,schema);
    
            df.registerTempTable("user");
    
            /**
             * Which UDF interface to implement (UDF1, UDF2, ... up to UDF22) depends on the number of arguments
             */
            sqlContext.udf().register("StrLen", new UDF1<String,Integer>() {
                private static final long serialVersionUID = 1L;
                public Integer call(String t1) throws Exception {
                    return t1.length();
                }
            }, DataTypes.IntegerType);
            sqlContext.sql("select name ,StrLen(name) as length from user").show();
    
    //      sqlContext.udf().register("StrLen",new UDF2<String, Integer, Integer>() {
    //          private static final long serialVersionUID = 1L;
    //          @Override
    //          public Integer call(String t1, Integer t2) throws Exception {
    //              return t1.length()+t2;
    //          }
    //      } ,DataTypes.IntegerType );
    //      sqlContext.sql("select name ,StrLen(name,10) as length from user").show();
    
    
            sc.stop();
        }
    }
    
    UDAF - user-defined aggregate function
    public class OUDAF {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf();
            conf.setMaster("local").setAppName("udaf");
            JavaSparkContext sc = new JavaSparkContext(conf);
            SQLContext sqlContext = new SQLContext(sc);
            // The data may be spread across multiple partitions
            JavaRDD<String> parallelize = sc.parallelize(
                    Arrays.asList("zhangsan","lisi","wangwu","zhangsan","zhangsan","lisi"));
            JavaRDD<Row> rowRDD = parallelize.map(new Function<String, Row>() {
                private static final long serialVersionUID = 1L;
                public Row call(String s) throws Exception {
                    return RowFactory.create(s);
                }
            });
    
            List<StructField> fields = new ArrayList<StructField>();
            fields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
            StructType schema = DataTypes.createStructType(fields);
            // Create the DataFrame with a dynamic schema
            DataFrame df = sqlContext.createDataFrame(rowRDD, schema);
            df.registerTempTable("user");
            /**
             * Register a UDAF that counts occurrences of the same value
             * Note: defining a named class that extends UserDefinedAggregateFunction works just as well (see the sketch after this example)
             */
            sqlContext.udf().register("StringCount",new UserDefinedAggregateFunction() {
                private static final long serialVersionUID = 1L;
                /**
                 * Initialize the internal buffer: the starting value for each group before aggregation
                 * Set for each key in every partition (including keys on the reduce side)
                 */
                @Override
                public void initialize(MutableAggregationBuffer buffer) {
                    buffer.update(0, 0);
                }
                /**
                 * update: the values within a group are passed in one at a time; this implements the accumulation logic
                 * buffer.getInt(0) is the value from the previous partial aggregation
                 * Comparable to a map-side combiner: the combiner does a small aggregation over each map task's output,
                 * while the big aggregation happens on the reduce side.
                 * In short: whenever a new value arrives for a group, this defines how the group's partial aggregate is updated
                 */
                @Override
                public void update(MutableAggregationBuffer buffer, Row arg1) {
                    // row is a single input record
                    // increment the count and store it back in the buffer
                    buffer.update(0, buffer.getInt(0)+1);
    
                }
                /**
                 * merge: an update pass may cover only part of a group's data on one node, since one group's data can be spread across multiple nodes
                 * merge combines the partial results built up on each node
                 * buffer1.getInt(0): the value accumulated so far in the global aggregation
                 * buffer2.getInt(0): the partial update result being merged in
                 * In short: after the distributed partial aggregations finish, a global-level merge is performed
                 */
                @Override
                public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
                    // buffer2 is the partial result produced by one map-side partition
                    // the reduce side pulls partial results from different nodes and aggregates them
                    buffer1.update(0, buffer1.getInt(0) + buffer2.getInt(0));
                }
                /**
                 * The schema of the intermediate buffer used during aggregation
                 */
                @Override
                public StructType bufferSchema() {
                    return DataTypes.createStructType(Arrays.asList(DataTypes.createStructField("buffer", DataTypes.IntegerType, true)));
                }
                /**
                 * Returns the final result of the UDAF; its type must match what dataType() declares
                 */
                @Override
                public Object evaluate(Row row) {
                    return row.getInt(0);
                }
                /**
                 * The result type returned by the UDAF
                 */
                @Override
                public DataType dataType() {
                    return DataTypes.IntegerType;
                }
                /**
                 * The names and types of the input fields
                 */
                @Override
                public StructType inputSchema() {
                    return DataTypes.createStructType(Arrays.asList(DataTypes.createStructField("namexxx", DataTypes.StringType, true)));
                }
                /**
                 * Usually true: marks whether the UDAF always produces the same result for a given set of inputs
                 */
                @Override
                public boolean deterministic() {
                    return true;
                }
    
            });
    
            sqlContext.sql("select name ,StringCount(name) as strCount from user group by name").show();
    
    
            sc.stop();
        }
    }
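
    As the comment above notes, the same aggregation can also be written as a named class extending UserDefinedAggregateFunction instead of an anonymous inner class. A minimal sketch with the same counting logic (the class name StringCountUDAF is hypothetical):

    import java.util.Arrays;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.expressions.MutableAggregationBuffer;
    import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
    import org.apache.spark.sql.types.DataType;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructType;

    public class StringCountUDAF extends UserDefinedAggregateFunction {
        private static final long serialVersionUID = 1L;

        // One string input column
        @Override
        public StructType inputSchema() {
            return DataTypes.createStructType(Arrays.asList(
                    DataTypes.createStructField("name", DataTypes.StringType, true)));
        }

        // Intermediate buffer: a single running count
        @Override
        public StructType bufferSchema() {
            return DataTypes.createStructType(Arrays.asList(
                    DataTypes.createStructField("count", DataTypes.IntegerType, true)));
        }

        @Override
        public DataType dataType() { return DataTypes.IntegerType; }

        @Override
        public boolean deterministic() { return true; }

        @Override
        public void initialize(MutableAggregationBuffer buffer) { buffer.update(0, 0); }

        // Called once per input row within a partition
        @Override
        public void update(MutableAggregationBuffer buffer, Row input) {
            buffer.update(0, buffer.getInt(0) + 1);
        }

        // Combines partial counts coming from different partitions/nodes
        @Override
        public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
            buffer1.update(0, buffer1.getInt(0) + buffer2.getInt(0));
        }

        @Override
        public Object evaluate(Row buffer) { return buffer.getInt(0); }
    }

    It would then be registered with sqlContext.udf().register("StringCount", new StringCountUDAF()); and used in SQL exactly as in the example above.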
    
    Window functions
    # Submit the spark jar job
    ./spark-submit --master spark://node-01:7877,node-02:7877 --class cn.dfun.demo.spark.PWindowFun ../lib/test.jar
    
    /**
     * Window function
     * Partition by one field, order by another
     * Window functions must be executed through a HiveContext
     */
    public class PWindowFun {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf();
            conf.setAppName("windowfun");
            conf.set("spark.sql.shuffle.partitions","1");
            JavaSparkContext sc = new JavaSparkContext(conf);
            HiveContext hiveContext = new HiveContext(sc);
            hiveContext.sql("use spark");
            hiveContext.sql("drop table if exists sales");
            hiveContext.sql("create table if not exists sales (riqi string,leibie string,jine Int) "
                    + "row format delimited fields terminated by '\t'");
            hiveContext.sql("load data local inpath '/root/test/sales' into table sales");
    
            DataFrame result = hiveContext.sql("select riqi,leibie,jine "
                    + "from ("
                    + "select riqi,leibie,jine,"
                    + "row_number() over (partition by leibie order by jine desc) rank " // 按照类别分组,按照金额降序排列
                    + "from sales) t "
                    + "where t.rank<=3");
            result.show(100);
            /**
             * Save the result to the Hive table sales_result
             */
            result.write().mode(SaveMode.Overwrite).saveAsTable("sales_result");
            sc.stop();
        }
    }
    
