Real-Time Order Statistics Project with Spark
Requirements:
1. Real-time statistics of turnover (sales revenue) per province
2. Real-time statistics of order count per province
Data format:
orderId, provinceId, orderPrice
201710261645320001, 12, 45.00
Step 1: Write Scala code that simulates a Kafka producer generating order data
ConstantUtils.scala
Defines the Kafka cluster and producer configuration constants (Producer Configs)

//Holds constant configuration values
object ConstantUtils {
  /**
   * Kafka cluster related configuration
   */
  //Kafka broker list
  val METADATA_BROKER_LIST = "bigdata02:9092"
  //Serializer class
  val SERIALIZER_CLASS = "kafka.serializer.StringEncoder"
  //Send mode of the producer
  val PRODUCER_TYPE = "async"
  //Topic name
  val ORDER_TOPIC = "test1026"
  //Where to start consuming when no offset is stored
  val AUTO_OFFSET_RESET = "largest"
}
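The source does not show the project's build file. Below is a hedged build.sbt sketch that matches the APIs used in the following steps (the old Scala producer in kafka.producer._, the Kafka 0.8 direct stream via KafkaUtils.createDirectStream with StringDecoder, SparkSession, and jackson-module-scala); all version numbers are assumptions and should be aligned with your cluster:

//Hypothetical build.sbt -- versions are assumptions, not taken from the source
scalaVersion := "2.11.8"

val sparkVersion = "2.1.0"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"                % sparkVersion,
  "org.apache.spark" %% "spark-streaming"           % sparkVersion,
  "org.apache.spark" %% "spark-sql"                 % sparkVersion,
  //Direct-stream connector for the Kafka 0.8 consumer API; it also pulls in the
  //kafka core artifact that provides kafka.producer._ used by OrderProducer
  "org.apache.spark" %% "spark-streaming-kafka-0-8" % sparkVersion,
  //JSON (de)serialization of the Order case class; keep in line with the Jackson version Spark ships
  "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.6.5"
)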
OrderProducer.scala
import java.util.Properties
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

object OrderProducer {
  def main(args: Array[String]): Unit = {
    //Kafka cluster related configuration
    val props = new Properties()
    //Producer instance
    var producer: Producer[String, String] = null
    //Kafka brokers
    props.put("metadata.broker.list", ConstantUtils.METADATA_BROKER_LIST)
    //How data is sent to the topic (sync/async)
    props.put("producer.type", ConstantUtils.PRODUCER_TYPE)
    //Serializer classes for the key and the message
    props.put("serializer.class", ConstantUtils.SERIALIZER_CLASS)
    props.put("key.serializer.class", ConstantUtils.SERIALIZER_CLASS)
    try {
      //Producer configuration
      val config = new ProducerConfig(props)
      //Create a producer instance
      producer = new Producer[String, String](config)
      //Build a message
      val message = new KeyedMessage[String, String](ConstantUtils.ORDER_TOPIC, key = "201710261645320001", message = "201710261645320001,12,45.00")
      //Send the message
      producer.send(message)
    } catch {
      //Print the stack trace
      case e: Exception => e.printStackTrace()
    } finally {
      //Always close the producer
      if (null != producer) producer.close()
    }
  }
}
Create the topic:
bin/kafka-topics.sh --create --zookeeper bigdata02:2181 --replication-factor 1 --partitions 3 --topic test1026
Start a console consumer (the Kafka 0.10 console consumer connects either through --zookeeper or through --bootstrap-server; the ZooKeeper option is used here):
cd /home/bigdata/apps/kafka_2.11-0.10.0.0/
./bin/kafka-console-consumer.sh --zookeeper bigdata02:2181 --topic test1026 --from-beginning

Step 2: Simulate JSON-formatted order data and send it to the Kafka topic in batches
OrderProducer.scala
import java.util.{Properties, UUID}
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
import scala.collection.mutable.ArrayBuffer

//Order entity class
case class Order(orderId: String, provinceId: Int, price: Float)

object OrderProducer {
  def main(args: Array[String]): Unit = {
    /**
     * Jackson ObjectMapper used for JSON serialization
     */
    val mapper = new ObjectMapper()
    mapper.registerModule(DefaultScalaModule)
    //Kafka cluster related configuration
    val props = new Properties()
    //Producer instance
    var producer: Producer[String, String] = null
    //Kafka brokers
    props.put("metadata.broker.list", ConstantUtils.METADATA_BROKER_LIST)
    //How data is sent to the topic (sync/async)
    props.put("producer.type", ConstantUtils.PRODUCER_TYPE)
    //Serializer classes for the key and the message
    props.put("serializer.class", ConstantUtils.SERIALIZER_CLASS)
    props.put("key.serializer.class", ConstantUtils.SERIALIZER_CLASS)
    try {
      //Producer configuration
      val config = new ProducerConfig(props)
      //Create a producer instance
      producer = new Producer[String, String](config)
      //A mutable array that buffers the messages of one batch
      val messageArrayBuffer = new ArrayBuffer[KeyedMessage[String, String]]()
      //Keep producing N order records per round
      while (true) {
        //Clear the messages of the previous batch
        messageArrayBuffer.clear()
        //Random number deciding how many orders to generate in this round
        val random: Int = RandomUtils.getRandomNum(1000) + 100
        //Generate the order records
        for (index <- 0 until random) {
          //Order id
          val orderId = UUID.randomUUID().toString
          //Province id
          val provinceId = RandomUtils.getRandomNum(34) + 1
          //Order price
          val orderPrice = RandomUtils.getRandomNum(80) + 100.5F
          //Create an order instance
          val order = Order(orderId, provinceId, orderPrice)
          //TODO: how to convert the entity class into JSON
          //Build a message
          val message = new KeyedMessage[String, String](ConstantUtils.ORDER_TOPIC, "orderId", mapper.writeValueAsString(order))
          //Print the JSON data
          println(s"Order Json: ${mapper.writeValueAsString(order)}")
          //Sending one record at a time (disabled in favor of batch sending)
          //producer.send(message)
          //Add the message to the buffer
          messageArrayBuffer += message
        }
        //Send the whole batch to the topic
        producer.send(messageArrayBuffer: _*)
        //Sleep for a while
        Thread.sleep(RandomUtils.getRandomNum(100) * 10)
      }
    } catch {
      //Print the stack trace
      case e: Exception => e.printStackTrace()
    } finally {
      //Always close the producer
      if (null != producer) producer.close()
    }
  }
}
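The producer above calls a RandomUtils helper that is not shown in the source. A minimal sketch, assuming it is just a thin wrapper around java.util.Random, could be:

import java.util.Random

//Hypothetical helper -- the original RandomUtils is not included in the source
object RandomUtils {
  private val random = new Random()
  //Return a random Int in [0, bound)
  def getRandomNum(bound: Int): Int = random.nextInt(bound)
}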


Step 3: Build the Spark Streaming programming skeleton with Scala's loan pattern
OrderTurnoverStreaming.scala
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Duration, Minutes, Seconds, StreamingContext}

//Read data from Kafka and compute order count and total order value
object OrderTurnoverStreaming {
  System.setProperty("HADOOP_USER_NAME", "bigdata")
  val BATCH_INTERVAL: Duration = Seconds(2)
  //Checkpoint directory
  val CHECKPOINT_DIRECTORY: String = "hdfs://bigdata02:9000/sparkckpt1027"

  //Loan function
  def sparkOperation(args: Array[String])(operation: StreamingContext => Unit) = {
    //Function that creates the StreamingContext
    def functionToCreateContext(): StreamingContext = {
      //Create the configuration object
      val sparkConf = new SparkConf()
        //Application name
        .setAppName("OrderTurnoverStreaming")
        //Run mode
        .setMaster("local[3]")
      //Create the SparkContext
      val sc = SparkContext.getOrCreate(sparkConf)
      //Set the log level
      sc.setLogLevel("WARN")
      //Create the StreamingContext
      val ssc: StreamingContext = new StreamingContext(sc, BATCH_INTERVAL)
      //Call the user function
      operation(ssc)
      //Keep batch data around so it can still be queried interactively
      ssc.remember(Minutes(1))
      //Set the checkpoint directory
      ssc.checkpoint(CHECKPOINT_DIRECTORY)
      //Return the StreamingContext
      ssc
    }
    var context: StreamingContext = null
    try {
      //Stop any existing StreamingContext
      val stopActiveContext = true
      if (stopActiveContext) {
        StreamingContext.getActive().foreach(_.stop(stopSparkContext = true))
      }
      // context = StreamingContext.getOrCreate(CHECKPOINT_DIRECTORY, functionToCreateContext)
      context = StreamingContext.getActiveOrCreate(CHECKPOINT_DIRECTORY, functionToCreateContext)
      //Set the log level again: if the StreamingContext is recovered from the checkpoint, the earlier setting does not take effect
      context.sparkContext.setLogLevel("WARN")
      //Start the computation
      context.start()
      //Wait for termination
      context.awaitTermination()
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      context.stop(stopSparkContext = true, stopGracefully = true)
    }
  }

  def main(args: Array[String]): Unit = {
    sparkOperation(args)(processOrderData)
  }

  //User function: this is where the actual processing logic goes
  def processOrderData(ssc: StreamingContext): Unit = {
  }
}
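For readers unfamiliar with the loan pattern: sparkOperation creates and manages the StreamingContext and "lends" it to the operation function passed in its second parameter list, so the lifecycle code is written once and only the business logic changes between requirements. A stripped-down illustration of the same idea (all names here are made up for the example):

object LoanPatternDemo {
  //Generic loan pattern: the caller supplies only the `use` function;
  //creation and cleanup of the resource stay in one place.
  def withResource[A](create: => A)(close: A => Unit)(use: A => Unit): Unit = {
    val resource = create
    try {
      use(resource)
    } finally {
      close(resource)
    }
  }

  def main(args: Array[String]): Unit = {
    //Usage sketch: the PrintWriter is created and closed for the caller
    withResource(new java.io.PrintWriter("out.txt"))(_.close())(_.println("hello"))
  }
}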
Step 4: Read from the Kafka order topic and implement requirement 1 (cumulative real-time turnover per province)
OrderTurnoverStreaming.scala
import java.text.SimpleDateFormat
import java.util.Date
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Duration, Minutes, Seconds, StreamingContext}

//Read data from Kafka and compute order count and total order value
object OrderTurnoverStreaming {
  System.setProperty("HADOOP_USER_NAME", "bigdata")
  val BATCH_INTERVAL: Duration = Seconds(2)
  //Checkpoint directory
  val CHECKPOINT_DIRECTORY: String = "hdfs://bigdata02:9000/sparkckpt1027"

  //Loan function
  def sparkOperation(args: Array[String])(operation: StreamingContext => Unit) = {
    //Function that creates the StreamingContext
    def functionToCreateContext(): StreamingContext = {
      //Create the configuration object
      val sparkConf = new SparkConf()
        //Application name
        .setAppName("OrderTurnoverStreaming")
        //Run mode
        .setMaster("local[3]")
      //Create the SparkContext
      val sc = SparkContext.getOrCreate(sparkConf)
      //Set the log level
      sc.setLogLevel("WARN")
      //Create the StreamingContext
      val ssc: StreamingContext = new StreamingContext(sc, BATCH_INTERVAL)
      //Call the user function
      operation(ssc)
      //Keep batch data around so it can still be queried interactively
      ssc.remember(Minutes(1))
      //Set the checkpoint directory
      ssc.checkpoint(CHECKPOINT_DIRECTORY)
      //Return the StreamingContext
      ssc
    }
    var context: StreamingContext = null
    try {
      //Stop any existing StreamingContext
      val stopActiveContext = true
      if (stopActiveContext) {
        StreamingContext.getActive().foreach(_.stop(stopSparkContext = true))
      }
      // context = StreamingContext.getOrCreate(CHECKPOINT_DIRECTORY, functionToCreateContext)
      context = StreamingContext.getActiveOrCreate(CHECKPOINT_DIRECTORY, functionToCreateContext)
      //Set the log level again: if the StreamingContext is recovered from the checkpoint, the earlier setting does not take effect
      context.sparkContext.setLogLevel("WARN")
      //Start the computation
      context.start()
      //Wait for termination
      context.awaitTermination()
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      context.stop(stopSparkContext = true, stopGracefully = true)
    }
  }

  def main(args: Array[String]): Unit = {
    sparkOperation(args)(processOrderData)
  }

  //User function: this is where the actual processing logic goes
  def processOrderData(ssc: StreamingContext): Unit = {
    /**
     * 1. Read data from the Kafka topic in direct mode
     */
    //Kafka connection settings (first argument)
    val kafkaParams: Map[String, String] = Map(
      "metadata.broker.list" -> ConstantUtils.METADATA_BROKER_LIST,
      "auto.offset.reset" -> ConstantUtils.AUTO_OFFSET_RESET
    )
    //Topics (second argument)
    val topicsSet = ConstantUtils.ORDER_TOPIC.split(",").toSet
    //Read from the Kafka topic using the direct approach
    val orderDStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)

    //Requirement 1 starts here
    /**
     * 2. Process the order data. Requirement 1: cumulative real-time turnover per province
     */
    val orderTurnoverDStream = orderDStream
      //a. Parse the JSON data and convert the stream into DStream[(provinceId, order)]
      .map(tuple => {
        //Parse the JSON data into an Order
        val order = ObjectMapperSingleton.getInstance().readValue(tuple._2, classOf[Order])
        //Return the pair
        (order.provinceId, order)
      })
      //b. Use updateStateByKey for cumulative statistics per province
      .updateStateByKey(
        //updateFunc
        (orders: Seq[Order], state: Option[Float]) => {
          //Turnover of the orders arriving in this batch for the current province
          val currentOrdersPrice = orders.map(_.price).sum
          //Turnover accumulated so far for the current province
          val previousPrice = state.getOrElse(0.0F)
          //Accumulate
          Some(currentOrdersPrice + previousPrice)
        }
      )

    /**
     * 3. Output the real-time turnover of each province
     */
    orderTurnoverDStream.foreachRDD(
      (rdd, time) => {
        //Format the batch time
        val batchInterval = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(time.milliseconds))
        println("--------------------------------------")
        println(s"batchInterval :${batchInterval}")
        println("--------------------------------------")
        rdd.coalesce(1)
          .foreachPartition(iter => { iter.foreach(println) })
      })
  }
}

//Singleton holding the Jackson ObjectMapper
object ObjectMapperSingleton {
  /**
   * The @transient annotation means:
   * 1. When the enclosing object is serialized, fields marked transient are not persisted
   * 2. When the object is deserialized, such fields are not restored
   */
  @transient private var instance: ObjectMapper = _
  def getInstance(): ObjectMapper = {
    if (instance == null) {
      instance = new ObjectMapper()
      instance.registerModule(DefaultScalaModule)
    }
    instance
  }
}
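Before wiring everything together it can help to verify the Jackson round trip in isolation. The sketch below is a standalone test program, not part of the original project; it serializes and parses an Order exactly the way OrderProducer and processOrderData do:

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

//Same shape as the Order case class used in the project
case class Order(orderId: String, provinceId: Int, price: Float)

object OrderJsonCheck {
  def main(args: Array[String]): Unit = {
    val mapper = new ObjectMapper()
    mapper.registerModule(DefaultScalaModule)
    //Serialize the way OrderProducer does
    val json = mapper.writeValueAsString(Order("201710261645320001", 12, 45.00F))
    println(json)
    //Parse the way processOrderData does
    val order = mapper.readValue(json, classOf[Order])
    println(order)
  }
}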




Step 5: Tune the streaming application (ingest rate limit, serialization, etc.)
//Use a new checkpoint directory
val CHECKPOINT_DIRECTORY: String = "hdfs://bigdata02:9000/sparkckpt1028"
//TODO: cap the number of records fetched per second from each Kafka topic partition
sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "10000")
//TODO: switch to Kryo serialization and register the Order class
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
sparkConf.registerKryoClasses(Array(classOf[Order]))
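These settings go into functionToCreateContext from Step 3, before the SparkContext is created; the full version appears in Step 6. Condensed, the updated builder looks like this (same names as in the code above):

val sparkConf = new SparkConf()
  .setAppName("OrderTurnoverStreaming")
  .setMaster("local[3]")
  //At most 10,000 records per second per Kafka partition
  .set("spark.streaming.kafka.maxRatePerPartition", "10000")
  //Kryo serialization, with the Order class registered up front
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
sparkConf.registerKryoClasses(Array(classOf[Order]))
val sc = SparkContext.getOrCreate(sparkConf)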

Step 6: Integrate Spark SQL to count orders per province over a ten-second window
OrderQuantityStreaming.scala
import java.text.SimpleDateFormat
import java.util.Date
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import kafka.serializer.StringDecoder
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Duration, Minutes, Seconds, StreamingContext}

//Read data from Kafka and compute order count and total order value
object OrderQuantityStreaming {
  System.setProperty("HADOOP_USER_NAME", "bigdata")
  val BATCH_INTERVAL: Duration = Seconds(2)
  val WINDOW_INTERVAL: Duration = BATCH_INTERVAL * 5
  val SLIDER_INTERVAL: Duration = BATCH_INTERVAL * 3
  //Checkpoint directory
  val CHECKPOINT_DIRECTORY: String = "hdfs://bigdata02:9000/sparkckpt10282"

  //Loan function
  def sparkOperation(args: Array[String])(operation: StreamingContext => Unit) = {
    //Function that creates the StreamingContext
    def functionToCreateContext(): StreamingContext = {
      //Create the configuration object
      val sparkConf = new SparkConf()
        //Application name
        .setAppName("OrderTurnoverStreaming")
        //Run mode
        .setMaster("local[3]")
      //TODO: cap the number of records fetched per second from each Kafka topic partition
      sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "10000")
      //TODO: switch to Kryo serialization and register the Order class
      sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      sparkConf.registerKryoClasses(Array(classOf[Order]))
      //Create the SparkContext
      val sc = SparkContext.getOrCreate(sparkConf)
      //Set the log level
      sc.setLogLevel("WARN")
      //Create the StreamingContext
      val ssc: StreamingContext = new StreamingContext(sc, BATCH_INTERVAL)
      //Call the user function
      operation(ssc)
      //Keep batch data around so it can still be queried interactively
      ssc.remember(Minutes(1))
      //Set the checkpoint directory
      ssc.checkpoint(CHECKPOINT_DIRECTORY)
      //Return the StreamingContext
      ssc
    }
    var context: StreamingContext = null
    try {
      //Stop any existing StreamingContext
      val stopActiveContext = true
      if (stopActiveContext) {
        StreamingContext.getActive().foreach(_.stop(stopSparkContext = true))
      }
      // context = StreamingContext.getOrCreate(CHECKPOINT_DIRECTORY, functionToCreateContext)
      context = StreamingContext.getActiveOrCreate(CHECKPOINT_DIRECTORY, functionToCreateContext)
      //Set the log level again: if the StreamingContext is recovered from the checkpoint, the earlier setting does not take effect
      context.sparkContext.setLogLevel("WARN")
      //Start the computation
      context.start()
      //Wait for termination
      context.awaitTermination()
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      context.stop(stopSparkContext = true, stopGracefully = true)
    }
  }

  def main(args: Array[String]): Unit = {
    sparkOperation(args)(processOrderData)
  }

  //User function: this is where the actual processing logic goes
  def processOrderData(ssc: StreamingContext): Unit = {
    /**
     * 1. Read data from the Kafka topic in direct mode
     */
    //Kafka connection settings (first argument)
    val kafkaParams: Map[String, String] = Map(
      "metadata.broker.list" -> ConstantUtils.METADATA_BROKER_LIST,
      "auto.offset.reset" -> ConstantUtils.AUTO_OFFSET_RESET
    )
    //Topics (second argument)
    val topicsSet = ConstantUtils.ORDER_TOPIC.split(",").toSet
    //Read from the Kafka topic using the direct approach
    val orderDStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)

    //Requirement 2 starts here
    /**
     * Count the orders of each province over the last ten seconds
     */
    orderDStream
      //Set the window length and the slide interval
      .window(WINDOW_INTERVAL, SLIDER_INTERVAL)
      .foreachRDD((rdd, time) => {
        //Format the batch time
        val sliderInterval = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(time.milliseconds))
        println("--------------------------------------")
        println(s"sliderInterval :${sliderInterval}")
        println("--------------------------------------")
        //Only process RDDs that actually contain data
        if (!rdd.isEmpty()) {
          val orderRDD = rdd.map(tuple => {
            //Parse the JSON data into an Order
            ObjectMapperSingleton.getInstance().readValue(tuple._2, classOf[Order])
          })
          //Create the SparkSession
          val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
          //Convert to a Dataset
          import spark.implicits._
          val orderDS = orderRDD.toDS()
          //Analyze with the DataFrame DSL
          val provinceCountDF = orderDS.groupBy("provinceId").count()
          //Print the result
          provinceCountDF.show(15, truncate = false)
        }
      })
  }
}

//Singleton holding the Jackson ObjectMapper
object ObjectMapperSingleton {
  /**
   * The @transient annotation means:
   * 1. When the enclosing object is serialized, fields marked transient are not persisted
   * 2. When the object is deserialized, such fields are not restored
   */
  @transient private var instance: ObjectMapper = _
  def getInstance(): ObjectMapper = {
    if (instance == null) {
      instance = new ObjectMapper()
      instance.registerModule(DefaultScalaModule)
    }
    instance
  }
}

//Singleton holding the SparkSession
object SparkSessionSingleton {
  @transient private var instance: SparkSession = _
  def getInstance(sparkconf: SparkConf): SparkSession = {
    if (instance == null) {
      instance = SparkSession.builder().config(sparkconf).getOrCreate()
    }
    instance
  }
}
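Since this step is about integrating Spark SQL, the same count can also be expressed with a temporary view and a SQL query instead of the DataFrame DSL. The fragment below is a drop-in replacement for the groupBy/count lines inside foreachRDD above (orderDS and spark come from that block; the view name and column alias are chosen for this example):

//Register the window's orders as a temporary view and query it with SQL
orderDS.createOrReplaceTempView("orders")
val provinceCountDF = spark.sql(
  "SELECT provinceId, COUNT(*) AS order_count FROM orders GROUP BY provinceId")
provinceCountDF.show(15, truncate = false)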
