美文网首页
需求实现1

需求实现1

作者: yayooo | 来源:发表于2019-11-12 14:42 被阅读0次

实时统计注册人员信息
用户使用网站或APP进行注册,后台实时收集数据传输Kafka,Spark Streaming进行对接统计,实时统计注册人数。

  • 需求1:实时统计注册人数,批次为3秒一批,使用updateStateBykey算子计算历史数据和当前批次的数据总数,仅此需求使用updateStateBykey,后续需求不使用updateStateBykey。
  • 需求2:每6秒统统计一次1分钟内的注册数据,不需要历史数据 提示:reduceByKeyAndWindow算子
  • 需求3:观察对接数据,尝试进行调优。
package com.atguigu.reatime.registerCount.streaming

import java.lang
import java.sql.ResultSet
import java.util.Random

import com.atguigu.reatime.util.{DataSourceUtil, QueryCallback, SqlProxy}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable

object RegisterStreaming {
  private val groupid = "register_group_test"

  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "atguigu")
    val conf = new SparkConf().setAppName(this.getClass.getSimpleName)
      .set("spark.streaming.kafka.maxRatePerPartition", "10") //每个分区最大拉去数据条数为10条/s,10条/s * 10个分区=100条/s
      .set("spark.streaming.stopGracefullyOnShutdown", "true") //开启sparkStreaming优雅关闭
      .setMaster("local[2]")  //本地跑不能使用local[*]
    val ssc = new StreamingContext(conf, Seconds(3))  //3秒一批
    val topics = Array("register_topic")  //消费kafka的topic

    val kafkaMap: Map[String, Object] = Map[String, Object](
      "bootstrap.servers" -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupid,
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: lang.Boolean)
    )  //kafka参数:ConsumerStrategies.Subscribe[String, String](topics, kafkaMap, offsetMap))


    ssc.checkpoint("hdfs://hadoop102:9000/user/atguigu/sparkstreaming/checkpoint")  //设置检查点

    //查询mysql中是否有偏移量
    val sqlProxy = new SqlProxy() //关联类sqlProxy

    //HashMap[(topic,partition), offset]
    val offsetMap = new mutable.HashMap[TopicPartition, Long]() //HashMap[(topic,partition), offset]

    val client = DataSourceUtil.getConnection  //创建一个druid对象

    try {
      //通过druid对象连接mysql,执行sql文, groupid = register_group_test, 回调函数
      sqlProxy.executeQuery(client, "select * from `offset_manager` where groupid=?", Array(groupid), new QueryCallback {

        //重写回调函数
        override def process(rs: ResultSet): Unit = {
          while (rs.next()) {  //ResultSet
            //offset_manager: (groupid,topic, partition,untiloffset) (1,2,3,4)
            //创建一个TopicPartition(topic, partition)
            val model = new TopicPartition(rs.getString(2), rs.getInt(3))
            val offset = rs.getLong(4)  //获取offset
            offsetMap.put(model, offset)   //放入HashMap[(topic,partition), offset]
          }
          rs.close() //关闭ResultSet
        }
      })
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      sqlProxy.shutdown(client)  //关闭druid连接对象
    }
    //设置kafka消费数据的参数  判断本地是否有偏移量  有则根据偏移量继续消费 无则重新消费
    val stream: InputDStream[ConsumerRecord[String, String]] = if (offsetMap.isEmpty) {
      //offsetMap为空即:第一次消费该topic
      KafkaUtils.createDirectStream(
        ssc,
        LocationStrategies.PreferConsistent,  //大多数情况下使用,一致性的方式分配分区到所有 executor 上。(主要是为了分布均匀)
        ConsumerStrategies.Subscribe[String, String](topics, kafkaMap)) //发布订阅(消费那些topics,kafka参数)
    } else {
      KafkaUtils.createDirectStream(
        // //offsetMap为空即:不是第一次消费该topic
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](topics, kafkaMap, offsetMap))//发布订阅(消费那些topics,kafka参数,偏移量)
    }
    //清洗从kafka消费到的register_topic的数据
    //数据样例: 0             1                                  2019-07-16 16:01:55
    //          用户id     平台id  1:PC  2:APP   3:Ohter       创建时间
    val resultDStream = stream.filter(item => item.value().split("\t").length == 3).
      mapPartitions(partitions => {
        partitions.map(item => {
          val line = item.value()   //item为ConsumerRecord(String topic,int partition,long offset,K key,V value),取出value
          val arr = line.split("\t")  //即:0     1      2019-07-16 16:01:55
          val app_name = arr(1) match {
            case "1" => "PC"
            case "2" => "APP"
            case _ => "Other"
          }
          (app_name, 1)  //如(PC,1)
        })
      })

    resultDStream.cache()  //缓存

    //初步聚合
    /**
      * (Other,345)
      * (APP,362)
      * (PC,2893)
      */
    //60秒的窗口大小,没6秒产生一批新RDD
    resultDStream.reduceByKeyAndWindow((x: Int, y: Int) => x + y, Seconds(60), Seconds(6)).print()

    //聚合历史数据
    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.sum //本批次求和
      val previousCount = state.getOrElse(0) //历史数据
      Some(currentCount + previousCount)
    }
    resultDStream.updateStateByKey(updateFunc).print()

    //调优,避免数据倾斜
        val dsStream = stream.filter(item => item.value().split("\t").length == 3)
          .mapPartitions(partitions =>
            partitions.map(item => {
              val rand = new Random()
              val line = item.value()
              val arr = line.split("\t")
              val app_id = arr(1)
              (rand.nextInt(3) + "_" + app_id, 1)  //(随机数_appid,1)
            }))
        val result = dsStream.reduceByKey(_ + _)   //(随机数_appid,51)
        result.map(item => {
          val appid = item._1.split("_")(1)  //item 为 (随机数_appid,51), item._1为“随机数_appid”,item._1.split("_")(1)为app_id
          (appid, item._2)   //(appid, 51)
        }).reduceByKey(_ + _).print()

    //处理完 业务逻辑后 手动提交offset维护到本地 mysql中
    stream.foreachRDD(rdd => {
      val sqlProxy = new SqlProxy()
      val client = DataSourceUtil.getConnection
      try {
        val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges  //拿到offset数据集
        for (or <- offsetRanges) {
          //刷新到mysql中
          sqlProxy.executeUpdate(client, "replace into `offset_manager` (groupid,topic,`partition`,untilOffset) values(?,?,?,?)",
            Array(groupid, or.topic, or.partition.toString, or.untilOffset))
        }
      } catch {
        case e: Exception => e.printStackTrace()
      } finally {
        sqlProxy.shutdown(client)
      }
    })
    ssc.start()
    ssc.awaitTermination()
  }

}


相关文章

网友评论

      本文标题:需求实现1

      本文链接:https://www.haomeiwen.com/subject/izglyctx.html