Spark Streaming + Spark SQL 2.x: Writing Data to Hive in Real Time

Author: JasonLee实时计算 | Published on 2019-02-20 13:05

    Today I mainly want to introduce a way of writing to Hive with the new Spark SQL 2.x API.

    One of Spark SQL's capabilities is executing SQL queries. Spark SQL can also be used to read data from an existing Hive installation.
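
    For instance, once Hive support is enabled, querying an existing Hive table takes a single call (the table name below is hypothetical):

    // assumes a SparkSession built with enableHiveSupport(), as in the code further down
    spark.sql("select * from some_existing_table limit 10").show()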

    Without further ado, here is the code:

    package spark
     
    import java.io.File
    import java.util
    import kafka.{PropertiesScalaUtils, RedisKeysListUtils}
    import kafka.SparkStreamingKafka.{dbIndex, kafkaStreams}
    import net.sf.json.JSONObject
    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.types.{StringType, StructField, StructType}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies}
    import redis.RedisPool
     
    /**
      * Write data into Hive using Spark SQL 2.x.
      */
    object SparkSqlDemo {
      def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.spark").setLevel(Level.INFO)
        Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.INFO)
        Logger.getLogger("org.apache.kafka.clients.consumer").setLevel(Level.INFO)
        // pass the HDFS URI directly; wrapping it in java.io.File and calling getAbsolutePath would mangle the scheme
        val warehouseLocation = "hdfs://cluster/hive/warehouse"
        val spark = SparkSession.builder()
          .appName("Spark SQL Jason")
          .config("spark.sql.warehouse.dir", warehouseLocation)
          .enableHiveSupport()
          .getOrCreate()
        spark.conf.set("spark.streaming.kafka.maxRatePerPartition", "2000")
        spark.conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        spark.conf.set("spark.streaming.concurrentJobs", "10")
        spark.conf.set("spark.streaming.kafka.maxRetries", "50")
        @transient
        val sc = spark.sparkContext
        val scc = new StreamingContext(sc, Seconds(2))
        val topic = "jason_20180511"
        val topicSet: Set[String] = Set(topic) // the Kafka topic(s) to subscribe to
        val kafkaParams = Map[String, Object](
          "auto.offset.reset" -> "latest",
          "value.deserializer" -> classOf[StringDeserializer]
          , "key.deserializer" -> classOf[StringDeserializer]
          , "bootstrap.servers" -> PropertiesScalaUtils.loadProperties("broker")
          , "group.id" -> PropertiesScalaUtils.loadProperties("groupId")
          , "enable.auto.commit" -> (false: java.lang.Boolean)
        )
        val maxTotal = 200
        val maxIdle = 100
        val minIdle = 10
        val testOnBorrow = false
        val testOnReturn = false
        val maxWaitMillis = 500
        RedisPool.makePool(PropertiesScalaUtils.loadProperties("redisHost"), PropertiesScalaUtils.loadProperties("redisPort").toInt, maxTotal, maxIdle, minIdle, testOnBorrow, testOnReturn, maxWaitMillis)
        val jedis = RedisPool.getPool.getResource
        jedis.select(dbIndex)
        val keys: util.Set[String] = jedis.keys(topic + "*")
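        // no offsets saved in Redis for this topic yet: start according to auto.offset.reset;
        // otherwise resume every partition from the offset stored after the previous batch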
        if (keys.size() == 0) {
          kafkaStreams = KafkaUtils.createDirectStream[String, String](
            scc,
            LocationStrategies.PreferConsistent,
            ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams))
        } else {
          val fromOffsets: Map[TopicPartition, Long] = RedisKeysListUtils.getKeysList(PropertiesScalaUtils.loadProperties("redisHost"), PropertiesScalaUtils.loadProperties("redisPort").toInt, topic)
          kafkaStreams = KafkaUtils.createDirectStream[String, String](
            scc,
            LocationStrategies.PreferConsistent,
            ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams, fromOffsets))
        }
        RedisPool.getPool.returnResource(jedis)
        kafkaStreams.foreachRDD(rdd=>{
          val jedis_jason = RedisPool.getPool.getResource
          jedis_jason.select(dbIndex)
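          // capture this batch's offset ranges so they can be written to Redis after processing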
          val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
          import spark.sql
          if(!rdd.isEmpty()){
            val rowRDD: RDD[Row] = rdd.map(x => {
              // parse the Kafka record value as JSON and pull out the two string fields
              val json = JSONObject.fromObject(x.value().toString)
              val a = json.getString("name")
              val b = json.getString("addr")
              Row(a, b)
            })
            val schemaString = "name addr"
            val field = schemaString.split(" ").map(x=> StructField(x,StringType,nullable = true))
            val schema = StructType(field)
            val df = spark.createDataFrame(rowRDD, schema)
            df.show()
            df.createOrReplaceTempView("tempTable")
            val sq = "insert into test_2 select * from tempTable"
            sql(sq)
            println("Insert into Hive succeeded")
          }
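          // persist the end offset of each partition to Redis so the next run can resume from it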
          offsetRanges.foreach { offsetRange =>
            println("partition : " + offsetRange.partition + " fromOffset:  " + offsetRange.fromOffset + " untilOffset: " + offsetRange.untilOffset)
            val topic_partition_key_new = offsetRange.topic + "_" + offsetRange.partition
            jedis_jason.set(topic_partition_key_new, offsetRange.untilOffset + "")
          }
          // return the Redis connection to the pool once the offsets have been saved
          RedisPool.getPool.returnResource(jedis_jason)
        })
        scc.start()
        scc.awaitTermination()
      }
    }
    
    

    Note that the directory configured by spark.sql.warehouse.dir defaults to the spark-warehouse directory under the Spark application's current working directory. Also note that since Spark 2.0.0 the hive.metastore.warehouse.dir property in hive-site.xml has been deprecated; use spark.sql.warehouse.dir instead to specify the default location of databases in the warehouse.
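
    As a minimal sketch of that configuration on its own (the app name here is made up, and the HDFS path is the one used in the job above; point both at your own environment):

    import org.apache.spark.sql.SparkSession

    // spark.sql.warehouse.dir replaces hive.metastore.warehouse.dir as of Spark 2.0.0
    val spark = SparkSession.builder()
      .appName("warehouse-config-demo")
      .config("spark.sql.warehouse.dir", "hdfs://cluster/hive/warehouse")
      .enableHiveSupport()
      .getOrCreate()

    // check which warehouse directory is actually in effect
    println(spark.conf.get("spark.sql.warehouse.dir"))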

    Another issue is that, written this way, the job produces a large number of small files under the table's directory on HDFS. When you later run statistics or other computations on the table in Hive, those small files spawn a large number of map tasks and seriously slow the job down, so keep this problem in mind.

    To avoid piling up small files under the table directory, we can create the Hive table as a partitioned table (how to create a partitioned table is covered in another blog of mine), or merge the small files directly with: insert overwrite table combine_data partition (day_time='2018-08-01') select data,enter_time from combine_data where day_time = '2018-08-01'; as sketched below.
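
    A rough sketch of the partitioned-table idea applied to this job; the table name test_2_partitioned, its schema, and the hard-coded day_time value are assumptions for illustration, not part of the original code:

    // one-time DDL: a partitioned variant of the article's test_2 table (hypothetical name/schema)
    spark.sql(
      """create table if not exists test_2_partitioned (name string, addr string)
        |partitioned by (day_time string)""".stripMargin)

    // inside foreachRDD: write each micro-batch into that day's partition; in practice the
    // partition value would be derived from the data or from the current date
    spark.sql("insert into table test_2_partitioned partition (day_time='2018-08-01') select name, addr from tempTable")

    // the insert overwrite statement quoted above can then be run periodically (for example
    // from the Hive CLI) against a single partition to merge that partition's small files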

    Alternatively, you can use repartition to reduce the number of partitions before writing, but doing so lowers the RDD's parallelism and hurts performance, so weigh the trade-off yourself (a small sketch follows).
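
    For example, a minimal sketch that coalesces the DataFrame df from the code above down to a single partition before the insert (coalesce is the shuffle-free cousin of repartition; one output file per micro-batch is only an illustration, pick a number that matches your data volume):

    // fewer partitions at write time means fewer output files per micro-batch,
    // at the cost of less parallelism for the write itself
    val compacted = df.coalesce(1)
    compacted.createOrReplaceTempView("tempTable")
    spark.sql("insert into test_2 select * from tempTable")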

    If anything here is wrong, corrections are welcome. If you have any questions, you can join QQ group 340297350. Thanks.


