美文网首页
Flink从数据库读取配置信息

Flink从数据库读取配置信息

作者: Jorvi | 来源:发表于2020-01-15 20:56 被阅读0次

    一、使用State

    1.1 主入口

      /**
       * Job entry point for the keyed-state variant.
       *
       * Parallelism comes from args(0) when given, otherwise from `flink.default.parallelism`
       * (default "40") via PropertiesUtil. args(1), when present, is captured as `invalidDate`,
       * which this method does not use.
       */
      def main(args: Array[String]): Unit = {

        val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

        // default Parallelism
        val (defaultParallelism, invalidDate) = args.length match {
          case 0 => (PropertiesUtil.getOrElse("flink.default.parallelism", "40").toInt, None)
          case 1 => (args.head.toInt, None)
          case _ => (args.head.toInt, Some(args(1)))
        }
        environment.setParallelism(defaultParallelism)

        environment
          .addSource(new SourceFromFile)
          // Drop unparseable lines instead of emitting null: a null record would make the
          // key selector below throw a NullPointerException and fail the job.
          .flatMap { message =>
            parseLog(message) match {
              case Success(Some(rawLog)) => List(rawLog)
              case Success(None) =>
                logger.error("==========> parsedLog error, please check")
                Nil
              case Failure(e) =>
                logger.error("==========> parsedLog error, error message: " + e.getMessage)
                Nil
            }
          }
          .keyBy(_.timestamp)
          .process(new DemoProcessFunction())
          .print()

        environment.execute()
      }
    
    

    1.2 Process

    /**
     * Forwards every log record unchanged while caching the `uuid -> id` rows of the `demo`
     * table in keyed ValueState.
     *
     * When the cache for the current key is empty, the table is queried through MysqlUtil. A
     * processing-time timer is then registered `materials.data.sync.interval` minutes (default 15)
     * ahead. When that timer fires, the cache is cleared, so the next element of that key reloads it.
     *
     * NOTE(review): ValueState is keyed state, so the cache is held per key, not per slot. Every key
     * seen for the first time (and again after each refresh) triggers its own database query.
     */
    class DemoProcessFunction extends KeyedProcessFunction[String, LogVO, LogVO] {

      // Transient + lazy: the logger is not shipped with the serialized function and is
      // recreated on first use inside the task.
      @transient private lazy val logger: Logger = LoggerFactory.getLogger(this.getClass)
      private var connection: Connection = null
      private var preparedStatement: PreparedStatement = null
      private var valueState: ValueState[Map[String, String]] = _
      private var timerState: ValueState[Long] = _

      override def open(parameters: Configuration): Unit = {
        super.open(parameters)

        val valueStateDesc: ValueStateDescriptor[Map[String, String]] = new ValueStateDescriptor[Map[String, String]](
          "valueStateDesc", TypeInformation.of(classOf[Map[String, String]]))
        valueState = getRuntimeContext.getState(valueStateDesc)

        val timerStateDesc: ValueStateDescriptor[Long] =
          new ValueStateDescriptor[Long]("timerStateDesc", TypeInformation.of(classOf[Long]))
        timerState = getRuntimeContext.getState(timerStateDesc)

        connection = MysqlUtil.getConnectionRead
        preparedStatement = connection.prepareStatement("SELECT uuid, id FROM demo")
      }

      override def processElement(value: LogVO, ctx: KeyedProcessFunction[String, LogVO, LogVO]#Context, out: Collector[LogVO]): Unit = {
        var currentState: Map[String, String] = valueState.value()
        if (null == currentState) {
          logger.info("==========> query database for sync...")
          currentState = Map()
          MysqlUtil.execQuery(connection, preparedStatement, (rs: ResultSet) => {
            currentState += (rs.getString("uuid") -> rs.getString("id"))
          })

          valueState.update(currentState)

          // Refresh every `materials.data.sync.interval` minutes (default 15). Computed as Long
          // so large intervals cannot overflow Int milliseconds.
          val syncInterval: Long = PropertiesUtil.getOrElse("materials.data.sync.interval", "15").toLong * 60 * 1000
          val ttlTime: Long = System.currentTimeMillis() + syncInterval
          ctx.timerService().registerProcessingTimeTimer(ttlTime)

          timerState.update(ttlTime)
        }

        logger.info("==========> currentState: " + currentState)

        out.collect(value)
      }


      override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, LogVO, LogVO]#OnTimerContext, out: Collector[LogVO]): Unit = {
        super.onTimer(timestamp, ctx, out)
        logger.info("==========> sync demo data every 15min...")
        // Drop the cached rows; the next element of this key reloads them from the database.
        valueState.clear()

        // The timer that just fired needs no deletion; only forget its timestamp.
        timerState.clear()
      }

      /** Releases the JDBC statement and connection opened in `open`. */
      override def close(): Unit = {
        try super.close()
        finally {
          if (preparedStatement != null) MysqlUtil.releaseResource(preparedStatement)
          if (connection != null) MysqlUtil.releaseResource(connection)
        }
      }

    }
    

    这样子可以实现:
    每15分钟(由配置项 materials.data.sync.interval 决定)通过定时器清空一次状态,该 key 的下一条数据到来时会重新查询数据库,从而更新配置信息。

    1.3 测试

    ==========> query database for sync...
    ==========> currentState: Map(N-3998 -> 10929133)
    1> {"timestamp":"1578907111","logType":"30","dbName":"demo","version":"1.0.0","distribution":"0"}
    ==========> currentState: Map(N-3998 -> 10929133)
    1> {"timestamp":"1578907222","logType":"30","dbName":"demo","version":"1.0.0","distribution":"0"}
    ==========> currentState: Map(N-3998 -> 10929133)
    1> {"timestamp":"1578907333","logType":"30","dbName":"demo","version":"1.0.0","distribution":"0"}
    ==========> currentState: Map(N-3998 -> 10929133)
    1> {"timestamp":"1578907444","logType":"30","dbName":"demo","version":"1.0.0","distribution":"0"}
    ==========> query database for sync...
    ==========> currentState: Map(N-3998 -> 10929133)
    3> {"timestamp":"1578983555","logType":"30","dbName":"demo","version":"1.0.0","distribution":"0"}
    ==========> currentState: Map(N-3998 -> 10929133)
    3> {"timestamp":"1578983666","logType":"30","dbName":"demo","version":"1.0.0","distribution":"0"}
    ==========> currentState: Map(N-3998 -> 10929133)
    3> {"timestamp":"1578983777","logType":"30","dbName":"demo","version":"1.0.0","distribution":"0"}
    ==========> sync demo data every 15min...
    ==========> currentState: Map(N-3998 -> 10929133)
    3> {"timestamp":"1578983888","logType":"30","dbName":"demo","version":"1.0.0","distribution":"0"}
    

    从测试结果可以发现:

    1. 在同一个Slot中处理的数据,不用每次都查询数据库
    2. 多个Slot会查询多次数据库

    出现问题:如果Slot数很多,那么查询数据库的次数会显著增加。此外,ValueState 属于 Keyed State,状态按 key 隔离而不是在整个 Slot 内共享,因此每个新出现的 key 都会触发一次数据库查询;key 越多,查询次数也越多。

    二、使用Broadcast

    2.1 主入口

      /**
       * Job entry point for the broadcast-state variant.
       *
       * Parallelism comes from args(0) when given, otherwise from `flink.default.parallelism`
       * (default "40") via PropertiesUtil. args(1), when present, is captured as `invalidDate`,
       * which this method does not use.
       *
       * The configuration read by SourceFromDemo (run with parallelism 1) is broadcast with
       * `demoDescriptor` and connected to the keyed log stream.
       */
      def main(args: Array[String]): Unit = {

        val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

        // default Parallelism
        val (defaultParallelism, invalidDate) = args.length match {
          case 0 => (PropertiesUtil.getOrElse("flink.default.parallelism", "40").toInt, None)
          case 1 => (args.head.toInt, None)
          case _ => (args.head.toInt, Some(args(1)))
        }
        environment.setParallelism(defaultParallelism)

        // The same descriptor must be passed to DemoBroadcastFunction so it reads this broadcast state.
        val demoDescriptor: MapStateDescriptor[String, String] = new MapStateDescriptor(
          "demoDescriptor", TypeInformation.of(classOf[String]), TypeInformation.of(classOf[String]))
        val demoBroadcast = environment
          .addSource(new SourceFromDemo)
          .setParallelism(1)
          .broadcast(demoDescriptor)

        environment
          .addSource(new SourceFromFile)
          // Drop unparseable lines instead of emitting null: a null record would make the
          // key selector below throw a NullPointerException and fail the job.
          .flatMap { message =>
            parseLog(message) match {
              case Success(Some(rawLog)) => List(rawLog)
              case Success(None) =>
                logger.error("==========> parsedLog error, please check")
                Nil
              case Failure(e) =>
                logger.error("==========> parsedLog error, error message: " + e.getMessage)
                Nil
            }
          }
          .keyBy(_.impressionId)
          .connect(demoBroadcast)
          .process(new DemoBroadcastFunction(demoDescriptor))
          .print()

        environment.execute()

      }
    }
    
    1. 自定义 Source 从数据库读取配置信息
    2. 利用 broadcast 方法创建 demoBroadcast,广播上面自定义的配置信息Source
    3. 利用 connect 方法将日志 Source 和 配置信息 Broadcast 连接
    4. 调用 process 方法处理日志数据和 Broadcast 数据

    2.2 自定义配置信息Source

    import java.sql.{Connection, PreparedStatement, ResultSet}
    import java.util.Date
    import java.util.concurrent.TimeUnit
    
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}
    import org.slf4j.{Logger, LoggerFactory}
    
    /**
     * Source that periodically reads the `uuid -> id` rows of the `demo` table and emits each
     * snapshot as one immutable Map.
     *
     * It queries once right after start-up. After that it queries again whenever
     * `CommonUtil.dataBetweenMin(lastQueryTime, now)` reaches `materials.data.sync.interval`
     * (default 15). Between checks it waits that many minutes; `cancel()` can cut the wait short.
     *
     * NOTE(review): this extends RichParallelSourceFunction, so each parallel instance would query
     * and emit its own snapshot. The job in this article runs it with setParallelism(1).
     */
    class SourceFromDemo extends RichParallelSourceFunction[Map[String, String]] {

      @transient private lazy val logger: Logger = LoggerFactory.getLogger(this.getClass)
      private var connection: Connection = null
      private var preparedStatement: PreparedStatement = null
      // Flipped by cancel()/close(), which may run on another thread, and read by the run() loop,
      // hence volatile. Starts true so a cancel() that arrives before run() is not undone.
      @volatile private var isRunning: Boolean = true
      private var lastQueryTime: Date = null
      // Monitor used for the wait between queries so cancel() can wake the loop immediately.
      // Transient + lazy because java.lang.Object is not serializable.
      @transient private lazy val sleepLock: Object = new Object

      override def open(parameters: Configuration): Unit = {
        super.open(parameters)
        logger.info("==========> open...")
        connection = MysqlUtil.getConnectionRead
        preparedStatement = connection.prepareStatement("SELECT uuid, id FROM demo")
      }


      override def run(ctx: SourceFunction.SourceContext[Map[String, String]]): Unit = {
        while (isRunning) {
          val dataSyncInterval: Int = PropertiesUtil.getOrElse("materials.data.sync.interval", "15").toInt
          val nowTime: Date = new Date()

          if (null == lastQueryTime) {
            // Initial load of the material data.
            logger.info("==========> query db for init ...")
            emit(ctx, queryDemo())
            lastQueryTime = new Date()
          } else if (CommonUtil.dataBetweenMin(lastQueryTime, nowTime) >= dataSyncInterval) {
            // Periodic refresh of the material data.
            logger.info("==========> query db for update every 15min ...")
            emit(ctx, queryDemo())
            lastQueryTime = nowTime
          }

          sleepWhileRunning(TimeUnit.MINUTES.toMillis(dataSyncInterval))
        }
      }

      override def close(): Unit = {
        isRunning = false
        try super.close()
        finally {
          logger.info("==========> close...")
          releaseResources()
        }
      }

      override def cancel(): Unit = {
        logger.info("==========> cancel...")
        // Stop the loop first, then wake it if it is waiting between queries.
        isRunning = false
        sleepLock.synchronized {
          sleepLock.notifyAll()
        }
        releaseResources()
      }

      /** Runs the prepared `SELECT uuid, id FROM demo` and collects the rows into an immutable map. */
      private def queryDemo(): Map[String, String] = {
        var rows: Map[String, String] = Map()
        MysqlUtil.execQuery(connection, preparedStatement, (rs: ResultSet) => {
          rows += (rs.getString("uuid") -> rs.getString("id"))
        })
        rows
      }

      /** Emits one snapshot while holding the checkpoint lock, as the SourceFunction contract requires. */
      private def emit(ctx: SourceFunction.SourceContext[Map[String, String]], snapshot: Map[String, String]): Unit =
        ctx.getCheckpointLock.synchronized {
          ctx.collect(snapshot)
        }

      /** Waits up to `millis` ms, returning early once `isRunning` becomes false and cancel() notifies. */
      private def sleepWhileRunning(millis: Long): Unit = {
        val deadline = System.currentTimeMillis() + millis
        sleepLock.synchronized {
          var remaining = deadline - System.currentTimeMillis()
          // Loop to absorb spurious wake-ups without shortening the interval.
          while (isRunning && remaining > 0) {
            sleepLock.wait(remaining)
            remaining = deadline - System.currentTimeMillis()
          }
        }
      }

      /** Releases the JDBC statement and connection, skipping whichever was never opened. */
      private def releaseResources(): Unit = {
        if (preparedStatement != null) MysqlUtil.releaseResource(preparedStatement)
        if (connection != null) MysqlUtil.releaseResource(connection)
      }

    }
    

    2.3 Process

    import java.sql.{Connection, PreparedStatement, ResultSet}
    
    import org.apache.flink.api.common.state.{BroadcastState, MapStateDescriptor, ReadOnlyBroadcastState}
    import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
    import org.apache.flink.util.Collector
    import org.slf4j.{Logger, LoggerFactory}
    
    /**
     * Forwards every log record unchanged while tracking whether the `uuid -> id` configuration
     * has arrived through broadcast state.
     *
     * Readiness is signalled by the entry "broadcastFlag" -> "ready", which is stored in the same
     * broadcast map as the table rows. Until that entry appears, this parallel instance loads its
     * own fallback copy of the table into `backupMap` directly from MySQL, once.
     *
     * @param demoDescriptor descriptor of the broadcast state; expected to be the one passed to
     *                       `broadcast(...)` for the configuration stream
     */
    class DemoBroadcastFunction(demoDescriptor: MapStateDescriptor[String, String]) extends KeyedBroadcastProcessFunction[String, LogVO, Map[String, String], LogVO] {

      @transient private lazy val logger: Logger = LoggerFactory.getLogger(this.getClass)

      // Fallback copy of the table used before the broadcast state is ready; null otherwise.
      private var backupMap: Map[String, String] = null

      override def processElement(value: LogVO, ctx: KeyedBroadcastProcessFunction[String, LogVO, Map[String, String], LogVO]#ReadOnlyContext, out: Collector[LogVO]): Unit = {
        val broadcastState: ReadOnlyBroadcastState[String, String] = ctx.getBroadcastState(demoDescriptor)
        // equalsIgnoreCase(null) is false, so a missing flag also counts as "not ready".
        if ("ready".equalsIgnoreCase(broadcastState.get("broadcastFlag"))) {
          logger.info("==========> broadcast is ready")
          logger.info("==========> use broadcast")
        } else {
          logger.info("==========> broadcast not ready")
          updateBackupMap()
          logger.info("==========> use backupMap")
        }
        out.collect(value)
      }

      override def processBroadcastElement(value: Map[String, String], ctx: KeyedBroadcastProcessFunction[String, LogVO, Map[String, String], LogVO]#Context, out: Collector[LogVO]): Unit = {
        val broadcastState: BroadcastState[String, String] = ctx.getBroadcastState(demoDescriptor)
        // Replace the previous snapshot entirely so rows removed from the table disappear as well.
        broadcastState.clear()
        value.foreach { case (uuid, id) => broadcastState.put(uuid, id) }

        // Readiness marker checked by processElement.
        broadcastState.put("broadcastFlag", "ready")

        // The broadcast state now supersedes the fallback copy.
        backupMap = null
      }

      /**
       * Loads the `demo` table into `backupMap` if it is not loaded yet.
       *
       * Uses a short-lived connection and statement that are released even if the query throws.
       * The map is assigned only after `execQuery` returns, so an exception cannot leave a
       * partially filled `backupMap` behind.
       */
      def updateBackupMap(): Unit = {
        if (null == backupMap) {
          val connection: Connection = MysqlUtil.getConnectionRead
          try {
            val ps: PreparedStatement = connection.prepareStatement("SELECT uuid, id FROM demo")
            try {
              var loaded: Map[String, String] = Map[String, String]()
              MysqlUtil.execQuery(connection, ps, (rs: ResultSet) => {
                loaded += (rs.getString("uuid") -> rs.getString("id"))
              })
              backupMap = loaded
            } finally {
              MysqlUtil.releaseResource(ps)
            }
          } finally {
            MysqlUtil.releaseResource(connection)
          }
        }
      }
    }
    
    

    2.4 测试

    ==========> broadcast not ready
    ==========> query db for init ...
    ==========> use backupMap
    2> {"timestamp":"1578907111","logType":"30","dbName":"demo","version":"1.0.0","distribution":"0","distributionType":"0"}
    ==========> broadcast not ready
    ==========> use backupMap
    2> {"timestamp":"1578907222","logType":"30","dbName":"demo","version":"1.0.0","distribution":"0","distributionType":"0"}
    ==========> broadcast is ready
    ==========> use broadcast
    2> {"timestamp":"1578907333","logType":"30","dbName":"demo","version":"1.0.0","distribution":"0","distributionType":"0"}
    ==========> broadcast is ready
    ==========> use broadcast
    2> {"timestamp":"1578907444","logType":"30","dbName":"demo","version":"1.0.0","distribution":"0","distributionType":"0"}
    ==========> broadcast is ready
    ==========> use broadcast
    1> {"timestamp":"1578983555","logType":"30","dbName":"demo","version":"1.0.0","distribution":"0","distributionType":"0"}
    ==========> broadcast is ready
    ==========> use broadcast
    1> {"timestamp":"1578983666","logType":"30","dbName":"demo","version":"1.0.0","distribution":"0","distributionType":"0"}
    ==========> broadcast is ready
    ==========> use broadcast
    1> {"timestamp":"1578983777","logType":"30","dbName":"demo","version":"1.0.0","distribution":"0","distributionType":"0"}
    ==========> query db for update every 15min ...
    ==========> broadcast is ready
    ==========> use broadcast
    1> {"timestamp":"1578983888","logType":"30","dbName":"demo","version":"1.0.0","distribution":"0","distributionType":"0"}
    

    从测试结果可以看出:

    1. 每隔一段时间查询一次数据库作为配置信息
    2. 每次的查询结果都会广播到各个Slot上,而不是每个Slot上都查询一次数据库

    注意:
    在 DemoBroadcastFunction 中,加入了 backupMap,用于在未获取到广播变量中的配置信息时,自行查询数据库获取配置信息。

    这样做的原因:
    程序刚启动时,Flink 并不保证广播流的数据先于主数据流到达,processBroadcastElement 方法可能还没有被调用,广播状态中没有任何配置,processElement 方法中获取到的广播变量为空。这样刚开始处理的数据就拿不到配置信息,因此必须自行查询数据库获取配置信息。

    相关文章

      网友评论

          本文标题:Flink从数据库读取配置信息

          本文链接:https://www.haomeiwen.com/subject/nbcaactx.html