Real-time Statistics of User Registrations
Users register through the website or the app; the backend collects the registration data in real time and pushes it to Kafka, and Spark Streaming consumes it to count registrations in real time.
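For reference, each registration record is a tab-separated line of the form userId, platformId, createTime on the register_topic topic (the format matches the sample shown in the code below). The following is only a minimal test-producer sketch: the object name is made up, and the broker list is copied from the consumer configuration.

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

//hypothetical helper for local testing; not part of the original project
object RegisterProducerDemo {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "hadoop102:9092,hadoop103:9092,hadoop104:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    //one record in the userId \t platformId \t createTime format expected by the streaming job
    producer.send(new ProducerRecord[String, String]("register_topic", "0\t1\t2019-07-16 16:01:55"))
    producer.close()
  }
}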
- Requirement 1: count registrations in real time with one batch every 3 seconds, using the updateStateByKey operator to combine the historical total with the current batch. Only this requirement uses updateStateByKey; the later requirements do not.
- Requirement 2: every 6 seconds, count the registrations of the last 1 minute; no historical data is needed. Hint: the reduceByKeyAndWindow operator.
- Requirement 3: observe the ingested data and try to tune the job.
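The job also persists Kafka offsets in a MySQL table named offset_manager with columns (groupid, topic, partition, untilOffset), as implied by the select and replace into statements in the code. Because replace into only behaves as an upsert when a primary key or unique index exists, a table roughly along these lines is assumed; the database name, credentials, and column types below are guesses rather than part of the original project.

import java.sql.DriverManager

//hypothetical one-off setup script; adjust the URL, user and password to your environment
object CreateOffsetTableDemo {
  def main(args: Array[String]): Unit = {
    val conn = DriverManager.getConnection("jdbc:mysql://hadoop102:3306/spark_streaming", "root", "password")
    val stmt = conn.createStatement()
    //column names follow the SQL used in the streaming job; types and sizes are assumptions
    stmt.execute(
      """create table if not exists offset_manager(
        |  groupid varchar(50),
        |  topic varchar(50),
        |  `partition` int,
        |  untilOffset bigint,
        |  unique index offset_unique_index (groupid, topic, `partition`)
        |)""".stripMargin)
    stmt.close()
    conn.close()
  }
}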
package com.atguigu.reatime.registerCount.streaming
import java.lang
import java.sql.ResultSet
import java.util.Random
import com.atguigu.reatime.util.{DataSourceUtil, QueryCallback, SqlProxy}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable
object RegisterStreaming {
private val groupid = "register_group_test"
def main(args: Array[String]): Unit = {
System.setProperty("HADOOP_USER_NAME", "atguigu")
val conf = new SparkConf().setAppName(this.getClass.getSimpleName)
.set("spark.streaming.kafka.maxRatePerPartition", "10") //每个分区最大拉去数据条数为10条/s,10条/s * 10个分区=100条/s
.set("spark.streaming.stopGracefullyOnShutdown", "true") //开启sparkStreaming优雅关闭
.setMaster("local[2]") //本地跑不能使用local[*]
val ssc = new StreamingContext(conf, Seconds(3)) //3秒一批
val topics = Array("register_topic") //消费kafka的topic
val kafkaMap: Map[String, Object] = Map[String, Object](
"bootstrap.servers" -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> groupid,
"auto.offset.reset" -> "earliest",
"enable.auto.commit" -> (false: lang.Boolean)
) //Kafka consumer parameters, passed to ConsumerStrategies.Subscribe[String, String](topics, kafkaMap, offsetMap) below
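//enable.auto.commit is set to false on purpose: offsets are committed manually to MySQL only after a
//batch has been processed (see the foreachRDD at the end of main), which gives at-least-once semantics.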
ssc.checkpoint("hdfs://hadoop102:9000/user/atguigu/sparkstreaming/checkpoint") //set the checkpoint directory (required by updateStateByKey)
//check whether MySQL already stores offsets for this consumer group
val sqlProxy = new SqlProxy() //SQL helper class
val offsetMap = new mutable.HashMap[TopicPartition, Long]() //HashMap[(topic,partition), offset]
val client = DataSourceUtil.getConnection //get a Druid connection from the pool
try {
//query MySQL through the Druid connection; groupid = register_group_test, results handled by the callback
sqlProxy.executeQuery(client, "select * from `offset_manager` where groupid=?", Array(groupid), new QueryCallback {
//the callback that processes the query result
override def process(rs: ResultSet): Unit = {
while (rs.next()) { //iterate over the ResultSet
//offset_manager columns: (groupid, topic, partition, untilOffset) at indexes (1, 2, 3, 4)
//build a TopicPartition(topic, partition)
val model = new TopicPartition(rs.getString(2), rs.getInt(3))
val offset = rs.getLong(4) //read the offset
offsetMap.put(model, offset) //put into HashMap[(topic,partition), offset]
}
rs.close() //close the ResultSet
}
})
} catch {
case e: Exception => e.printStackTrace()
} finally {
sqlProxy.shutdown(client) //release the Druid connection
}
//decide how to consume from Kafka: if stored offsets exist, resume from them; otherwise consume from scratch
val stream: InputDStream[ConsumerRecord[String, String]] = if (offsetMap.isEmpty) {
//offsetMap is empty: this is the first time this group consumes the topic
KafkaUtils.createDirectStream(
ssc,
LocationStrategies.PreferConsistent, //used in most cases: distributes partitions evenly across all executors
ConsumerStrategies.Subscribe[String, String](topics, kafkaMap)) //subscribe (which topics to consume, Kafka parameters)
} else {
KafkaUtils.createDirectStream(
//offsetMap is not empty: this group has consumed the topic before
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topics, kafkaMap, offsetMap)) //subscribe (which topics to consume, Kafka parameters, stored offsets)
}
//clean the register_topic data consumed from Kafka
//sample record: 0	1	2019-07-16 16:01:55
//fields: userId, platformId (1:PC 2:APP 3:Other), createTime
val resultDStream = stream.filter(item => item.value().split("\t").length == 3).
mapPartitions(partitions => {
partitions.map(item => {
val line = item.value() //item is a ConsumerRecord(topic, partition, offset, key, value); take the value
val arr = line.split("\t") //e.g. 0	1	2019-07-16 16:01:55
val app_name = arr(1) match {
case "1" => "PC"
case "2" => "APP"
case _ => "Other"
}
(app_name, 1) //e.g. (PC,1)
})
})
resultDStream.cache() //cache the DStream; it is reused by the window and the state computations below
//preliminary aggregation; sample output:
/**
* (Other,345)
* (APP,362)
* (PC,2893)
*/
//60-second window, sliding every 6 seconds
resultDStream.reduceByKeyAndWindow((x: Int, y: Int) => x + y, Seconds(60), Seconds(6)).print()
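//Note: both the window length (60s) and the slide interval (6s) must be multiples of the 3-second
//batch interval, otherwise Spark Streaming rejects the window operation.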
//fold the historical total into the current batch
val updateFunc = (values: Seq[Int], state: Option[Int]) => {
val currentCount = values.sum //sum of the current batch
val previousCount = state.getOrElse(0) //previously accumulated count
Some(currentCount + previousCount)
}
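//For example, if the "PC" key sees Seq(1, 1, 1) in the current 3-second batch and the accumulated
//state is Some(2893), updateFunc returns Some(2896). (Numbers are illustrative only.)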
resultDStream.updateStateByKey(updateFunc).print()
//tuning: mitigate data skew by salting keys with a random prefix (two-stage aggregation)
val dsStream = stream.filter(item => item.value().split("\t").length == 3)
.mapPartitions(partitions => {
val rand = new Random() //one Random per partition; no need to create one per record
partitions.map(item => {
val line = item.value()
val arr = line.split("\t")
val app_id = arr(1)
(rand.nextInt(3) + "_" + app_id, 1) //(randomPrefix_appId, 1)
})
})
val result = dsStream.reduceByKey(_ + _) //first-stage aggregation on salted keys, e.g. (randomPrefix_appId, 51)
result.map(item => {
val appid = item._1.split("_")(1) //item is (randomPrefix_appId, count); item._1.split("_")(1) recovers app_id
(appid, item._2) //(appid, 51)
}).reduceByKey(_ + _).print()
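//Why two stages: a hot platform id (e.g. "1" for PC) would otherwise send all of its records to a single
//reduce task. The random 0-2 prefix splits that key across up to three keys for the first reduceByKey,
//and stripping the prefix before the second reduceByKey restores the per-platform totals.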
//after the business logic has finished, manually commit the offsets to MySQL
stream.foreachRDD(rdd => {
val sqlProxy = new SqlProxy()
val client = DataSourceUtil.getConnection
try {
val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges //get this batch's offset ranges
for (or <- offsetRanges) {
//write the latest offsets back to MySQL (replace into acts as an upsert here)
sqlProxy.executeUpdate(client, "replace into `offset_manager` (groupid,topic,`partition`,untilOffset) values(?,?,?,?)",
Array(groupid, or.topic, or.partition.toString, or.untilOffset))
}
} catch {
case e: Exception => e.printStackTrace()
} finally {
sqlProxy.shutdown(client)
}
})
ssc.start()
ssc.awaitTermination()
}
}