Requirement Implementation 1

Author: yayooo | Published 2019-11-12 14:42

    Real-time statistics of registered users
    Users register through the website or the APP; the backend collects the registration events in real time and pushes them to Kafka, and Spark Streaming consumes the stream to count registrations in real time. One way to feed test records into the Kafka topic is sketched right after the requirement list.

    • Requirement 1: Count registrations in real time with a 3-second batch interval, using the updateStateByKey operator to combine the historical total with the current batch. Only this requirement uses updateStateByKey; the later requirements do not.
    • Requirement 2: Every 6 seconds, count the registrations of the last 1 minute; no historical data is needed. Hint: the reduceByKeyAndWindow operator.
    • Requirement 3: Observe the incoming data and try to tune the job.
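    For local testing, the topic can be fed with hand-written records. Below is a minimal producer sketch; the object name RegisterMockProducer and its package are hypothetical, while the brokers, the topic register_topic, and the tab-separated field layout (user id, platform id, creation time) are taken from the consumer code that follows.

    package com.atguigu.reatime.registerCount.producer  // hypothetical package, not part of the original project

    import java.util.Properties

    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

    object RegisterMockProducer {
      def main(args: Array[String]): Unit = {
        val props = new Properties()
        props.put("bootstrap.servers", "hadoop102:9092,hadoop103:9092,hadoop104:9092")
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        val producer = new KafkaProducer[String, String](props)
        // one record = user id \t platform id (1: PC, 2: APP, 3: Other) \t creation time
        producer.send(new ProducerRecord[String, String]("register_topic", "0\t1\t2019-07-16 16:01:55"))
        producer.close()
      }
    }
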
    package com.atguigu.reatime.registerCount.streaming
    
    import java.lang
    import java.sql.ResultSet
    import java.util.Random
    
    import com.atguigu.reatime.util.{DataSourceUtil, QueryCallback, SqlProxy}
    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.dstream.InputDStream
    import org.apache.spark.streaming.kafka010._
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    import scala.collection.mutable
    
    object RegisterStreaming {
      private val groupid = "register_group_test"
    
      def main(args: Array[String]): Unit = {
        System.setProperty("HADOOP_USER_NAME", "atguigu")
        val conf = new SparkConf().setAppName(this.getClass.getSimpleName)
          .set("spark.streaming.kafka.maxRatePerPartition", "10") //每个分区最大拉去数据条数为10条/s,10条/s * 10个分区=100条/s
          .set("spark.streaming.stopGracefullyOnShutdown", "true") //开启sparkStreaming优雅关闭
          .setMaster("local[2]")  //本地跑不能使用local[*]
        val ssc = new StreamingContext(conf, Seconds(3))  //3秒一批
        val topics = Array("register_topic")  //消费kafka的topic
    
        val kafkaMap: Map[String, Object] = Map[String, Object](
          "bootstrap.servers" -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
          "key.deserializer" -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          "group.id" -> groupid,
          "auto.offset.reset" -> "earliest",
          "enable.auto.commit" -> (false: lang.Boolean)
        )  // Kafka consumer parameters, passed to ConsumerStrategies.Subscribe below
    
    
        ssc.checkpoint("hdfs://hadoop102:9000/user/atguigu/sparkstreaming/checkpoint")  // checkpoint directory (required by updateStateByKey)
    
        // check whether MySQL already holds committed offsets for this consumer group
        val sqlProxy = new SqlProxy() // helper class for executing SQL

        // HashMap[(topic, partition) -> offset]
        val offsetMap = new mutable.HashMap[TopicPartition, Long]()

        val client = DataSourceUtil.getConnection  // get a connection from the Druid pool
    
        try {
          // query MySQL through the Druid connection; groupid = register_group_test; the result is handled in the callback
          sqlProxy.executeQuery(client, "select * from `offset_manager` where groupid=?", Array(groupid), new QueryCallback {

            // callback that walks the ResultSet
            override def process(rs: ResultSet): Unit = {
              while (rs.next()) {
                // offset_manager columns: (groupid, topic, partition, untilOffset) at indexes (1, 2, 3, 4)
                // build a TopicPartition(topic, partition)
                val model = new TopicPartition(rs.getString(2), rs.getInt(3))
                val offset = rs.getLong(4)  // committed offset
                offsetMap.put(model, offset)   // store into offsetMap[(topic, partition) -> offset]
              }
              rs.close() // close the ResultSet
            }
          })
        } catch {
          case e: Exception => e.printStackTrace()
        } finally {
          sqlProxy.shutdown(client)  // return the connection to the pool
        }
        // create the Kafka direct stream: if offsets were found in MySQL, resume from them; otherwise consume from the beginning
        val stream: InputDStream[ConsumerRecord[String, String]] = if (offsetMap.isEmpty) {
          // offsetMap is empty: first time this group consumes the topic
          KafkaUtils.createDirectStream(
            ssc,
            LocationStrategies.PreferConsistent,  // used in most cases: assigns partitions evenly across all executors
            ConsumerStrategies.Subscribe[String, String](topics, kafkaMap)) // subscribe with the topics and Kafka parameters
        } else {
          // offsetMap is not empty: resume from the stored offsets
          KafkaUtils.createDirectStream(
            ssc,
            LocationStrategies.PreferConsistent,
            ConsumerStrategies.Subscribe[String, String](topics, kafkaMap, offsetMap)) // subscribe with the topics, Kafka parameters, and stored offsets
        }
        // clean the register_topic records consumed from Kafka
        // sample record:  0 \t 1 \t 2019-07-16 16:01:55
        // fields: user id \t platform id (1: PC, 2: APP, 3: Other) \t creation time
        val resultDStream = stream.filter(item => item.value().split("\t").length == 3).
          mapPartitions(partitions => {
            partitions.map(item => {
              val line = item.value()   // item is a ConsumerRecord(topic, partition, offset, key, value); take the value
              val arr = line.split("\t")  // e.g. Array("0", "1", "2019-07-16 16:01:55")
              val app_name = arr(1) match {
                case "1" => "PC"
                case "2" => "APP"
                case _ => "Other"
              }
              (app_name, 1)  // e.g. (PC, 1)
            })
          })
    
        resultDStream.cache()  // cache: the DStream is reused by both aggregations below
    
        // windowed aggregation (Requirement 2); sample output:
        /**
          * (Other,345)
          * (APP,362)
          * (PC,2893)
          */
        // 60-second window, sliding every 6 seconds (a new result every 6 seconds)
        resultDStream.reduceByKeyAndWindow((x: Int, y: Int) => x + y, Seconds(60), Seconds(6)).print()
    
        // aggregate with historical data (Requirement 1)
        val updateFunc = (values: Seq[Int], state: Option[Int]) => {
          val currentCount = values.sum // sum of the current batch
          val previousCount = state.getOrElse(0) // previously accumulated count
          Some(currentCount + previousCount)
        }
        resultDStream.updateStateByKey(updateFunc).print()
    
        // Requirement 3: tuning, mitigate data skew by salting the key with a random prefix
        val dsStream = stream.filter(item => item.value().split("\t").length == 3)
          .mapPartitions(partitions => {
            val rand = new Random()  // one Random per partition instead of one per record
            partitions.map(item => {
              val line = item.value()
              val arr = line.split("\t")
              val app_id = arr(1)
              (rand.nextInt(3) + "_" + app_id, 1)  // salted key, e.g. (2_1, 1)
            })
          })
        val result = dsStream.reduceByKey(_ + _)   // partial counts per salted key, e.g. (2_1, 51)
        result.map(item => {
          val appid = item._1.split("_")(1)  // strip the random prefix to recover the app_id
          (appid, item._2)   // (app_id, partialCount)
        }).reduceByKey(_ + _).print()  // final counts per app_id
    
        // after the business logic is done, manually commit the offsets to MySQL
        stream.foreachRDD(rdd => {
          val sqlProxy = new SqlProxy()
          val client = DataSourceUtil.getConnection
          try {
            val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges  // offset ranges of this batch
            for (or <- offsetRanges) {
              // upsert the until-offset of each partition into MySQL
              sqlProxy.executeUpdate(client, "replace into `offset_manager` (groupid,topic,`partition`,untilOffset) values(?,?,?,?)",
                Array(groupid, or.topic, or.partition.toString, or.untilOffset))
            }
          } catch {
            case e: Exception => e.printStackTrace()
          } finally {
            sqlProxy.shutdown(client)
          }
        })
        ssc.start()
        ssc.awaitTermination()
      }
    
    }
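
    The job persists offsets in the offset_manager table in MySQL, but the table definition is not part of the source. A possible definition, inferred from the columns the queries touch, is sketched below; the column types and index name are assumptions. Note that replace into only behaves as an upsert when a primary or unique key covers (groupid, topic, partition); otherwise every batch would add new rows.

    -- sketch only: column types and index name are assumptions inferred from the code above
    CREATE TABLE `offset_manager` (
      `groupid`     VARCHAR(200),
      `topic`       VARCHAR(200),
      `partition`   INT,
      `untilOffset` BIGINT,
      UNIQUE KEY `uk_offset` (`groupid`, `topic`, `partition`)
    );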
    
    
    
