Spark Real-Time Order Statistics Project


Author: 羋学僧 | Published 2020-10-26 20:34


    Requirements:

    1. Real-time cumulative turnover (revenue) per province
    2. Real-time order count per province

    Sample data:

    orderId                   provinceId   orderPrice
    201710261645320001,           12,          45.00
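    For reference, each record maps directly onto the Order case class introduced in Step 2; a minimal, illustrative parse sketch (assuming comma-separated fields):

    //Illustrative only: split one raw record into the fields of the Order case class from Step 2
    val line = "201710261645320001,12,45.00"
    val fields = line.split(",").map(_.trim)
    val order = Order(fields(0), fields(1).toInt, fields(2).toFloat)   // Order("201710261645320001", 12, 45.0F)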
    

    Step 1: Write Scala code that simulates a Kafka producer generating order data

    ConstantUtils.scala

    Defines the Kafka cluster and producer configuration constants (Producer Configs).

    //Holds constant configuration values
    object ConstantUtils {
      /**
       * Kafka cluster and producer configuration constants
       */
      //Kafka broker list
      val METADATA_BROKER_LIST = "bigdata02:9092"
      //Serializer class for keys and messages
      val SERIALIZER_CLASS = "kafka.serializer.StringEncoder"
      //Producer send mode (async or sync)
      val PRODUCER_TYPE = "async"
      //Topic name
      val ORDER_TOPIC = "test1026"
      //Where to start consuming when no committed offset exists
      val AUTO_OFFSET_RESET = "largest"
    }
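    Note: with the old (0.8-style) Kafka consumer API used here, auto.offset.reset accepts "largest" (start from the latest offset) or "smallest" (start from the earliest). "largest" means the streaming job only sees orders produced after it starts.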
    

    OrderProducer.scala

    import java.util.Properties
    
    import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
    
    
    object OrderProducer {
      def main(args: Array[String]): Unit = {
        //Kafka cluster configuration
        val props = new Properties()
        //Producer instance
        var producer: Producer[String, String] = null

        //Kafka broker list
        props.put("metadata.broker.list", ConstantUtils.METADATA_BROKER_LIST)
        //How data is sent to the topic (async/sync)
        props.put("producer.type", ConstantUtils.PRODUCER_TYPE)
        //Serializer classes for the message and the key
        props.put("serializer.class", ConstantUtils.SERIALIZER_CLASS)
        props.put("key.serializer.class", ConstantUtils.SERIALIZER_CLASS)

        try {
          //Producer configuration
          val config = new ProducerConfig(props)
          //Create the producer instance
          producer = new Producer[String, String](config)
          //Build one message
          val message = new KeyedMessage[String, String](ConstantUtils.ORDER_TOPIC, key = "201710261645320001", message = "201710261645320001,12,45.00")
          //Send it
          producer.send(message)

        } catch {
          //Print the stack trace
          case e: Exception => e.printStackTrace()
        } finally {
          //Always close the producer
          if (null != producer) producer.close()
        }
      }
    }
    
    

    Create the topic:

    bin/kafka-topics.sh --create --zookeeper bigdata02:2181 --replication-factor 1 --partitions 3 --topic test1026
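    To double-check that the topic exists with its 3 partitions, the describe command can be used (same ZooKeeper address as above):

    bin/kafka-topics.sh --describe --zookeeper bigdata02:2181 --topic test1026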
    

    Start a console consumer:

    cd /home/bigdata/apps/kafka_2.11-0.10.0.0/
    
    ./bin/kafka-console-consumer.sh --zookeeper bigdata02:2181 --topic test1026 --from-beginning
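    With the topic created and this consumer running, executing the OrderProducer from Step 1 should print the single record 201710261645320001,12,45.00 in this console.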
    

    Step 2: Generate JSON-formatted order data and send it to the Kafka topic in batches

    OrderProducer.scala

    import java.util.{Properties, UUID}
    import com.fasterxml.jackson.databind.ObjectMapper
    import com.fasterxml.jackson.module.scala.DefaultScalaModule
    import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
    
    import scala.collection.mutable.ArrayBuffer
    
    //Order entity class
    case class Order(orderId: String, provinceId: Int, price: Float)

    object OrderProducer {
      def main(args: Array[String]): Unit = {

        /**
         * Jackson ObjectMapper for serializing orders to JSON
         */
        val mapper = new ObjectMapper()
        mapper.registerModule(DefaultScalaModule)


        //Kafka cluster configuration
        val props = new Properties()
        //Producer instance
        var producer: Producer[String, String] = null

        //Kafka broker list
        props.put("metadata.broker.list", ConstantUtils.METADATA_BROKER_LIST)
        //How data is sent to the topic (async/sync)
        props.put("producer.type", ConstantUtils.PRODUCER_TYPE)
        //Serializer classes for the message and the key
        props.put("serializer.class", ConstantUtils.SERIALIZER_CLASS)
        props.put("key.serializer.class", ConstantUtils.SERIALIZER_CLASS)

        try {

          //Producer configuration
          val config = new ProducerConfig(props)
          //Create the producer instance
          producer = new Producer[String, String](config)
          //Mutable buffer that collects messages for batch sending
          val messageArrayBuffer = new ArrayBuffer[KeyedMessage[String, String]]()

          //Keep producing batches of N orders forever
          while (true) {

            //Clear the messages left over from the previous batch
            messageArrayBuffer.clear()

            //Random number deciding how many orders to produce in this batch
            val random: Int = RandomUtils.getRandomNum(1000) + 100

            //Generate the order data
            for (index <- 0 until random) {
              //Order id
              val orderId = UUID.randomUUID().toString
              //Province id
              val provinceId = RandomUtils.getRandomNum(34) + 1
              //Order amount
              val orderPrice = RandomUtils.getRandomNum(80) + 100.5F
              //Create the order instance
              val order = Order(orderId, provinceId, orderPrice)

              //Build a message keyed by the order id, with the JSON string as the payload
              val message = new KeyedMessage[String, String](ConstantUtils.ORDER_TOPIC, orderId, mapper.writeValueAsString(order))
              //Print the JSON payload
              println(s"Order Json: ${mapper.writeValueAsString(order)}")
              //(alternative: send each message individually)
              //producer.send(message)
              //Add the message to the batch buffer
              messageArrayBuffer += message
            }
            //Send the whole batch to the topic
            producer.send(messageArrayBuffer: _*)
            //Pause briefly between batches
            Thread.sleep(RandomUtils.getRandomNum(100) * 10)
          }
        } catch {
          //Print the stack trace
          case e: Exception => e.printStackTrace()
        } finally {
          //Always close the producer
          if (null != producer) producer.close()
        }
      }
    }
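    The producer above also relies on a RandomUtils helper that the original listing does not show. A minimal sketch of what it could look like, assuming getRandomNum(n) returns a random Int in [0, n):

    import scala.util.Random

    //Random-number helper used by OrderProducer (not in the original listing; minimal sketch)
    object RandomUtils {
      private val random = new Random()
      //Random Int in [0, bound)
      def getRandomNum(bound: Int): Int = random.nextInt(bound)
    }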
    
    

    Step 3: Build the Spark Streaming program skeleton using Scala's loan pattern
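    "Loan pattern" here means that sparkOperation creates the StreamingContext, lends it to the processing function passed in, and keeps control of starting and stopping it. A stripped-down illustration of the idea (not project code):

    //Loan pattern in miniature: the lender creates the resource, lends it to `use`,
    //and guarantees cleanup. In this project, sparkOperation plays the lender's role,
    //the StreamingContext is the resource, and processOrderData is the borrowed function.
    def withResource[R, A](create: => R)(close: R => Unit)(use: R => A): A = {
      val resource = create
      try use(resource) finally close(resource)
    }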

    OrderTurnoverStreaming.scala

    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.streaming.{Duration, Minutes, Seconds, StreamingContext}
    
    //Read data from Kafka and compute order counts and total turnover
    object OrderTurnoverStreaming {

      System.setProperty("HADOOP_USER_NAME", "bigdata")

      val BATCH_INTERVAL: Duration = Seconds(2)

      //Checkpoint directory
      val CHECKPOINT_DIRECTORY: String = "hdfs://bigdata02:9000/sparkckpt1027"

      //Loan function: owns the StreamingContext lifecycle and lends it to the user function
      def sparkOperation(args: Array[String])(operation: StreamingContext => Unit) = {
        //Function that creates the StreamingContext
        def functionToCreateContext(): StreamingContext = {

          //Spark configuration
          val sparkConf = new SparkConf()
            //Application name
            .setAppName("OrderTurnoverStreaming")
            //Run mode
            .setMaster("local[3]")

          //Create the SparkContext
          val sc = SparkContext.getOrCreate(sparkConf)
          //Log level
          sc.setLogLevel("WARN")
          //Create the StreamingContext
          val ssc: StreamingContext = new StreamingContext(sc, BATCH_INTERVAL)
          //Invoke the user function
          operation(ssc)
          //Keep batch data around long enough for interactive queries
          ssc.remember(Minutes(1))
          //Set the checkpoint directory
          ssc.checkpoint(CHECKPOINT_DIRECTORY)
          //Return the StreamingContext
          ssc

        }

        var context: StreamingContext = null
        try {
          //Stop any existing StreamingContext
          val stopActiveContext = true
          if (stopActiveContext) {
            StreamingContext.getActive().foreach(_.stop(stopSparkContext = true))
          }

          // context = StreamingContext.getOrCreate(CHECKPOINT_DIRECTORY, functionToCreateContext)
          context = StreamingContext.getActiveOrCreate(CHECKPOINT_DIRECTORY, functionToCreateContext)

          //Reset the log level: when the context is restored from a checkpoint, the earlier setting is not applied
          context.sparkContext.setLogLevel("WARN")
          //Start the computation
          context.start()
          //Wait for termination
          context.awaitTermination()
        } catch {
          case e: Exception => e.printStackTrace()
        } finally {
          context.stop(stopSparkContext = true, stopGracefully = true)
        }

      }

      def main(args: Array[String]): Unit = {
        sparkOperation(args)(processOrderData)
      }

      //User function: this is where the actual processing logic goes (filled in from Step 4 on)
      def processOrderData(ssc: StreamingContext): Unit = {

      }
    
    }
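    Note that processOrderData is intentionally left empty here; Steps 4 and 6 fill it in. Also, because getActiveOrCreate is given the HDFS checkpoint directory, a restarted driver first tries to rebuild the StreamingContext (DStream lineage and state included) from hdfs://bigdata02:9000/sparkckpt1027 before falling back to functionToCreateContext.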
    

    Step 4: Read the order topic from Kafka and implement requirement 1: real-time cumulative turnover per province

    OrderTurnoverStreaming.scala

    import java.text.SimpleDateFormat
    import java.util.Date
    
    import com.fasterxml.jackson.databind.ObjectMapper
    import com.fasterxml.jackson.module.scala.DefaultScalaModule
    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.streaming.{Duration, Minutes, Seconds, StreamingContext}
    
    //Read data from Kafka and compute order counts and total turnover
    object OrderTurnoverStreaming {

      System.setProperty("HADOOP_USER_NAME", "bigdata")

      val BATCH_INTERVAL: Duration = Seconds(2)

      //Checkpoint directory
      val CHECKPOINT_DIRECTORY: String = "hdfs://bigdata02:9000/sparkckpt1027"

      //Loan function: owns the StreamingContext lifecycle and lends it to the user function
      def sparkOperation(args: Array[String])(operation: StreamingContext => Unit) = {
        //Function that creates the StreamingContext
        def functionToCreateContext(): StreamingContext = {

          //Spark configuration
          val sparkConf = new SparkConf()
            //Application name
            .setAppName("OrderTurnoverStreaming")
            //Run mode
            .setMaster("local[3]")

          //Create the SparkContext
          val sc = SparkContext.getOrCreate(sparkConf)
          //Log level
          sc.setLogLevel("WARN")
          //Create the StreamingContext
          val ssc: StreamingContext = new StreamingContext(sc, BATCH_INTERVAL)
          //Invoke the user function
          operation(ssc)
          //Keep batch data around long enough for interactive queries
          ssc.remember(Minutes(1))
          //Set the checkpoint directory
          ssc.checkpoint(CHECKPOINT_DIRECTORY)
          //Return the StreamingContext
          ssc

        }

        var context: StreamingContext = null
        try {
          //Stop any existing StreamingContext
          val stopActiveContext = true
          if (stopActiveContext) {
            StreamingContext.getActive().foreach(_.stop(stopSparkContext = true))
          }

          // context = StreamingContext.getOrCreate(CHECKPOINT_DIRECTORY, functionToCreateContext)
          context = StreamingContext.getActiveOrCreate(CHECKPOINT_DIRECTORY, functionToCreateContext)

          //Reset the log level: when the context is restored from a checkpoint, the earlier setting is not applied
          context.sparkContext.setLogLevel("WARN")
          //Start the computation
          context.start()
          //Wait for termination
          context.awaitTermination()
        } catch {
          case e: Exception => e.printStackTrace()
        } finally {
          context.stop(stopSparkContext = true, stopGracefully = true)
        }

      }

      def main(args: Array[String]): Unit = {
        sparkOperation(args)(processOrderData)
      }

      //User function: this is where the actual processing logic lives
      def processOrderData(ssc: StreamingContext): Unit = {
        /**
         * 1. Read data from the Kafka topic in direct mode
         */

        //Kafka connection parameters (first argument)
        val kafkaParams: Map[String, String] = Map(
          "metadata.broker.list" -> ConstantUtils.METADATA_BROKER_LIST,
          "auto.offset.reset" -> ConstantUtils.AUTO_OFFSET_RESET
        )
        //Topics (second argument)
        val topicsSet = ConstantUtils.ORDER_TOPIC.split(",").toSet

        //Read from the Kafka topic using the direct approach
        val orderDStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)

        //Requirement 1 starts here
        /**
         * 2. Process the order data. Requirement 1: cumulative real-time turnover per province
         */
        val orderTurnoverDStream = orderDStream
          //a. Parse the JSON payload and turn the stream into DStream[(key, value)]
          .map(tuple => {
            //Parse the JSON string into an Order
            val order = ObjectMapperSingleton.getInstance().readValue(tuple._2, classOf[Order])
            //Key by province id
            (order.provinceId, order)
          })
          //b. Use updateStateByKey to accumulate the per-province totals across batches
          .updateStateByKey(
            //updateFunc
            (orders: Seq[Order], state: Option[Float]) => {
              //Turnover of the current batch for this province
              val currentOrdersPrice = orders.map(_.price).sum
              //Previously accumulated turnover for this province
              val previousPrice = state.getOrElse(0.0F)
              //Accumulate
              Some(currentOrdersPrice + previousPrice)
            }
          )
        /**
         * 3. Output the per-province running turnover
         */
        orderTurnoverDStream.foreachRDD(
          (rdd, time) => {
            //Format the batch time
            val batchInterval = new SimpleDateFormat("yyyy-MM-dd  HH:mm:ss").format(new Date(time.milliseconds))
            println("--------------------------------------")
            println(s"batchInterval :${batchInterval}")
            println("--------------------------------------")
            rdd.coalesce(1)
              .foreachPartition(iter => iter.foreach(println))
          })
      }

    }

    //Singleton ObjectMapper
    object ObjectMapperSingleton {
      /**
       * The instance is marked @transient so that it is never serialized with a task closure;
       * each executor JVM lazily creates its own ObjectMapper on first use.
       */

      @transient private var instance: ObjectMapper = _

      def getInstance(): ObjectMapper = {
        if (instance == null) {
          instance = new ObjectMapper()
          instance.registerModule(DefaultScalaModule)
        }
        instance
      }
    
    
    }
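    To make the state accumulation concrete, here is how the updateStateByKey update function used above behaves across two batches (illustrative values only, not project code):

    //Illustrative only: how the per-province state evolves batch by batch
    val updateFunc = (orders: Seq[Order], state: Option[Float]) =>
      Some(orders.map(_.price).sum + state.getOrElse(0.0F))

    println(updateFunc(Seq(Order("o1", 12, 45.0F)), None))         // Some(45.0)  -- first batch, no prior state
    println(updateFunc(Seq(Order("o2", 12, 30.5F)), Some(45.0F)))  // Some(75.5)  -- later batch accumulates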
    
    



    Step 5: Tune the streaming job (records per partition, serialization, etc.)

      //Changed checkpoint directory
      val CHECKPOINT_DIRECTORY: String = "hdfs://bigdata02:9000/sparkckpt1028"

          //Cap the number of records fetched per second from each Kafka partition
          sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "10000")

          //Use Kryo serialization and register the Order class
          sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
          sparkConf.registerKryoClasses(Array(classOf[Order]))
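    These settings are applied to the SparkConf before the SparkContext is created (see the full OrderQuantityStreaming listing in Step 6). With a 2-second batch interval, 3 topic partitions, and maxRatePerPartition = 10000, each batch is capped at roughly 2 × 3 × 10000 = 60,000 records.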
    

    Step 6: Integrate Spark SQL to count orders per province over a 10-second window

    OrderQuantityStreaming.scala

      import java.text.SimpleDateFormat
      import java.util.Date
    
      import com.fasterxml.jackson.databind.ObjectMapper
      import com.fasterxml.jackson.module.scala.DefaultScalaModule
      import kafka.serializer.StringDecoder
      import org.apache.spark.sql.SparkSession
      import org.apache.spark.streaming.kafka.KafkaUtils
      import org.apache.spark.{SparkConf, SparkContext}
      import org.apache.spark.streaming.{Duration, Minutes, Seconds, StreamingContext}
    
      //Read data from Kafka and compute order counts and total turnover
      object OrderQuantityStreaming {

        System.setProperty("HADOOP_USER_NAME", "bigdata")

        val BATCH_INTERVAL: Duration = Seconds(2)
        //10-second window (2s * 5)
        val WINDOW_INTERVAL: Duration = BATCH_INTERVAL * 5
        //6-second slide (2s * 3)
        val SLIDER_INTERVAL: Duration = BATCH_INTERVAL * 3

        //Checkpoint directory
        val CHECKPOINT_DIRECTORY: String = "hdfs://bigdata02:9000/sparkckpt10282"

        //Loan function: owns the StreamingContext lifecycle and lends it to the user function
        def sparkOperation(args: Array[String])(operation: StreamingContext => Unit) = {
          //Function that creates the StreamingContext
          def functionToCreateContext(): StreamingContext = {

            //Spark configuration
            val sparkConf = new SparkConf()
              //Application name
              .setAppName("OrderQuantityStreaming")
              //Run mode
              .setMaster("local[3]")

            //Cap the number of records fetched per second from each Kafka partition
            sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "10000")

            //Use Kryo serialization and register the Order class
            sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            sparkConf.registerKryoClasses(Array(classOf[Order]))

            //Create the SparkContext
            val sc = SparkContext.getOrCreate(sparkConf)
            //Log level
            sc.setLogLevel("WARN")
            //Create the StreamingContext
            val ssc: StreamingContext = new StreamingContext(sc, BATCH_INTERVAL)
            //Invoke the user function
            operation(ssc)
            //Keep batch data around long enough for interactive queries
            ssc.remember(Minutes(1))
            //Set the checkpoint directory
            ssc.checkpoint(CHECKPOINT_DIRECTORY)
            //Return the StreamingContext
            ssc

          }

          var context: StreamingContext = null
          try {
            //Stop any existing StreamingContext
            val stopActiveContext = true
            if (stopActiveContext) {
              StreamingContext.getActive().foreach(_.stop(stopSparkContext = true))
            }

            // context = StreamingContext.getOrCreate(CHECKPOINT_DIRECTORY, functionToCreateContext)
            context = StreamingContext.getActiveOrCreate(CHECKPOINT_DIRECTORY, functionToCreateContext)

            //Reset the log level: when the context is restored from a checkpoint, the earlier setting is not applied
            context.sparkContext.setLogLevel("WARN")
            //Start the computation
            context.start()
            //Wait for termination
            context.awaitTermination()
          } catch {
            case e: Exception => e.printStackTrace()
          } finally {
            context.stop(stopSparkContext = true, stopGracefully = true)
          }

        }

        def main(args: Array[String]): Unit = {
          sparkOperation(args)(processOrderData)
        }

        //User function: this is where the actual processing logic lives
        def processOrderData(ssc: StreamingContext): Unit = {
          /**
           * 1. Read data from the Kafka topic in direct mode
           */

          //Kafka connection parameters (first argument)
          val kafkaParams: Map[String, String] = Map(
            "metadata.broker.list" -> ConstantUtils.METADATA_BROKER_LIST,
            "auto.offset.reset" -> ConstantUtils.AUTO_OFFSET_RESET
          )
          //Topics (second argument)
          val topicsSet = ConstantUtils.ORDER_TOPIC.split(",").toSet

          //Read from the Kafka topic using the direct approach
          val orderDStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)

          //Requirement 2 starts here
          /**
           * Count orders per province over each window
           */
          orderDStream
            //Set the window length and slide interval
            .window(WINDOW_INTERVAL, SLIDER_INTERVAL)
            .foreachRDD((rdd, time) => {
              //Format the window time
              val sliderInterval = new SimpleDateFormat("yyyy-MM-dd  HH:mm:ss").format(new Date(time.milliseconds))
              println("--------------------------------------")
              println(s"sliderInterval :${sliderInterval}")
              println("--------------------------------------")
              //Skip empty windows
              if (!rdd.isEmpty()) {
                val orderRDD = rdd.map(tuple => {
                  //Parse the JSON payload into an Order
                  ObjectMapperSingleton.getInstance().readValue(tuple._2, classOf[Order])
                })

                //Get the SparkSession singleton
                val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)

                //Convert to a Dataset
                import spark.implicits._
                val orderDS = orderRDD.toDS()

                //Aggregate with the DataFrame DSL
                val provinceCountDF = orderDS.groupBy("provinceId").count()

                //Print the result
                provinceCountDF.show(15, truncate = false)

              }
            })

        }

      }

      //Singleton ObjectMapper
      object ObjectMapperSingleton {
        /**
         * The instance is marked @transient so that it is never serialized with a task closure;
         * each executor JVM lazily creates its own ObjectMapper on first use.
         */

        @transient private var instance: ObjectMapper = _

        def getInstance(): ObjectMapper = {
          if (instance == null) {
            instance = new ObjectMapper()
            instance.registerModule(DefaultScalaModule)
          }
          instance
        }


      }

      //Singleton SparkSession
      object SparkSessionSingleton {
        @transient private var instance: SparkSession = _

        def getInstance(sparkconf: SparkConf): SparkSession = {
          if (instance == null) {
            instance = SparkSession.builder().config(sparkconf).getOrCreate()
          }
          instance
        }
      }
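    As an alternative to the DataFrame DSL aggregation above, the same per-province count can be written with a temporary view and Spark SQL; a sketch that could replace the groupBy line (the view name "orders" is just an illustrative choice):

      //Equivalent SQL formulation (sketch): register the Dataset as a temp view and query it
      orderDS.createOrReplaceTempView("orders")
      val provinceCountDF = spark.sql(
        "SELECT provinceId, COUNT(*) AS orderCount FROM orders GROUP BY provinceId ORDER BY orderCount DESC"
      )
      provinceCountDF.show(15, truncate = false)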
    
