SparkStreaming: Dynamic Business Configuration via Dual Input Streams

Author: LearningHam | Published 2019-09-30 14:22

    I. Background

    The business team needs the business event data that the data team collects in real time. When the business team subscribes to a given event, the data team must, once the subscription takes effect, forward that event's data to the business team in real time.

    II. Approach

    The problem we needed to solve was how to let a streaming job change its business logic while it is running, without restarting the service, and how to notify the running job of such a change. We do this with two input streams: a data stream and a control stream. The data stream feeds the normal business data into the job; the control stream only receives data when the business team changes its event subscriptions. Whenever the control stream delivers data, the job rebuilds a broadcast variable, and dynamically refreshing that broadcast variable is what changes the business logic at runtime (a minimal sketch follows below; the full code is in section III).
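
    Below is a minimal, self-contained sketch of this pattern. The socket source, the watched path, and the loadRules()/process() helpers are placeholders (assumptions for illustration), not the original project's code; the complete implementation follows in section III.

    import org.apache.spark.SparkConf
    import org.apache.spark.broadcast.Broadcast
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // Skeleton of the dual-stream pattern; sources and helpers are placeholders.
    object DualStreamSketch {

      def loadRules(): Map[String, Long] = Map.empty                 // placeholder: read subscriptions from MySQL
      def process(line: String, rules: Map[String, Long]): Unit = () // placeholder business logic

      def main(args: Array[String]): Unit = {
        val ssc = new StreamingContext(new SparkConf().setAppName("sketch"), Seconds(5))
        val sc = ssc.sparkContext

        // Driver-side handle to the current broadcast
        var rules: Broadcast[Map[String, Long]] = sc.broadcast(loadRules())

        // Control stream: any new file in the watched directory triggers a re-broadcast
        // (the foreachRDD body runs on the driver, so reassigning the var is safe here)
        ssc.textFileStream("hdfs:///tmp/control").foreachRDD { rdd =>
          if (!rdd.isEmpty()) {
            rules.unpersist(true)
            rules = sc.broadcast(loadRules())
          }
        }

        // Data stream: each micro-batch captures the latest broadcast handle on the driver
        ssc.socketTextStream("localhost", 9999).foreachRDD { rdd =>
          val current = rules
          rdd.foreach(line => process(line, current.value))
        }

        ssc.start()
        ssc.awaitTermination()
      }
    }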

    III. Code

    1. Maven dependencies

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <scala.version>2.11.8</scala.version>
        <spark.version>2.3.2.3.1.0.0-78</spark.version>
        <hadoop.version>3.1.1.3.1.0.0-78</hadoop.version>
      </properties>
    
      <dependencies>
        <dependency>
          <groupId>org.scala-lang</groupId>
          <artifactId>scala-library</artifactId>
          <version>${scala.version}</version>
        </dependency>
    
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-sql_2.11</artifactId>
          <version>${spark.version}</version>
        </dependency>
    
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_2.11</artifactId>
          <version>${spark.version}</version>
        </dependency>
    
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-streaming_2.11</artifactId>
          <version>${spark.version}</version>
        </dependency>
    
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
          <version>${spark.version}</version>
        </dependency>
    
        <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-client</artifactId>
          <version>${hadoop.version}</version>
        </dependency>
    
        <dependency>
          <groupId>com.alibaba</groupId>
          <artifactId>fastjson</artifactId>
          <version>1.2.45</version>
        </dependency>
    
        <dependency>
          <groupId>mysql</groupId>
          <artifactId>mysql-connector-java</artifactId>
          <version>5.1.38</version>
        </dependency>
    
        <dependency>
          <groupId>org.apache.kafka</groupId>
          <artifactId>kafka-clients</artifactId>
          <version>2.0.0</version>
        </dependency>
    
      </dependencies>
    

    2. Stream-processing code

    import scala.collection.mutable
    import scala.util.Random

    import com.alibaba.fastjson.JSON
    import org.apache.kafka.clients.producer.ProducerRecord
    import org.apache.spark.broadcast.Broadcast

    object TestStreamingBroadcast extends BaseHolder {

      private val map = mutable.LinkedHashMap[String, Long]()

      def main(args: Array[String]): Unit = {

        println("StreamingService Start : " + TimeUtil.getNowTime)

        // Initialize the broadcast variable with the currently subscribed events
        var instance: Broadcast[Map[String, Long]] = sc.broadcast(getMysql)
        println("Broadcast variable initialized : " + TimeUtil.getNowTime)

        // Control stream: whenever it delivers data, reload the subscriptions from MySQL
        // and re-broadcast them (this foreachRDD body runs on the driver)
        fileControlStream.foreachRDD(rdd => {
          if (!rdd.isEmpty()) {
            instance.unpersist(true)
            instance = sc.broadcast(getMysql)
            println("Broadcast variable refreshed : " + TimeUtil.getNowTime)
          }
        })

        // Data stream: forward only the events that are currently subscribed
        kafkaDataStream.foreachRDD { rdd =>
          rdd.foreachPartition(datas => {
            val kafkaProducer = KafkaUtil.getProducer
            datas.foreach(data => {
              val jsonObj = JSON.parseObject(data.value())
              val mapKey = jsonObj.getInteger("p_typ_cd") + "&" + jsonObj.getInteger("p_sub_typ_cd") + "&" + jsonObj.getString("event_id")
              val subscriptions = instance.value
              subscriptions.get(mapKey).foreach(endTime => {
                val time = jsonObj.getLong("time")
                // Forward the event only while its subscription has not expired
                if (time < endTime) {
                  kafkaProducer.send(new ProducerRecord[Int, String](activyTopic, Random.nextInt(9), data.value()))
                }
              })
            })
            kafkaProducer.close()
          })
        }
        ssc.start()
        ssc.awaitTermination()
      }

      /**
        * Load the current subscriptions from MySQL
        * @return map of "p_typ_cd&p_sub_typ_cd&event_id" -> end_time
        */
      def getMysql: Map[String, Long] = {
        map.clear()
        val conn = MysqlUtil.getJdbcConnection
        val ps = conn.prepareStatement(querySql)
        val rs = ps.executeQuery()
        while (rs.next()) {
          val p_typ_cd = rs.getInt("p_typ_cd")
          val p_sub_typ_cd = rs.getInt("p_sub_typ_cd")
          val event_id = rs.getString("event_id")
          val end_time = rs.getLong("end_time")
          map.put(p_typ_cd + "&" + p_sub_typ_cd + "&" + event_id, end_time)
        }
        rs.close()
        ps.close()
        conn.close()
        map.toMap
      }
    }
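
    One piece left implicit above is how the control stream is actually triggered. Since fileControlStream is a textFileStream, it only sees files that newly appear in controlHdfsPath, so after the subscription table in MySQL has been updated, whoever manages subscriptions just needs to drop a small marker file into that directory. The helper below is a hypothetical sketch (the object name, temp path, and marker content are assumptions, not part of the original project).

    import java.nio.charset.StandardCharsets

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    /** Hypothetical helper: signal a subscription change to the running streaming job. */
    object ControlTrigger {

      def notifySubscriptionChange(controlHdfsPath: String): Unit = {
        val fs = FileSystem.get(new Configuration())
        val fileName = "refresh-" + System.currentTimeMillis()
        // Write the marker somewhere else first, then move it into the watched directory,
        // because textFileStream expects files to appear there atomically (e.g. via rename)
        val tmp = new Path("/tmp/" + fileName)
        val out = fs.create(tmp)
        try out.write("refresh".getBytes(StandardCharsets.UTF_8)) finally out.close()
        fs.rename(tmp, new Path(controlHdfsPath, fileName))
      }
    }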
    

    3. Spark initialization code (TestConfig is the parameter-configuration trait)

    
    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.streaming.dstream.{DStream, InputDStream}
    import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    /**
      * Initialize SparkStreaming
      */
    class BaseHolder extends TestConfig {
    
      val conf: SparkConf = new SparkConf()
        .setAppName("test")
        .set("spark.streaming.stopGracefullyOnShutdown", "true")               // graceful shutdown
        .set("spark.streaming.backpressure.enabled", "true")                   // backpressure: throttle the ingest rate
        .set("spark.rdd.compress", "true")                                     // RDD compression
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // Kryo serialization
      val sc = new SparkContext(conf)
      sc.setLogLevel("WARN")
      val ssc = new StreamingContext(sc,Seconds(5))
      // Create the Kafka data stream
      val kafkaDataStream: InputDStream[ConsumerRecord[String, String]] = createDirectDataStream(ssc)
      // Create the file-based control stream (watches controlHdfsPath for new files)
      val fileControlStream: DStream[String] = ssc.textFileStream(controlHdfsPath)
    
      /**
        * Create the Kafka data stream (Spark 2.3.2)
        *
        * @param ssc StreamingContext
        * @return
        *
        * */
      def createDirectDataStream(ssc: StreamingContext): InputDStream[ConsumerRecord[String, String]] = {
    
        val topics = Set[String](kafkaDataToic)
        val kafkaParams = Map[String, Object](
          "bootstrap.servers" -> bootstrapServers,
          "key.deserializer" -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          "group.id" -> consumerGroupId,
          "auto.offset.reset" -> offset, // e.g. "latest"
          "enable.auto.commit" -> (false: java.lang.Boolean)
        )
        KafkaUtils.createDirectStream(
          ssc,
          // Location strategy: PreferConsistent spreads partitions evenly across executors
          // (PreferBrokers would schedule read tasks on the brokers' hosts when they are co-located)
          LocationStrategies.PreferConsistent,
          // Consumer strategy: subscribe to the given topics (topic names can also be matched by a regex)
          ConsumerStrategies.Subscribe(topics, kafkaParams)
        )
      }
    }
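
    The TestConfig trait itself is not shown in the post. Based on the fields referenced by the code (bootstrapServers, kafkaDataToic, activyTopic, consumerGroupId, offset, controlHdfsPath, the MySQL settings, and querySql), it presumably looks roughly like the sketch below; every concrete value and the table name are placeholders, not the original configuration.

    /** Hypothetical sketch of the TestConfig trait referenced throughout the code. */
    trait TestConfig {
      // Kafka
      val bootstrapServers: String = "kafka1:9092,kafka2:9092,kafka3:9092"
      val kafkaDataToic: String    = "business_event"       // data-stream topic (name kept as in the code)
      val activyTopic: String      = "subscribed_event"     // output topic for subscribed events (name kept as in the code)
      val consumerGroupId: String  = "test-streaming-broadcast"
      val offset: String           = "latest"               // auto.offset.reset

      // HDFS directory watched by the control stream (textFileStream)
      val controlHdfsPath: String = "hdfs:///user/test/control"

      // MySQL subscription table
      val mysqlJdbcUrl: String = "jdbc:mysql://mysql-host:3306/subscriptions?useSSL=false"
      val mysqlUser: String    = "user"
      val mysqlPwd: String     = "password"
      // Columns match those read in getMysql; the table name is an assumption
      val querySql: String =
        "SELECT p_typ_cd, p_sub_typ_cd, event_id, end_time FROM event_subscription"
    }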
    
    

    4. MysqlUtil code (TestConfig is the parameter-configuration trait)

    
    import java.sql
    import java.sql.DriverManager
    
    object MysqlUtil extends TestConfig{
    
      /**
        * Get a JDBC connection to the subscription database
        */
      def getJdbcConnection: sql.Connection = {
        Class.forName("com.mysql.jdbc.Driver")
        DriverManager.getConnection(mysqlJdbcUrl, mysqlUser, mysqlPwd)
      }
    
    }
    

    5. KafkaUtil code

    import java.util.Properties

    import org.apache.kafka.clients.producer.KafkaProducer

    /**
      * Kafka utility class
      * @author zhaoyh
      **/
    object KafkaUtil extends TestConfig {

      /**
        * Get a Kafka producer
        * @return kafkaProducer
        */
      def getProducer: KafkaProducer[Int, String] = {
        val props = new Properties()
        // Kafka brokers
        props.put("bootstrap.servers", bootstrapServers)
        // acks=all: the leader waits for the full set of in-sync replicas to acknowledge
        // the record before answering, which gives the strongest durability guarantee
        props.put("acks", "all")
        props.put("retries", "0")
        props.put("batch.size", "16384")
        props.put("linger.ms", "1")
        props.put("buffer.memory", "33554432")
        props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer")
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        new KafkaProducer[Int, String](props)
      }

    }
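
    For reference, a minimal usage sketch of the producer (the topic name and JSON payload are placeholders): it is keyed by Int and valued by String, matching the serializers configured above, and the streaming job uses a random Int key to spread records across partitions.

    import org.apache.kafka.clients.producer.ProducerRecord

    import scala.util.Random

    object KafkaUtilDemo {
      def main(args: Array[String]): Unit = {
        val producer = KafkaUtil.getProducer
        // Random Int key -> default partitioner spreads records across the topic's partitions
        producer.send(new ProducerRecord[Int, String]("demo-topic", Random.nextInt(9), """{"event_id":"e1"}"""))
        producer.close()
      }
    }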
    

    IV. Closing remarks

    There are still not many good ways to change configuration dynamically in a running streaming job; if you have a better solution, feel free to message me.
