Integrating Spark with Hive and Reading MySQL Data

Author: wudl | Published 2021-07-08 22:14

    1. Integrating Spark with Hive

    1. Build a SparkSession object
    2. Configure the Hive integration (metastore URI and warehouse directory)
    3. Read the tag data from MySQL over JDBC


    2. Maven dependencies

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive-thriftserver_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
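
    The JDBC read in section 3 also needs the MySQL driver on the classpath. A minimal addition, assuming MySQL Connector/J 5.x (the `com.mysql.jdbc.Driver` class used below belongs to the 5.x series; the exact version here is an assumption, match it to your server):

    <!-- MySQL JDBC driver for the spark.read.format("jdbc") call in section 3;
         version 5.1.47 is an assumption, pick the one matching your MySQL server -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.47</version>
    </dependency>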
    

    3. Integration code

    package cn.wudl.tags.models.rule

    import org.apache.hadoop.hbase.client.{Put, Result}
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession

    object JobModel {

      def main(args: Array[String]): Unit = {

        // 1. Create the SparkSession instance
        val spark: SparkSession = {
          val sparkConf = new SparkConf()
            .setMaster("local[4]")
            .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
            // Number of shuffle partitions
            .set("spark.sql.shuffle.partitions", "4")
            // Kryo serialization, with the HBase classes registered
            .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            .registerKryoClasses(Array(classOf[ImmutableBytesWritable], classOf[Result], classOf[Put]))

          // 2. Build the SparkSession with the builder pattern
          val session = SparkSession.builder().config(sparkConf)
            // Enable the Hive integration
            .enableHiveSupport()
            // Address of the Hive metastore
            .config("hive.metastore.uris", "thrift://192.168.1.140:9083")
            // Hive warehouse directory
            .config("spark.sql.warehouse.dir", "hdfs://192.168.1.140:8020/user/hive/warehouse")
            .getOrCreate()
          // 3. Return the session object
          session
        }
        import org.apache.spark.sql.functions._
        import spark.implicits._

        // Subquery passed as the JDBC "dbtable" option: the tag with id = 321
        // together with its child tags (pid = 321)
        val tagTable: String =
          """
            |(
            |SELECT id, name, rule, level FROM profile_tags.tbl_basic_tag WHERE id = 321
            |union
            |SELECT id, name, rule, level FROM profile_tags.tbl_basic_tag WHERE pid = 321
            |) as tag_table
            |""".stripMargin

        // Read from MySQL; the URL carries no database name because the
        // subquery fully qualifies the table as profile_tags.tbl_basic_tag
        val baseTagDF = spark.read.format("jdbc")
          .option("driver", "com.mysql.jdbc.Driver")
          .option("url", "jdbc:mysql://192.168.1.140:3306/?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC")
          .option("dbtable", tagTable)
          .option("user", "root")
          .option("password", "123456")
          .load()

        // Print the schema and the data of the DataFrame
        baseTagDF.printSchema()
        baseTagDF.show(1000, truncate = false)
        spark.stop()
      }

    }
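
    The code above only exercises the MySQL path; with `enableHiveSupport()` configured, the same settings also let Spark query Hive directly. A minimal standalone sketch to verify the metastore connection, reusing the addresses from the code above (any Hive SQL against your own tables would do):

    package cn.wudl.tags.models.rule

    import org.apache.spark.sql.SparkSession

    object HiveCheck {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[4]")
          .appName("HiveCheck")
          // Same metastore and warehouse settings as JobModel above
          .enableHiveSupport()
          .config("hive.metastore.uris", "thrift://192.168.1.140:9083")
          .config("spark.sql.warehouse.dir", "hdfs://192.168.1.140:8020/user/hive/warehouse")
          .getOrCreate()

        // If the metastore connection works, this lists the Hive databases
        spark.sql("show databases").show()
        spark.stop()
      }
    }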
    
    

    4. Run output

    21/07/08 21:58:41 INFO SessionState: Created HDFS directory: /tmp/hive/Administrator/2dc03761-0ead-4df3-a025-7548bde33ca3/_tmp_space.db
    21/07/08 21:58:41 INFO HiveClientImpl: Warehouse location for Hive client (version 1.2.1) is hdfs://192.168.1.140:8020/user/hive/warehouse
    21/07/08 21:58:41 INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
    root
     |-- id: long (nullable = false)
     |-- name: string (nullable = true)
     |-- rule: string (nullable = true)
     |-- level: integer (nullable = true)
    
    21/07/08 21:58:42 INFO CodeGenerator: Code generated in 129.7142 ms
    21/07/08 21:58:42 INFO SparkContext: Starting job: show at JobModel.scala:54
    21/07/08 21:58:42 INFO DAGScheduler: Got job 0 (show at JobModel.scala:54) with 1 output partitions
    21/07/08 21:58:42 INFO DAGScheduler: Final stage: ResultStage 0 (show at JobModel.scala:54)
    21/07/08 21:58:42 INFO DAGScheduler: Parents of final stage: List()
    21/07/08 21:58:42 INFO DAGScheduler: Missing parents: List()
    21/07/08 21:58:42 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[2] at show at JobModel.scala:54), which has no missing parents
    21/07/08 21:58:43 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
    21/07/08 21:58:43 INFO JDBCRDD: closed connection
    21/07/08 21:58:43 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1475 bytes result sent to driver
    21/07/08 21:58:43 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 97 ms on localhost (executor driver) (1/1)
    21/07/08 21:58:43 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
    21/07/08 21:58:43 INFO DAGScheduler: ResultStage 0 (show at JobModel.scala:54) finished in 0.110 s
    21/07/08 21:58:43 INFO DAGScheduler: Job 0 finished: show at JobModel.scala:54, took 0.380809 s
    21/07/08 21:58:43 INFO CodeGenerator: Code generated in 11.4216 ms
    +---+----+-----------------------------------------------------------------------------------------------------------------------+-----+
    |id |name|rule                                                                                                                   |level|
    +---+----+-----------------------------------------------------------------------------------------------------------------------+-----+
    |321|职业  |inType=hbase
    zkHosts=192.168.1.140
    zkPort=2181
    hbaseTable=tbl_tag_users
    family=detail
    selectFieldNames=id,job|4    |
    |322|学生  |1                                                                                                                      |5    |
    |323|公务员 |2                                                                                                                      |5    |
    |324|军人  |3                                                                                                                      |5    |
    |325|警察  |4                                                                                                                      |5    |
    |326|教师  |5                                                                                                                      |5    |
    |327|白领  |6                                                                                                                      |5    |
    +---+----+-----------------------------------------------------------------------------------------------------------------------+-----+
    
    21/07/08 21:58:43 INFO SparkUI: Stopped Spark web UI at http://192.168.1.1:4040
    21/07/08 21:58:43 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
    21/07/08 21:58:43 INFO MemoryStore: MemoryStore cleared
    21/07/08 21:58:43 INFO BlockManager: BlockManager stopped
    21/07/08 21:58:43 INFO BlockManagerMaster: BlockManagerMaster stopped
    21/07/08 21:58:43 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
    21/07/08 21:58:43 INFO SparkContext: Successfully stopped SparkContext
    21/07/08 21:58:43 INFO ShutdownHookManager: Shutdown hook called
    21/07/08 21:58:43 INFO ShutdownHookManager: Deleting directory C:\Users\Administrator\AppData\Local\Temp\spark-ecfaf63a-8cfb-4c16-a300-ae47c03ca5b7
    Disconnected from the target VM, address: '127.0.0.1:50966', transport: 'socket'
    
    
    
