Spark SQL

Author: 小月半会飞 | Published 2019-03-12 10:38

    1. What is Spark SQL

    Spark SQL is Spark's module for processing structured data. It provides two programming abstractions, DataFrame and Dataset, and serves as a distributed SQL query engine: an engine that translates traditional SQL into the big-data computation model. The figure below shows the relationship between RDD, DataFrame, and Dataset.


    (Figure: the relationship between RDD, DataFrame, and Dataset)
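
    As a quick, minimal sketch of how the three abstractions relate (run in spark-shell; the Person case class and its two example rows are made up for illustration):

    scala> case class Person(name: String, age: Int)
    scala> val rdd = sc.parallelize(Seq(Person("Tom", 20), Person("Jerry", 25)))   // RDD[Person]: Spark knows nothing about its structure
    scala> val df = rdd.toDF()        // DataFrame (= Dataset[Row]): the same data plus a schema
    scala> val ds = df.as[Person]     // Dataset[Person]: typed access on top of the DataFrame
    scala> ds.filter(_.age > 21).show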

    2. Hive vs. Spark SQL

    Hive translates HiveQL into MapReduce jobs and submits them to the cluster, which greatly simplifies writing MapReduce programs; however, the MapReduce computation model is slow to execute. Spark SQL arose to address this: it translates SQL into RDD operations and submits them to the cluster, which runs far faster. By analogy: Hive: SQL --> MapReduce; Spark SQL: SQL --> RDD.

    3. Test data

    We use two CSV files as part of the test data:

    dept.csv:

    10,ACCOUNTING,NEW YORK
    20,RESEARCH,DALLAS
    30,SALES,CHICAGO
    40,OPERATIONS,BOSTON
    
    

    emp.csv:

    7369,SMITH,CLERK,7902,1980/12/17,800,,20
    7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30
    7521,WARD,SALESMAN,7698,1981/2/22,1250,500,30
    7566,JONES,MANAGER,7839,1981/4/2,2975,,20
    7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
    7698,BLAKE,MANAGER,7839,1981/5/1,2850,,30
    7782,CLARK,MANAGER,7839,1981/6/9,2450,,10
    7788,SCOTT,ANALYST,7566,1987/4/19,3000,,20
    7839,KING,PRESIDENT,,1981/11/17,5000,,10
    7844,TURNER,SALESMAN,7698,1981/9/8,1500,0,30
    7876,ADAMS,CLERK,7788,1987/5/23,1100,,20
    7900,JAMES,CLERK,7698,1981/12/3,950,,30
    7902,FORD,ANALYST,7566,1981/12/3,3000,,20
    7934,MILLER,CLERK,7782,1982/1/23,1300,,10
    
    

    Put these two CSV files into the /input/ directory on HDFS (hdfs://hadoop3/input/) so they can be used later.
    First start HDFS:

    start-dfs.sh
    
    [root@bigdata111 ~]# hdfs dfs -ls /input
    -rw-r--r--   3 root supergroup         80 2019-03-01 05:05 /input/dept.csv
    -rw-r--r--   3 root supergroup        603 2019-03-01 05:05 /input/emp.csv
    

    4. Creating a DataFrame

    1) Start the Spark cluster

    spark-start-all.sh
    

    2) Start spark-shell

    spark-shell
    

    3) Method 1: define the table with a case class

    scala> val rdd = sc.textFile("hdfs://hadoop3/input/dept.csv")
    scala> val rdd2 = rdd.filter(_.length > 0).map(_.split(","))
    scala> case class Dept(deptno: Int, dname: String, loc: String)
    scala> val deptrdd = rdd2.map(x => Dept(x(0).toInt, x(1), x(2)))
    scala> val deptdf = deptrdd.toDF
    

    Output:
    Equivalent SQL: select * from dept

    scala> deptdf.show
    +------+----------+--------+
    |deptno|     dname|     loc|
    +------+----------+--------+
    |    10|ACCOUNTING|NEW YORK|
    |    20|  RESEARCH|  DALLAS|
    |    30|     SALES| CHICAGO|
    |    40|OPERATIONS|  BOSTON|
    +------+----------+--------+
    
    

    Equivalent SQL: desc dept

    scala> deptdf.printSchema
    root
     |-- deptno: integer (nullable = false)
     |-- dname: string (nullable = true)
     |-- loc: string (nullable = true)
    
    

    4) Method 2: create the DataFrame with SparkSession and an explicit schema

    scala> val lines = sc.textFile("/root/temp/csv/emp.csv").map(_.split(","))   // read from the local Linux filesystem
    scala> val lines = sc.textFile("hdfs://10.30.30.146:9000/input/emp.csv").map(_.split(","))   // read from HDFS
    scala> import org.apache.spark.sql._
    scala> import org.apache.spark.sql.types._
    scala> val myschema = StructType(List(StructField("empno", DataTypes.IntegerType)
            , StructField("ename", DataTypes.StringType)
            , StructField("job", DataTypes.StringType)
            , StructField("mgr", DataTypes.StringType)
            , StructField("hiredate", DataTypes.StringType)
            , StructField("sal", DataTypes.IntegerType)
            , StructField("comm", DataTypes.StringType)
            , StructField("deptno", DataTypes.IntegerType)))   // define the schema as a StructType
    
    scala> val rowRDD = lines.map(x => Row(x(0).toInt, x(1), x(2), x(3), x(4), x(5).toInt, x(6), x(7).toInt))   // map each input line to a Row
    scala> val df = spark.createDataFrame(rowRDD, myschema)   // create the DataFrame with SparkSession.createDataFrame
    

    Output:

    scala> df.show
    +-----+------+---------+----+----------+----+----+------+
    |empno| ename|      job| mgr|  hiredate| sal|comm|deptno|
    +-----+------+---------+----+----------+----+----+------+
    | 7369| SMITH|    CLERK|7902|1980/12/17| 800|    |    20|
    | 7499| ALLEN| SALESMAN|7698| 1981/2/20|1600| 300|    30|
    | 7521|  WARD| SALESMAN|7698| 1981/2/22|1250| 500|    30|
    | 7566| JONES|  MANAGER|7839|  1981/4/2|2975|    |    20|
    | 7654|MARTIN| SALESMAN|7698| 1981/9/28|1250|1400|    30|
    | 7698| BLAKE|  MANAGER|7839|  1981/5/1|2850|    |    30|
    | 7782| CLARK|  MANAGER|7839|  1981/6/9|2450|    |    10|
    | 7788| SCOTT|  ANALYST|7566| 1987/4/19|3000|    |    20|
    | 7839|  KING|PRESIDENT|    |1981/11/17|5000|    |    10|
    | 7844|TURNER| SALESMAN|7698|  1981/9/8|1500|   0|    30|
    | 7876| ADAMS|    CLERK|7788| 1987/5/23|1100|    |    20|
    | 7900| JAMES|    CLERK|7698| 1981/12/3| 950|    |    30|
    | 7902|  FORD|  ANALYST|7566| 1981/12/3|3000|    |    20|
    | 7934|MILLER|    CLERK|7782| 1982/1/23|1300|    |    10|
    +-----+------+---------+----+----------+----+----+------+
    
    

    5) Method 3: read a structured file (JSON, CSV, etc.) directly - the simplest way

    Preparation:
    The /opt/modules/app/spark/examples/src/main/resources directory ships with sample files, so simply upload one of the JSON samples:

    [root@hadoop3 resources]# pwd
    /opt/modules/app/spark/examples/src/main/resources
    [root@hadoop3 resources]# hadoop fs -put ./people.json /input/
    
    scala> val peopleDF = spark.read.json("hdfs://hadoop3/input/people.json")
    19/03/01 06:16:51 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
    peopleDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
    
    

    Output:

    scala> peopleDF.show
    +----+-------+
    | age|   name|
    +----+-------+
    |null|Michael|
    |  30|   Andy|
    |  19| Justin|
    +----+-------+
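
    The same direct-read approach works for the dept.csv and emp.csv files uploaded earlier. A minimal sketch (the column names are supplied by hand because the CSV files have no header row; adjust the HDFS address to your cluster):

    scala> val deptCsvDF = spark.read.option("inferSchema", "true").csv("hdfs://hadoop3/input/dept.csv").toDF("deptno", "dname", "loc")
    scala> deptCsvDF.printSchema
    scala> deptCsvDF.show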
    

    5. Working with DataFrames

    1) Add dependencies

    <?xml version="1.0" encoding="UTF-8"?>
    
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
    
      <groupId>com.neusoft</groupId>
      <artifactId>sparkdemo</artifactId>
      <version>1.0-SNAPSHOT</version>
    
      <name>sparkdemo</name>
      <!-- FIXME change it to the project's website -->
      <url>http://www.example.com</url>
    
      <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <spark.version>2.3.1</spark.version>
        <maven.compiler.source>1.7</maven.compiler.source>
        <maven.compiler.target>1.7</maven.compiler.target>
      </properties>
    
      <dependencies>
        <dependency>
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
          <version>4.11</version>
          <scope>test</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_2.11</artifactId>
          <version>${spark.version}</version>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-sql_2.11</artifactId>
          <version>2.3.1</version>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-streaming_2.11</artifactId>
          <version>${spark.version}</version>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-streaming-kafka_2.11</artifactId>
          <version>1.6.2</version>
        </dependency>
        <dependency>
          <groupId>org.scala-lang</groupId>
          <artifactId>scala-library</artifactId>
          <version>2.11.8</version>
        </dependency>
        <dependency>
          <groupId>mysql</groupId>
          <artifactId>mysql-connector-java</artifactId>
          <version>5.1.47</version>
        </dependency>
    
      </dependencies>
    
      <build>
        <pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
          <plugins>
            <!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
            <plugin>
              <artifactId>maven-clean-plugin</artifactId>
              <version>3.1.0</version>
            </plugin>
            <!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
            <plugin>
              <artifactId>maven-resources-plugin</artifactId>
              <version>3.0.2</version>
            </plugin>
            <plugin>
              <artifactId>maven-compiler-plugin</artifactId>
              <version>3.8.0</version>
            </plugin>
            <plugin>
              <artifactId>maven-surefire-plugin</artifactId>
              <version>2.22.1</version>
            </plugin>
            <plugin>
              <artifactId>maven-jar-plugin</artifactId>
              <version>3.0.2</version>
            </plugin>
            <plugin>
              <artifactId>maven-install-plugin</artifactId>
              <version>2.5.2</version>
            </plugin>
            <plugin>
              <artifactId>maven-deploy-plugin</artifactId>
              <version>2.8.2</version>
            </plugin>
            <!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
            <plugin>
              <artifactId>maven-site-plugin</artifactId>
              <version>3.7.1</version>
            </plugin>
            <plugin>
              <artifactId>maven-project-info-reports-plugin</artifactId>
              <version>3.0.0</version>
            </plugin>
          </plugins>
        </pluginManagement>
      </build>
    </project>
    
    

    2) Query a table

    select * from <table>

    package com.neusoft
    
    import org.apache.spark.sql.SparkSession
    
    object Sparksqldemo2 {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession
          .builder()
          .master("local")
          .appName("test")
          .config("spark.sql.shuffle.partitions", "5")
          .getOrCreate()
    
        val df = spark.read.json("hdfs://192.168.159.133:8020/input/people.json")
        df.show()
      }
    }
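
    To run SQL from this standalone program as well (rather than only from spark-shell, as in the sections below), the DataFrame can first be registered as a temporary view. A short sketch continuing the program above; the view name people is arbitrary:

        // inside main(), after df.show()
        df.createOrReplaceTempView("people")
        spark.sql("select name, age from people where age is not null").show()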
    
    

    3) Working with a DataFrame via the DSL

    1. Show all employees === select * from empDF;

    df.show
    

    2. Select employee names and department numbers (with or without the $ prefix, the result is the same) === select ename,deptno from empDF;

    df.select("ename","deptno").show
    df.select("ename","deptno").show
    

    3. Select employee names and salaries, adding 100 to each salary === select ename,sal,sal+100 from empDF;

    empDF.select("ename","sal",$"sal"+100).show
    

    4. Employees with a salary greater than 2000 === select * from empDF where sal>2000;

    df.filter($"sal" > 2000).show
    

    5. Grouping === select deptno,count(*) from empDF group by deptno;
    scala> empDF.groupBy("deptno").count.show
    scala> empDF.groupBy("deptno").avg().show
    scala> empDF.groupBy($"deptno").max().show


    4) Working with a DataFrame via SQL

    (1) Prerequisite: register the DataFrame as a table or view

    df.createOrReplaceTempView("emp")
    

    (2) Run queries through the SparkSession

    spark.sql("select * from emp").show
    spark.sql("select * from emp where deptno=10").show
    

    (3) Total salary per department

    spark.sql("select deptno,sum(sal) from emp group by deptno").show
    

    6. Views

    To query a DataFrame with SQL, you must first create a table or view from it:

    df.createOrReplaceTempView("emp")
    

    In Spark SQL, if you want a temporary view that can be shared across sessions and stays available for the lifetime of the application, you need to create a global temporary view. Remember to add the global_temp prefix when referencing it, because global temporary views are bound to the system-reserved database global_temp.

    ① Create an ordinary view and a global view

    df.createOrReplaceTempView("emp1")
    df.createGlobalTempView("emp2")
    

    ② Run the queries in the current session; both return results.
    scala> spark.sql("select * from emp1").show
    scala> spark.sql("select * from global_temp.emp2").show


    ③ Open a new session and run the same queries:
    scala> spark.newSession.sql("select * from emp1").show   (fails: emp1 is visible only in the session that created it)
    scala> spark.newSession.sql("select * from global_temp.emp2").show


    7. Data sources

    1) Generic load/save functions

    (*) What is a Parquet file?
    Parquet is a columnar storage file format. Columnar storage has the following key benefits:

    • Data that does not match the query conditions can be skipped, so only the needed data is read, reducing I/O.
    • Compression encodings reduce disk space. Because all values in a column share the same type, more efficient encodings (such as run-length encoding and delta encoding) can be used to save even more space.
    • Only the required columns are read, and vectorized operations are supported, giving better scan performance.

    Parquet is Spark SQL's default data source and can be configured via spark.sql.sources.default.

    (*) Generic load/save functions

    Compare the following statements:

    scala>val peopleDF = spark.read.json("hdfs://bigdata111:9000/input/people.json")
    scala>val peopleDF = spark.read.format("json").load("hdfs://bigdata111:9000/input/people.json")

    View the schema and the data: scala> peopleDF.show

    • The save function writes data out; the default file format is Parquet (columnar storage). (The userDF used below is assumed to be a DataFrame loaded from the users.parquet sample, which has name and favorite_color columns.)

    scala>userDF.select("name","favorite_color").write.save("/root/temp/result1")
    scala>userDF.select("name","favorite_color").write.format("csv").save("/root/temp/result2")
    scala>userDF.select("name","favorite_color").write.csv("/root/temp/result3")


    (*) Explicitly specifying the file format: loading JSON
    Loading it directly fails, because the default format is Parquet:
    val usersDF = spark.read.load("/root/resources/people.json")
    Specifying the format works:
    val usersDF = spark.read.format("json").load("/root/resources/people.json")

    (*) Save modes
    Storage operations can take a SaveMode, which defines how existing data is handled. Note that these save modes do not use any locking and are not atomic. In addition, when Overwrite is used, the existing data is deleted before the new data is written. The SaveMode options are:
    The default is SaveMode.ErrorIfExists: if the target (for example, the table) already exists, an exception is thrown and the data is not saved. The other three modes are:
    SaveMode.Append: if the table already exists, the data is appended to it; if it does not exist, the table is created first and the data inserted.
    SaveMode.Overwrite: the existing table and all of its data are dropped, the table is recreated, and the new data is inserted.
    SaveMode.Ignore: if the table does not exist, it is created and the data is saved; if it already exists, the save is silently skipped without error.

    Demo:
    usersDF.select($"name").write.save("/root/result/parquet1")
    --> fails: /root/result/parquet1 already exists

    usersDF.select($"name").write.mode("overwrite").save("/root/result/parquet1")

    2) Reading from and writing to MySQL

    2.1 JDBC

    Spark SQL can create a DataFrame by reading from a relational database over JDBC, and after a series of computations on the DataFrame, write the data back to the relational database.

    2.1.1 Loading data from MySQL (spark-shell)

    When starting spark-shell, you must specify the MySQL JDBC driver jar:

    spark-shell --master spark://hadoop1:7077 --jars mysql-connector-java-5.1.35-bin.jar --driver-class-path mysql-connector-java-5.1.35-bin.jar
    
    

    Load data from MySQL:

    val jdbcDF = sqlContext.read.format("jdbc").options(
         Map("url"->"jdbc:mysql://hadoop1:3306/bigdata",
                "driver"->"com.mysql.jdbc.Driver", 
                "dbtable"->"person", //  "dbtable"->"(select * from person where id = 12) as person", 
                "user"->"root",
                "password"->"123456")
         ).load()
    
    

    Run the query:

    jdbcDF.show()
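
    To write a DataFrame back to MySQL from spark-shell, the built-in JDBC writer can also be used; a minimal sketch (the target table person_copy is illustrative):

    import java.util.Properties

    val props = new Properties()
    props.setProperty("user", "root")
    props.setProperty("password", "123456")
    props.setProperty("driver", "com.mysql.jdbc.Driver")
    jdbcDF.write.mode("append").jdbc("jdbc:mysql://hadoop1:3306/bigdata", "person_copy", props)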
    
    2.1.2 Writing data to MySQL (packaged jar)

    pom dependencies:

    The pom.xml is identical to the one shown in Section 5, subsection 1) above.
    

    Write the Spark program (this example uses Spark Streaming to read from Kafka and writes the results to MySQL over JDBC):

    package com.neusoft
    import java.sql
    import java.sql.DriverManager
    import java.util.Date
    
    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.kafka._
    import org.apache.spark.SparkConf
    /**
      * Created by Administrator on 2019/3/7.
      */
    object SparkDemo {
      def main(args: Array[String]): Unit = {
        System.setProperty("HADOOP_USER_NAME", "root")
    
        val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount").setMaster("local[2]")
        val ssc = new StreamingContext(sparkConf, Seconds(5))
        ssc.checkpoint("hdfs://hadoop3:8020/spark_check_point")
    // set of Kafka topics; multiple topics can be subscribed to, separated by commas when passed via args
        val topicsSet = Set("ss_kafka")
    // Kafka parameters: define the broker list
        val kafkaParams = Map[String, String]("metadata.broker.list" -> "192.168.159.133:9092,192.168.159.130:9092,192.168.159.134:9092")
        val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
          ssc, kafkaParams, topicsSet)
        print("---------:" +messages)
        val lines = messages.map(_._2)
        lines.foreachRDD(rdd => {
      // inner function: writes the records of one partition to MySQL
          def func(records: Iterator[String]) {
            var conn: sql.Connection = null
            var stmt: sql.PreparedStatement = null
            try {
              val url = "jdbc:mysql://localhost:3306/test"
              val user = "root"
              val password = "root"  //笔者设置的数据库密码是hadoop,请改成你自己的mysql数据库密码
              conn = DriverManager.getConnection(url, user, password)
              records.foreach(p => {
                val arr = p.split("\\t")
                val phoneno = arr(0)
                val jingwei = arr(1)
                var arrjingwei = jingwei.split(",")
            // order in the data: latitude, longitude
                var sql = "insert into location(time,latitude,longtitude) values (?,?,?)"
                stmt = conn.prepareStatement(sql);
                stmt.setLong(1, new Date().getTime)
                stmt.setDouble(2,java.lang.Double.parseDouble(arrjingwei(0).trim))
                stmt.setDouble(3,java.lang.Double.parseDouble(arrjingwei(1).trim))
                stmt.executeUpdate()
              })
            } catch {
              case e: Exception => e.printStackTrace()
            } finally {
              if (stmt != null) {
                stmt.close()
              }
              if (conn != null) {
                conn.close()
              }
            }
          }
          val repartitionedRDD = rdd.repartition(1)
          repartitionedRDD.foreachPartition(func)
        })
        ssc.start()
        ssc.awaitTermination()
      }
    }
    
    

    Package the program with the maven-shade-plugin.
    Submit the jar to the Spark cluster:

    spark-submit \
    --class com.neusoft.SparkDemo \
    --master spark://hadoop1:7077 \
    --jars mysql-connector-java-5.1.35-bin.jar \
    --driver-class-path mysql-connector-java-5.1.35-bin.jar \
    /root/demo.jar
    
