Spark SQL Programming in Practice

Author: 毛豆val | Published 2018-01-16 16:37

Spark SQL

Creating a DataFrame and Basic Operations

A DataFrame can be thought of as a table in a relational database; the key difference from an RDD is that a DataFrame carries schema information.
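Although the file in the examples is called student.txt, it is read with the JSON reader, so each line is assumed to be a standalone JSON object. A hypothetical sample of what such a file might contain (field names taken from the queries used later in this article):

{"id":"1001","name":"Tom","age":22}
{"id":"1002","name":"Garen","age":19}
{"id":"1003","name":"Peter","age":25}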

public class DataFrameCreate {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("DataFrame");
        JavaSparkContext sc = new JavaSparkContext(conf);

        SQLContext sqlContext = new SQLContext(sc);

        DataFrame dataFrame = sqlContext.read()
                .json(Thread.currentThread().getContextClassLoader().getResource("student.txt").getPath());

        // Print the DataFrame
        dataFrame.show();
        // Print the DataFrame's schema
        dataFrame.printSchema();
        // Select a single column
        dataFrame.select(dataFrame.col("name")).show();
        // Select several columns and compute on one of them
        dataFrame.select(dataFrame.col("name"), dataFrame.col("age").plus(1)).show();
        // Filter rows by the value of a column
        dataFrame.filter(dataFrame.col("age").gt(20)).show();
        // Group by a column and aggregate
        dataFrame.groupBy(dataFrame.col("name")).count().show();
    }
}
object DataFrameCreate extends App {
    val conf = new SparkConf().setMaster("local").setAppName("DataFrame")
    val sc = SparkContext.getOrCreate(conf)

    // Create the SQLContext
    val sqlContext = SQLContext.getOrCreate(sc)
    // Create the DataFrame
    val dataFrame = sqlContext.read.json(Thread.currentThread().getContextClassLoader.getResource("student.txt").getPath)
    dataFrame.show()
    dataFrame.printSchema()
    dataFrame.select(dataFrame.col("name")).show()
    dataFrame.select(dataFrame.col("name"), dataFrame.col("age").plus(1)).show()
    dataFrame.filter(dataFrame.col("age").gt(20)).show()
    dataFrame.groupBy(dataFrame.col("name")).count().show()
}
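The same queries can also be expressed in SQL once the DataFrame is registered as a temporary table, which is the pattern used throughout the later sections. A minimal sketch (the table name student is arbitrary):

    // Register the DataFrame as a temporary table and query it with SQL
    dataFrame.registerTempTable("student")
    sqlContext.sql("select name, age + 1 from student where age > 20").show()
    sqlContext.sql("select name, count(*) from student group by name").show()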

Converting an RDD to a DataFrame

Converting an RDD to a DataFrame via reflection

/**
 * Java implementation:
 * convert an RDD to a DataFrame via reflection
 */
public class RDD2DataFrameReflection {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("RDD2DataFrameReflection");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);

        // Read the text file into an RDD, then map each line to a Person
        JavaRDD<Person> initRDD = sc.textFile(Thread.currentThread().getContextClassLoader().getResource("person.txt").getPath())
                .map(line -> line.split(" "))
                .map(array -> new Person(array[0], Integer.parseInt(array[1]), array[2]));

        // Create a DataFrame via reflection and register it as a temporary table named person
        sqlContext.createDataFrame(initRDD, Person.class).registerTempTable("person");

        // Run a SQL query selecting the rows where age is greater than 18
        DataFrame personDataFrame = sqlContext.sql("select name,age,sex from person where age > 18");

        // Print the data
        personDataFrame.show();

        // Convert the DataFrame back to an RDD; the element type is Row, so map each Row back to a Person and print the result
        personDataFrame.javaRDD()
                .map(row -> new Person(row.getString(0), row.getInt(1), row.getString(2)))
                .collect()
                .forEach(System.out::println);
    }


    /**
     * When converting an RDD to a DataFrame via reflection, the JavaBean class must be declared public
     * and must implement the Serializable interface.
     */
    public static class Person implements Serializable {
        private String name;
        private Integer age;
        private String sex;

        public Person() {
        }

        public Person(String name, Integer age, String sex) {
            this.name = name;
            this.age = age;
            this.sex = sex;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public Integer getAge() {
            return age;
        }

        public void setAge(Integer age) {
            this.age = age;
        }

        public String getSex() {
            return sex;
        }

        public void setSex(String sex) {
            this.sex = sex;
        }

        @Override
        public String toString() {
            return "Person{" +
                    "name='" + name + '\'' +
                    ", age=" + age +
                    ", sex='" + sex + '\'' +
                    '}';
        }
    }
}
// Scala implementation of converting between an RDD and a DataFrame
object RDD2DataFrameReflection {
    // Declare a case class to serve as the bean
    case class Person(name: String, age: Int, sex: String)

    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local").setAppName("RDD2DataFrame")
        val sc = SparkContext.getOrCreate(conf)
        val sqlContext = SQLContext.getOrCreate(sc)

        // In Scala, converting an RDD to a DataFrame requires importing the implicit conversions
        import sqlContext.implicits._
        // Read the text file into an RDD, call toDF to turn it into a DataFrame, then register it as a temporary table
        sc.textFile(Thread.currentThread().getContextClassLoader.getResource("person.txt").getPath)
            .map(_.split(" "))
            .map(array => Person(array(0), array(1).trim.toInt, array(2)))
            .toDF()
            .registerTempTable("person")

        // Calling the DataFrame's rdd method yields an RDD of Row; map each Row back to a Person and print the results
        sqlContext.sql("select name,age,sex from person where age > 18")
            .rdd
            .map(row => Person(row.getString(0), row.getInt(1), row.getString(2)))
            .collect
            .foreach(println)
    }
}
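Both versions assume person.txt holds one record per line, space separated as name, age and sex. A hypothetical sample:

Tom 22 male
Garen 19 male
Peter 25 female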

Creating the metadata dynamically in code

// Java implementation of converting between an RDD and a DataFrame (programmatically defined schema)
public class RDD2DataFrameProgrammatically {
    public static void main(String[] args) {
        // Create the SparkContext and SQLContext
        SparkConf conf = new SparkConf().setMaster("local").setAppName("RDD2DataFrame");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);

        // Read the file into an RDD and convert each line into a Row
        JavaRDD<Row> rdd = sc.textFile(Thread.currentThread().getContextClassLoader().getResource("person.txt").getPath())
                .map(line -> line.split(" "))
                .map(array -> RowFactory.create(array[0], Integer.parseInt(array[1]), array[2]));

        // Build the schema
        List<StructField> structFields = Arrays.asList(
                DataTypes.createStructField("name", DataTypes.StringType, true),
                DataTypes.createStructField("age", DataTypes.IntegerType, true),
                DataTypes.createStructField("sex", DataTypes.StringType, true)
        );
        StructType structType = DataTypes.createStructType(structFields);

        // Convert the RDD to a DataFrame and register it as a temporary table
        sqlContext.createDataFrame(rdd, structType).registerTempTable("person");

        sqlContext.sql("select name,age,sex from person where age > 18")
                .javaRDD()
                .map(row -> new RDD2DataFrameReflection.Person(row.getString(0), row.getInt(1), row.getString(2)))
                .collect()
                .forEach(System.out::println);
    }
}

// Scala implementation of converting between an RDD and a DataFrame (programmatically defined schema)
object RDD2DataFrameProgrammatically {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local").setAppName("RDD2DataFrame")
        val sc = SparkContext.getOrCreate(conf)
        val sqlContext = SQLContext.getOrCreate(sc)

        // Read the text file into an RDD
        val rdd = sc.textFile(Thread.currentThread().getContextClassLoader.getResource("person.txt").getPath)
            .map(_.split(" "))
            .map(array => Row(array(0), array(1).toInt, array(2)))
        // Build the schema
        val structType = StructType(Array(
            StructField("name", StringType, true),
            StructField("age", IntegerType, true),
            StructField("sex", StringType, true)
        ))
        // Convert the RDD to a DataFrame and register it as a temporary table
        sqlContext.createDataFrame(rdd, structType).registerTempTable("person")
        // Query users older than 18, convert the result back to an RDD of Person, and print it
        sqlContext.sql("select name,age,sex from person where age > 18")
            .rdd
            .map(row => RDD2DataFrameReflection.Person(row.getString(0), row.getAs[Integer](1), row.getString(2)))
            .collect()
            .foreach(println)
    }
}

The load and save Methods

// Java example showing how to use the load and save methods
public static void loadAndSave() {
    SparkConf conf = new SparkConf().setAppName("LoadAndSave").setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);
    SQLContext sqlContext = new SQLContext(sc);


    // When no format is specified, load and save default to parquet
    sqlContext.read()
            .load(Thread.currentThread().getContextClassLoader().getResource("student.parquet").getPath())
            .write()
            .save(Thread.currentThread().getContextClassLoader().getResource("student_copy.parquet").getPath());

    // Explicitly specify the file format for load and save
    sqlContext.read()
            .format("json")
            .load(Thread.currentThread().getContextClassLoader().getResource("student.txt").getPath())
            .show();

    sqlContext.read()
            .format("json")
            .load(Thread.currentThread().getContextClassLoader().getResource("student.txt").getPath())
            .write()
            .format("json")
            .save("file:///Users/garen/Documents/student_copy");

    sqlContext.read()
            .format("json")
            .load(Thread.currentThread().getContextClassLoader().getResource("student.txt").getPath())
            .write()
            .format("json")
            .mode(SaveMode.ErrorIfExists)
            .save("file:///Users/garen/Documents/student_copy");
}
def loadAndSave(): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("LoadAndSave")
    val sc = SparkContext.getOrCreate(conf)
    val sqlContext = SQLContext.getOrCreate(sc)

    sqlContext.read
        .format("json")
        .load(Thread.currentThread().getContextClassLoader.getResource("student.txt").getPath)
        .show()

    val dataFrame = sqlContext.read
        .format("json")
        .load(Thread.currentThread().getContextClassLoader.getResource("student.txt").getPath)
    dataFrame.filter(dataFrame.col("age").gt(18))
        .write
        .format("json")
        .mode(SaveMode.ErrorIfExists)
        .save("file:///D:/demo")
}
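The mode call decides what happens when the target path already exists. Besides SaveMode.ErrorIfExists used above, Spark SQL also offers SaveMode.Append, SaveMode.Overwrite and SaveMode.Ignore. A minimal sketch that overwrites the previous output instead of failing:

    // Overwrite replaces any existing data at the target path
    dataFrame.filter(dataFrame.col("age").gt(18))
        .write
        .format("json")
        .mode(SaveMode.Overwrite)
        .save("file:///D:/demo")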

Parquet Data Sources

Loading a Parquet data source programmatically

// Reading a parquet file in Java
public static void loadParquet() {
    SparkConf conf = new SparkConf().setMaster("local").setAppName("parquet");
    JavaSparkContext sc = new JavaSparkContext(conf);
    SQLContext sqlContext = new SQLContext(sc);

    sqlContext.read().parquet(Thread.currentThread().getContextClassLoader().getResource("tarball/users.parquet").getPath()).registerTempTable("person");
    sqlContext.sql("select name from person")
            .javaRDD()
            .map(row -> row.getString(0))
            .collect()
            .forEach(System.out::println);
}
// Reading a parquet file in Scala
def parquet(): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("parquet")
    val sc = SparkContext.getOrCreate(conf)
    val sqlContext = SQLContext.getOrCreate(sc)

    sqlContext.read
        .parquet(Thread.currentThread.getContextClassLoader.getResource("tarball/users.parquet").getPath)
        .registerTempTable("users")

    sqlContext.sql("select * from users")
        .rdd
        .map(_.getString(0))
        .collect
        .foreach(println)
}

Automatic partition discovery

// Java version
public static void parquetPartition() {
    SparkConf conf = new SparkConf().setMaster("local").setAppName("ParquetPartition");
    JavaSparkContext sc = new JavaSparkContext(conf);
    SQLContext sqlContext = new SQLContext(sc);

    // Printing the schema shows two extra partition columns ("gender" and "country")
    // Partition column types: automatic inference supports numeric and string types and is controlled by "spark.sql.sources.partitionColumnTypeInference.enabled" (enabled by default); when disabled, all partition columns are read as strings.
    sqlContext.read()
            .parquet("hdfs://spark1:9000/spark-study/users/gender=male/country=US/users.parquet")
            .printSchema();
}
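To have every partition column read back as a string instead of an inferred type, the inference switch mentioned in the comment above can be turned off before reading. A minimal Scala sketch, with sqlContext created as in the earlier Scala examples:

    // Disable partition column type inference so gender and country are treated as strings
    sqlContext.setConf("spark.sql.sources.partitionColumnTypeInference.enabled", "false")
    sqlContext.read
        .parquet("hdfs://spark1:9000/spark-study/users/gender=male/country=US/users.parquet")
        .printSchema()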

Automatic schema merging

def mergeSchema(): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local").setAppName("MergeSchema")
    // Option 1: set spark.sql.parquet.mergeSchema to true on the SparkConf
    // conf.set("spark.sql.parquet.mergeSchema", "true")
    val sc: SparkContext = SparkContext.getOrCreate(conf)
    val sqlContext: SQLContext = SQLContext.getOrCreate(sc)

    val nameAndAgeArray: Array[(String, Int)] = Array(("Tom", 23), ("Garen", 21), ("Peter", 25))
    val nameAndScoreArray: Array[(String, Int)] = Array(("Tome", 100), ("Garen", 98), ("Peter", 95))

    import sqlContext.implicits._
    sc.parallelize(nameAndAgeArray)
        .toDF("name", "age")
        .write
        .format("parquet")
        .mode(SaveMode.Append)
        .save("hdfs://users/hadoop/persons")

    sc.parallelize(nameAndScoreArray)
        .toDF("name", "score")
        .write
        .format("parquet")
        .mode(SaveMode.Append)
        .save("hdfs://users/hadoop/persons")

    // Option 2: set the mergeSchema option to true when reading
    val merged = sqlContext.read.option("mergeSchema", "true")
        .parquet("hdfs://users/hadoop/persons")
    merged.printSchema()
    merged.show()
}

/**
 * root
 * |-- name: string (nullable = true)
 * |-- score: integer (nullable = true)
 * |-- age: integer (nullable = true)
 *
 *  +-----+-----+----+
 *  | name|score| age|
 *  +-----+-----+----+
 *  |  Tom| null|  23|
 *  |Garen| null|  21|
 *  |Peter| null|  25|
 *  | Tome|  100|null|
 *  |Garen|   98|null|
 *  |Peter|   95|null|
 *  +-----+-----+----+
 */

JSON Data Sources

Requirement: query the information and scores of students whose score is greater than 80.

public static void jsonOption() {
    SparkConf conf = new SparkConf().setMaster("local").setAppName("JsonOption");
    JavaSparkContext sc = new JavaSparkContext(conf);
    SQLContext sqlContext = new SQLContext(sc);

    // Option 1: create the student-info DataFrame from a JSON file and register it as a temporary table
    sqlContext.read().format("json")
            .load(Thread.currentThread().getContextClassLoader().getResource("student.txt").getPath())
            .registerTempTable("student_info");

    JavaRDD<String> rdd = sc.parallelize(Arrays.asList(
            "{\"id\":\"1001\",\"score\":100}",
            "{\"id\":\"1002\",\"score\":79}",
            "{\"id\":\"1003\",\"score\":98}"
    ));
    // Option 2: create the student-score DataFrame from an RDD of JSON strings and register it as a temporary table
    sqlContext.read().json(rdd).registerTempTable("student_score");
    // Query the info and score of students whose score is greater than 80 and save the result to a JSON file
    sqlContext.sql("select a.id,a.name,a.age,b.score from student_info a left join student_score b on a.id = b.id where b.score > 80")
            .write()
            .format("json")
            .mode(SaveMode.Overwrite)
            .save("data/student_score");
}
def jsonOption(): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local").setAppName("JsonOption")
    val sc: SparkContext = SparkContext.getOrCreate(conf)
    val sqlContext = SQLContext.getOrCreate(sc)
    // Read the JSON data from a file and register it as a temporary table
    sqlContext.read
        .format("json")
        .load(Thread.currentThread().getContextClassLoader.getResource("student.txt").getPath)
        .registerTempTable("student_info")

    // Read the data from an RDD and register it as a temporary table
    val jsonArray: Array[String] = Array(
        "{\"id\":\"1001\",\"score\":100}",
        "{\"id\":\"1002\",\"score\":90}",
        "{\"id\":\"1003\",\"score\":80}"
    )
    val rdd = sc.parallelize(jsonArray)
    sqlContext.read.json(rdd).registerTempTable("student_score")

    sqlContext.sql("select a.id,a.name,a.age,b.score from student_info a left join student_score b on a.id = b.id where b.score > 80")
        .write
        .format("json")
        .mode(SaveMode.Overwrite)
        .save("data/students_score")
}

Hive Data Sources

This example uses a Hive data source to find the users whose score is greater than 80. When working with a Hive data source, hive-site.xml and the MySQL driver jar need to be copied into Spark's directory.

public static void hiveOption() {
    SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("HiveOption");
    JavaSparkContext sc = new JavaSparkContext(conf);
    HiveContext hiveContext = new HiveContext(sc);

    // Drop the student_info table if it already exists
    hiveContext.sql("DROP TABLE IF EXISTS hive.student_info").show();
    // Create the student_info table
    hiveContext.sql("CREATE TABLE IF NOT EXISTS hive.student_info (name STRING,age INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY ' '").show();
    // Load data into the student_info table
    hiveContext.sql("LOAD DATA INPATH '/user/hadoop/test_data/student_info.txt' INTO TABLE student_info").show();

    // Drop the student_score table if it already exists
    hiveContext.sql("DROP TABLE IF EXISTS hive.student_score").show();
    // Create the student_score table
    hiveContext.sql("CREATE TABLE IF NOT EXISTS hive.student_score (name STRING,score INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY ' '").show();
    // Load data into the student_score table
    hiveContext.sql("LOAD DATA INPATH '/user/hadoop/test_data/student_score.txt' INTO TABLE student_score").show();

    // Query students with a score above 80 and save the result as a new Hive table
    hiveContext.sql("select a.name,a.age,b.score from student_info a left join student_score b on a.name = b.name where b.score > 80")
            .saveAsTable("student_info_score");

    // Create a DataFrame directly from the Hive table
    Row[] rows = hiveContext.table("student_info_score").collect();
    Arrays.asList(rows).forEach(System.out::println);
}
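For completeness, a minimal Scala sketch of the same query-and-save step, assuming the student_info and student_score tables have already been created and loaded as in the Java version:

def hiveOption(): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("HiveOption")
    val sc = SparkContext.getOrCreate(conf)
    val hiveContext = new HiveContext(sc)

    // Query students with a score above 80 and save the result as a new Hive table
    hiveContext.sql("select a.name,a.age,b.score from student_info a left join student_score b on a.name = b.name where b.score > 80")
        .write
        .saveAsTable("student_info_score")

    // Create a DataFrame directly from the Hive table and print it
    hiveContext.table("student_info_score").show()
}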

JDBC Data Sources

public static void jdbc() {
    SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("JDBCOption");
    JavaSparkContext sc = new JavaSparkContext(conf);
    SQLContext sqlContext = SQLContext.getOrCreate(sc.sc());

    // Read the student_info table and convert it into an RDD
    Map<String, String> options = new HashMap<>();
    options.put("url", "jdbc:mysql://hadoop-cdh:3306/testdb");
    options.put("user", "root");
    options.put("password", "Slb930802.");
    options.put("dbtable", "student_info");
    DataFrame student_info_df = sqlContext.read().format("jdbc").options(options).load();
    JavaPairRDD<String, Integer> student_info_rdd = student_info_df.javaRDD()
            .mapToPair(row -> new Tuple2<String, Integer>(row.getString(0), row.getInt(1)));

    // Read the student_score table and convert it into an RDD
    options.put("dbtable", "student_score");
    DataFrame student_score_df = sqlContext.read().format("jdbc").options(options).load();
    JavaPairRDD<String, Integer> student_score_rdd = student_score_df.javaRDD()
            .mapToPair(row -> new Tuple2<String, Integer>(row.getString(0), row.getInt(1)));

    // Join the two RDDs, then keep only the students whose score is greater than 80
    JavaRDD<Row> good_student_info_rdd = student_info_rdd.join(student_score_rdd)
            .map(t -> RowFactory.create(t._1(), t._2()._1(), t._2()._2()))
            .filter(row -> row.getInt(2) > 80);

    // Convert the RDD to a DataFrame with a programmatically defined schema and print it
    List<StructField> structFields = Arrays.asList(
            DataTypes.createStructField("name", DataTypes.StringType, true),
            DataTypes.createStructField("age", DataTypes.IntegerType, true),
            DataTypes.createStructField("score", DataTypes.IntegerType, true)
    );
    StructType structType = DataTypes.createStructType(structFields);
    sqlContext.createDataFrame(good_student_info_rdd, structType).show();

    // Save the RDD's rows into the MySQL database
    good_student_info_rdd.foreach(row -> {
        Class.forName("com.mysql.jdbc.Driver");
        Connection conn = null;
        PreparedStatement ps = null;

        try {
            conn = DriverManager.getConnection("jdbc:mysql://hadoop-cdh:3306/testdb", "root", "Slb930802.");
            ps = conn.prepareStatement("INSERT INTO good_student_info VALUES(?,?,?)");
            ps.setString(1, row.getString(0));
            ps.setInt(2, row.getInt(1));
            ps.setInt(3, row.getInt(2));
            ps.executeUpdate();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (ps != null) ps.close();
            if (conn != null) conn.close();
        }
    });
}
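Opening a JDBC connection per row works for small result sets; an alternative is to write the final DataFrame back through the JDBC writer, which manages the connections itself. A minimal Scala sketch, reusing the URL and credentials above (goodStudentInfoDF stands for the DataFrame built with createDataFrame, and the good_student_info table is assumed to exist):

    // Append the result rows to the good_student_info table via the JDBC data source
    val props = new java.util.Properties()
    props.setProperty("user", "root")
    props.setProperty("password", "Slb930802.")
    goodStudentInfoDF.write
        .mode(SaveMode.Append)
        .jdbc("jdbc:mysql://hadoop-cdh:3306/testdb", "good_student_info", props)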

Spark SQL Built-in Functions

Counting UV (unique visitors)

def uv(): Unit = {
    val conf = new SparkConf().setMaster("local[4]").setAppName("uv")
    val sc = SparkContext.getOrCreate(conf)
    val sqlContext = SQLContext.getOrCreate(sc)

    // Build some log records and convert them into a DataFrame
    val logArray = Array(
        "20180115,fpanpxc2gvjd2py0jd2254ng",
        "20180115,fpanpxc2gvjd2py0jd2254ng",
        "20180115,4xlt0txdgy220f3owkhxv4y1",
        "20180116,kz5t0lvcjsbsqacjqwqyhfu4")
    val logRDD = sc.parallelize(logArray)
        .map(_.split(","))
        .map(array => RowFactory.create(array(0), array(1)))
    val structType = StructType(Array(
        StructField("date", StringType, false),
        StructField("sessionId", StringType, false)
    ))
    val logDF = sqlContext.createDataFrame(logRDD, structType)

    // Use Spark SQL's built-in functions to aggregate the grouped data
    // Using the built-in functions requires the following two imports
    import sqlContext.implicits._
    import org.apache.spark.sql.functions._
    logDF.groupBy("date").agg(countDistinct('sessionId).as("uv")).show()
}
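The same UV statistic can also be written in SQL after registering logDF as a temporary table. A minimal sketch (backticks quote the date column name):

    // count(distinct sessionId) per date, equivalent to the countDistinct aggregation above
    logDF.registerTempTable("access_log")
    sqlContext.sql("select `date`, count(distinct sessionId) as uv from access_log group by `date`").show()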

Counting total daily sales

def totalSale(): Unit = {
    val conf = new SparkConf().setMaster("local[4]").setAppName("TotalSale")
    val sc = SparkContext.getOrCreate(conf)
    val sqlContext = SQLContext.getOrCreate(sc)
    // Build the DataFrame
    val logArray = Array("20180115,34.3",
        "20180115,35.6",
        "20180116,99.9")
    val logRDD = sc.parallelize(logArray)
        .map(_.split(","))
        .map(array => Row(array(0), array(1).toDouble))
    val structType = StructType(Array(
        StructField("date", StringType, false),
        StructField("sale", DoubleType, false)
    ))
    
    // Group by date and aggregate to get the total sales per day
    import sqlContext.implicits._
    import org.apache.spark.sql.functions._
    sqlContext.createDataFrame(logRDD, structType)
        .groupBy("date")
        .agg('date, sum('sale).as("total_sale"))
        .show()
}

Spark SQL Window Functions

The row_number() window function

select url,rate,row_number() over(partition by url order by rate desc) as r from window_test2; 
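In Spark 1.x, window functions such as row_number() run through HiveContext. A minimal sketch, assuming window_test2 is an existing Hive table with url and rate columns:

def rowNumber(): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("RowNumber")
    val sc = SparkContext.getOrCreate(conf)
    val hiveContext = new HiveContext(sc)

    // Rank rows within each url partition by rate, highest first
    hiveContext.sql("select url,rate,row_number() over(partition by url order by rate desc) as r from window_test2").show()
}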
