美文网首页
SparkStreaming通过双输入流动态配置业务

SparkStreaming通过双输入流动态配置业务

作者: LearningHam | 来源:发表于2019-09-30 14:22 被阅读0次

SparkStreaming通过双输入流动态配置业务

一、背景

业务部门需要获得由数据部门实时采集的业务事件数据。当业务部门订阅指定事件时,则需要数据部门在业务部门订阅成功之后将业务部门所订阅的事件数据实时发送到业务部门。

二、解决思路

如何让流处理在执行过程中,既能在不重启服务的时候动态改变业务逻辑,又让流处理在运行过程中被通知成为了我们亟需解决的问题。在这里,我们通过双输入流的方式,一个是数据流,一个是控制流,数据流传入正常的业务数据进入流处理,控制流是当业务部门对订阅的事件发生变更时才会传入数据进入流处理,当控制流中有数据产生时则改变广播变量,通过动态改变广播变量的方式实现动态改变业务

三、代码

1、代码依赖

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <scala.version>2.11.8</scala.version>
    <spark.version>2.3.2.3.1.0.0-78</spark.version>
    <hadoop.version>3.1.1.3.1.0.0-78</hadoop.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>${hadoop.version}</version>
    </dependency>

    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.2.45</version>
    </dependency>

    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.38</version>
    </dependency>

    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>2.0.0</version>
    </dependency>

  </dependencies>

2、流处理代码

object TestStreamingBroadcast extends BaseHolder{

  private val map = mutable.LinkedHashMap[String, Long]()

    def main(args: Array[String]): Unit = {

      println("StreamingService Start : "+TimeUtil.getNowTime )

      // 初始化广播变量
      var instance: Broadcast[Map[String,Long]] = null
      if (instance != null){
        instance.unpersist(true)
      }
      instance = sc.broadcast(getMysql)
      println("广播变量初始化成功 : " + TimeUtil.getNowTime)

      // 判断控制流是否有数据生成
      fileControlStream.foreachRDD(rdd => {
        if(rdd.collect().length>0){
          if (instance != null){
            instance.unpersist(true)
          }
          instance = sc.broadcast(getMysql)
          println("广播变量更新成功 : " + TimeUtil.getNowTime)
        }
      })

      // 数据流进行业务处理
      kafkaDataStream.foreachRDD {
        rdd => {
          rdd.foreachPartition(datas => {
            val kafkaProducer = KafkaUtil.getProducer
            datas.foreach(data => {
              val jsonObj = JSON.parseObject(data.value())
              val mapKey = jsonObj.getInteger("p_typ_cd")+"&"+jsonObj.getInteger("p_sub_typ_cd")+"&"+jsonObj.getString("event_id")
              val map = instance.value
              if(map.get(mapKey).isDefined){
                val time = jsonObj.getLong("time")
                val endTime = map.get(mapKey)
                if(time < endTime.get){
                  kafkaProducer.send(new ProducerRecord[Int, String](activyTopic, Random.nextInt(9), data.value()))
                }
              }
            })
            kafkaProducer.close()
          })
        }
      }
      ssc.start()
      ssc.awaitTermination()
      ssc.stop()
  }

  /**
    * 获取订阅数据
    * @return
    */
  def getMysql: Map[String, Long] = {
    map.clear()
    val conn = MysqlUtil.getJdbcConnection
    val sql = querySql
    val ps = conn.prepareStatement(sql)
    val rs = ps.executeQuery()
    while(rs.next()){
      val p_typ_cd = rs.getInt("p_typ_cd")
      val p_sub_typ_cd = rs.getInt("p_sub_typ_cd")
      val event_id = rs.getString("event_id")
      val end_time = rs.getLong("end_time")
      map.put(p_typ_cd+"&"+p_sub_typ_cd+"&"+event_id,end_time)
    }
    map.toMap
  }
}

3、Spark初始化代码(TestConfig 为参数配置Trait)


import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * 初始化SparkSteaming
  */
class BaseHolder extends TestConfig {

  val conf: SparkConf = new SparkConf()
    .setAppName("test")
    .set("spark.streaming.stopGracefullyOnShutdown", "true")               //优雅的关闭
    .set("spark.streaming.backpressure.enabled", "true")                   //控制流速
    .set("spark.rdd.compress", "true")                                     // RDD压缩
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // Kryo序列化
  val sc = new SparkContext(conf)
  sc.setLogLevel("WARN")
  val ssc = new StreamingContext(sc,Seconds(5))
  // 创建Kafka数据流
  val kafkaDataStream: InputDStream[ConsumerRecord[String, String]] = createDirectDataStream(ssc)
  // 创建File控制流
  val fileControlStream: DStream[String] = ssc.textFileStream(controlHdfsPath)

  /**
    * 创建Kafka数据流2.3.2)
    *
    * @param ssc StreamingContext
    * @return
    *
    * */
  def createDirectDataStream(ssc: StreamingContext): InputDStream[ConsumerRecord[String, String]] = {

    val topics = Set[String](kafkaDataToic)
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> bootstrapServers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> consumerGroupId,
      "auto.offset.reset" -> offset, // lastest
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    KafkaUtils.createDirectStream(
      ssc,
      // 位置策略:如果Spark任务和Kafka部署在一起,会优先将读取数据的Task启动到数据所在机器上
      LocationStrategies.PreferConsistent,
      // 消费策略:可以指定从哪些topic消费(并且可安装topic名字的正则进行匹配)
      ConsumerStrategies.Subscribe(topics, kafkaParams)
    )
  }
}

4、MysqlUtil代码(TestConfig 为参数配置Trait)


import java.sql
import java.sql.DriverManager

object MysqlUtil extends TestConfig{

  /**
    * 获取连接
    */
  def getJdbcConnection: sql.Connection = {
    Class.forName("com.mysql.jdbc.Driver").newInstance()
    val jdbcUrl = mysqlJdbcUrl
    val conn = DriverManager.getConnection(jdbcUrl, mysqlUser, mysqlPwd)
    conn
  }

}

5、KafkaUtil代码

/**
  * kafka工具类
  * @author zhaoyh
  **/
object KafkaUtil extends TestConfig{

  /**
    * 获取Kafka生产者
    * @return kafkaProducer
    */
  def getProducer: KafkaProducer[Int, String] ={
    val props = new Properties()
    // 设置Kafka的服务
    props.put("bootstrap.servers", bootstrapServers)
    // 要求Kafka的leader在完成请求之前确认收到的数据条数,保证数据的持久性,只要有一个副本存活都不会丢数据
    props.put("acks", "all")
    props.put("retries", new Integer(0))
    props.put("batch.size", new Integer(16384))
    props.put("linger.ms", new Integer(1))
    props.put("buffer.memory", new Integer(33554432))
    props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[Int, String](props)
    producer
  }

}

四、结束语

流处理实现动态改变配置现在还没有很多好的方法,如果有朋友有更好的解决方法可以私聊我。

相关文章

  • SparkStreaming通过双输入流动态配置业务

    SparkStreaming通过双输入流动态配置业务 一、背景 业务部门需要获得由数据部门实时采集的业务事件数据。...

  • SparkStreaming容错性

    SparkStreaming实时流处理系统需要长时间接受并处理数据,对于SparkStreaming的容错性主要通...

  • Spring Cloud Gateway 多种思路实现动态路由

    关于动态路由,是各类业务场景中的基础功能,通过动态化配置API网关的路由参数,可以实现在不重启服务的情况下,API...

  • Seata 动态配置订阅与降级实现原理

    Seata 的动态降级需要结合配置中心的动态配置订阅功能。动态配置订阅,即通过配置中心监听订阅,根据需要读取已更新...

  • struts2中的拦截器,你都用来干什么

    通过动态配置的方式,可以在执行Action的方法前后,加入相关的逻辑完成业务,struts2中的功能,(参数处理,...

  • 11.15商学院-波士顿矩阵

    将业务分解,清楚的知道现金流和业务之间的关系,通过波士顿矩阵,动态的寻找最佳组合方案,清晰后期的发力点与发展方向。

  • IO

    分类 输入流、输出流 数据 --> (单/双)管道 --> 输入数据 -->程序 字节流、字符流 每次读写 以字节...

  • 上海金蝶软件-销售管理产品方案

    销售管理通过全面的交易类型和灵活的业务流程配置,支持大型集团企业多组织、多元化经营下,各种销售业务处理规则和业务流...

  • 21、Skywalking的埋点-Agent动态采样控制

    通过Skywalking的动态配置机制,可以动态下发Agent端的采样配置,官方提供了如下几种: Config K...

  • SparkStreaming

    SparkStreaming是spark的一个子模块,用与快速构建可扩展,高吞吐量,高容错的流处理程序。通过高级A...

网友评论

      本文标题:SparkStreaming通过双输入流动态配置业务

      本文链接:https://www.haomeiwen.com/subject/hkacpctx.html