一、使用State
1.1 主入口
/**
 * Entry point of the keyed-state variant of the job.
 *
 * Reads raw messages from `SourceFromFile`, parses them with `parseLog`, keys the
 * parsed logs by their `timestamp` field and hands them to [[DemoProcessFunction]].
 *
 * Messages that cannot be parsed are logged and dropped. They are no longer forwarded
 * as `null` records, which would make `keyBy(_.timestamp)` throw a NullPointerException.
 *
 * @param args optional arguments: `args(0)` is the job parallelism, `args(1)` is bound
 *             to `invalidDate` (parsed here but not used by this method)
 */
def main(args: Array[String]): Unit = {
  val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  // Default parallelism: taken from the first argument, otherwise from the properties file.
  val (defaultParallelism, invalidDate) = args.length match {
    case 0 => (PropertiesUtil.getOrElse("flink.default.parallelism", "40").toInt, None)
    case 1 => (args.head.toInt, None)
    case _ => (args.head.toInt, Some(args(1)))
  }
  environment.setParallelism(defaultParallelism)
  environment
    .addSource(new SourceFromFile)
    // flatMap instead of map: a failed parse yields no element at all rather than null.
    .flatMap(message => {
      val parsed: List[LogVO] = parseLog(message) match {
        case Success(Some(rawLog)) => List(rawLog)
        case Success(None) =>
          logger.error("==========> parsedLog error, please check")
          Nil
        case Failure(e) =>
          logger.error("==========> parsedLog error, error message: " + e.getMessage)
          Nil
      }
      parsed
    })
    .keyBy(_.timestamp)
    .process(new DemoProcessFunction())
    .print()
  environment.execute()
}
1.2 Process
/**
 * Passes every [[LogVO]] through unchanged while keeping a cached copy of the
 * `demo` table (uuid -> id) in keyed state.
 *
 * The cache is loaded from MySQL the first time an element is seen for a key whose
 * state is empty. A processing-time timer then clears the state after the configured
 * interval, so the next element for that key reloads it.
 *
 * NOTE(review): the state is keyed state, so it is scoped to the current key of the
 * stream. How often the database is queried therefore depends on how the caller
 * keys the stream; verify against the job definition.
 */
class DemoProcessFunction extends KeyedProcessFunction[String, LogVO, LogVO] {
  private val logger: Logger = LoggerFactory.getLogger(this.getClass)
  private var connection: Connection = null
  private var preparedStatement: PreparedStatement = null
  // Cached uuid -> id mapping for the current key.
  private var valueState: ValueState[Map[String, String]] = _
  // Timestamp of the timer that will invalidate the cached mapping.
  private var timerState: ValueState[Long] = _

  /** Registers both state handles and opens the JDBC connection and statement used for reloads. */
  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    val valueStateDesc: ValueStateDescriptor[Map[String, String]] =
      new ValueStateDescriptor[Map[String, String]]("valueStateDesc",
        TypeInformation.of(classOf[Map[String, String]]))
    valueState = getRuntimeContext.getState(valueStateDesc)
    val timerStateDesc: ValueStateDescriptor[Long] =
      new ValueStateDescriptor[Long]("timerStateDesc", TypeInformation.of(classOf[Long]))
    timerState = getRuntimeContext.getState(timerStateDesc)
    connection = MysqlUtil.getConnectionRead
    preparedStatement = connection.prepareStatement("SELECT uuid, id FROM demo")
  }

  /** Releases the JDBC statement and connection opened in `open`; the original never released them. */
  override def close(): Unit = {
    super.close()
    if (null != preparedStatement) {
      MysqlUtil.releaseResource(preparedStatement)
      preparedStatement = null
    }
    if (null != connection) {
      MysqlUtil.releaseResource(connection)
      connection = null
    }
  }

  /**
   * Ensures the cached mapping is loaded for the current key, then forwards the element.
   *
   * @param value the incoming log record, emitted unchanged
   * @param ctx   context used to register the invalidation timer
   * @param out   collector receiving the record
   */
  override def processElement(value: LogVO, ctx: KeyedProcessFunction[String, LogVO, LogVO]#Context, out: Collector[LogVO]): Unit = {
    var currentState: Map[String, String] = valueState.value()
    if (null == currentState) {
      logger.info("==========> query database for sync...")
      currentState = Map()
      MysqlUtil.execQuery(connection, preparedStatement, (rs: ResultSet) => {
        currentState += (rs.getString("uuid") -> rs.getString("id"))
      })
      valueState.update(currentState)
      // Refresh interval in minutes (default 15). Converted to milliseconds in Long
      // arithmetic so that large configured values cannot overflow an Int.
      val syncInterval: Long =
        PropertiesUtil.getOrElse("materials.data.sync.interval", "15").toInt.toLong * 60L * 1000L
      val ttlTime: Long = System.currentTimeMillis() + syncInterval
      ctx.timerService().registerProcessingTimeTimer(ttlTime)
      timerState.update(ttlTime)
    }
    logger.info("==========> currentState: " + currentState)
    out.collect(value)
  }

  /**
   * Invalidates the cached mapping so that the next element for this key reloads it.
   *
   * The timer that is firing is the one recorded in `timerState`, so it needs no
   * explicit deletion. The recorded timestamp is cleared instead, so no stale value
   * is kept in state.
   */
  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, LogVO, LogVO]#OnTimerContext, out: Collector[LogVO]): Unit = {
    super.onTimer(timestamp, ctx, out)
    logger.info("==========> sync demo data every 15min...")
    valueState.clear()
    timerState.clear()
  }
}
这样子可以实现:
每15分钟去查一次数据库,更新配置信息。
1.3 测试
==========> query database for sync...
==========> currentState: Map(N-3998 -> 10929133)
1> {"timestamp":"1578907111","logType":"30","dbName":"demo","version":"1.0.0","distribution":"0"}
==========> currentState: Map(N-3998 -> 10929133)
1> {"timestamp":"1578907222","logType":"30","dbName":"demo","version":"1.0.0","distribution":"0"}
==========> currentState: Map(N-3998 -> 10929133)
1> {"timestamp":"1578907333","logType":"30","dbName":"demo","version":"1.0.0","distribution":"0"}
==========> currentState: Map(N-3998 -> 10929133)
1> {"timestamp":"1578907444","logType":"30","dbName":"demo","version":"1.0.0","distribution":"0"}
==========> query database for sync...
==========> currentState: Map(N-3998 -> 10929133)
3> {"timestamp":"1578983555","logType":"30","dbName":"demo","version":"1.0.0","distribution":"0"}
==========> currentState: Map(N-3998 -> 10929133)
3> {"timestamp":"1578983666","logType":"30","dbName":"demo","version":"1.0.0","distribution":"0"}
==========> currentState: Map(N-3998 -> 10929133)
3> {"timestamp":"1578983777","logType":"30","dbName":"demo","version":"1.0.0","distribution":"0"}
==========> sync demo data every 15min...
==========> currentState: Map(N-3998 -> 10929133)
3> {"timestamp":"1578983888","logType":"30","dbName":"demo","version":"1.0.0","distribution":"0"}
从测试结果可以发现:
- 在同一个Slot中处理的数据,不用每次都查询数据库
- 多个Slot会查询多次数据库
存在的问题:如果 Slot 数很多,查询数据库的次数也会随之显著增加。
二、使用Broadcast
2.1 主入口
/**
 * Entry point of the broadcast-state variant of the job.
 *
 * A single-parallelism `SourceFromDemo` emits the configuration map, which is
 * broadcast under `demoDescriptor`. The log stream is parsed, keyed by
 * `impressionId`, connected to the broadcast stream and processed by
 * [[DemoBroadcastFunction]].
 *
 * Messages that cannot be parsed are logged and dropped. They are no longer forwarded
 * as `null` records, which would make `keyBy(_.impressionId)` throw a NullPointerException.
 *
 * @param args optional arguments: `args(0)` is the job parallelism, `args(1)` is bound
 *             to `invalidDate` (parsed here but not used by this method)
 */
def main(args: Array[String]): Unit = {
  val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  // Default parallelism: taken from the first argument, otherwise from the properties file.
  val (defaultParallelism, invalidDate) = args.length match {
    case 0 => (PropertiesUtil.getOrElse("flink.default.parallelism", "40").toInt, None)
    case 1 => (args.head.toInt, None)
    case _ => (args.head.toInt, Some(args(1)))
  }
  environment.setParallelism(defaultParallelism)
  // Descriptor of the broadcast state shared by every parallel instance.
  val demoDescriptor: MapStateDescriptor[String, String] = new MapStateDescriptor(
    "demoDescriptor", TypeInformation.of(classOf[String]), TypeInformation.of(classOf[String]))
  // The configuration source runs with parallelism 1 so the database is polled by one instance only.
  val demoBroadcast = environment
    .addSource(new SourceFromDemo)
    .setParallelism(1)
    .broadcast(demoDescriptor)
  environment
    .addSource(new SourceFromFile)
    // flatMap instead of map: a failed parse yields no element at all rather than null.
    .flatMap(message => {
      val parsed: List[LogVO] = parseLog(message) match {
        case Success(Some(rawLog)) => List(rawLog)
        case Success(None) =>
          logger.error("==========> parsedLog error, please check")
          Nil
        case Failure(e) =>
          logger.error("==========> parsedLog error, error message: " + e.getMessage)
          Nil
      }
      parsed
    })
    .keyBy(_.impressionId)
    .connect(demoBroadcast)
    .process(new DemoBroadcastFunction(demoDescriptor))
    .print()
  environment.execute()
}
}
- 自定义 Source 从数据库读取配置信息
- 利用 broadcast 方法创建 demoBroadcast,广播上面自定义的配置信息Source
- 利用 connect 方法将日志 Source 和 配置信息 Broadcast 连接
- 调用 process 方法处理日志数据和 Broadcast 数据
2.2 自定义配置信息Source
import java.sql.{Connection, PreparedStatement, ResultSet}
import java.util.Date
import java.util.concurrent.TimeUnit
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}
import org.slf4j.{Logger, LoggerFactory}
/**
 * Source that periodically reads the `demo` table and emits its content as one
 * `Map[uuid -> id]` per poll.
 *
 * The table is read once when `run` starts and again whenever at least
 * `materials.data.sync.interval` minutes (default 15) have elapsed since the last read.
 *
 * NOTE(review): this is a parallel source, so every parallel instance polls the
 * database on its own; the job is expected to pin it to parallelism 1.
 */
class SourceFromDemo extends RichParallelSourceFunction[Map[String, String]] {
  private val logger: Logger = LoggerFactory.getLogger(this.getClass)
  private var connection: Connection = null
  private var preparedStatement: PreparedStatement = null
  // Written by cancel() from a different thread than the one executing run(),
  // so it must be volatile for the loop to observe the change.
  @volatile private var isRunning: Boolean = false
  private var lastQueryTime: Date = null

  /** Opens the JDBC connection and statement and marks the source as running. */
  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    logger.info("==========> open...")
    connection = MysqlUtil.getConnectionRead
    preparedStatement = connection.prepareStatement("SELECT uuid, id FROM demo")
    isRunning = true
  }

  /**
   * Polling loop: emits the table content once at start-up and then after every
   * elapsed sync interval, until the source is cancelled or closed.
   *
   * @param ctx source context the snapshots are emitted to
   */
  override def run(ctx: SourceFunction.SourceContext[Map[String, String]]): Unit = {
    while (isRunning) {
      // Initial load of the material data.
      if (null == lastQueryTime) {
        logger.info("==========> query db for init ...")
        ctx.collect(queryDemo())
        lastQueryTime = new Date()
      }
      // Refresh the material data once the configured number of minutes has elapsed.
      val dataSyncInterval: Int = PropertiesUtil.getOrElse("materials.data.sync.interval", "15").toInt
      val nowTime: Date = new Date()
      if (CommonUtil.dataBetweenMin(lastQueryTime, nowTime) >= dataSyncInterval) {
        logger.info("==========> query db for update every 15min ...")
        ctx.collect(queryDemo())
        lastQueryTime = nowTime
      }
      TimeUnit.MINUTES.sleep(dataSyncInterval)
    }
  }

  /** Runs the prepared query and returns the result as an immutable uuid -> id map. */
  private def queryDemo(): Map[String, String] = {
    var snapshot: Map[String, String] = Map()
    MysqlUtil.execQuery(connection, preparedStatement, (rs: ResultSet) => {
      snapshot += (rs.getString("uuid") -> rs.getString("id"))
    })
    snapshot
  }

  /** Stops the loop and releases the JDBC resources; safe to call more than once. */
  override def close(): Unit = {
    super.close()
    logger.info("==========> close...")
    isRunning = false
    if (null != preparedStatement) {
      MysqlUtil.releaseResource(preparedStatement)
      preparedStatement = null
    }
    if (null != connection) {
      MysqlUtil.releaseResource(connection)
      connection = null
    }
  }

  /**
   * Signals the polling loop to stop.
   *
   * The JDBC resources are intentionally not released here: `run` may still be in
   * the middle of a query on another thread, and `close` releases them afterwards.
   */
  override def cancel(): Unit = {
    logger.info("==========> cancel...")
    isRunning = false
  }
}
2.3 Process
import java.sql.{Connection, PreparedStatement, ResultSet}
import org.apache.flink.api.common.state.{BroadcastState, MapStateDescriptor, ReadOnlyBroadcastState}
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
import org.apache.flink.util.Collector
import org.slf4j.{Logger, LoggerFactory}
/**
 * Forwards every [[LogVO]] unchanged while maintaining the broadcast copy of the
 * `demo` table (uuid -> id).
 *
 * Until the first broadcast element arrives, the broadcast state carries no
 * `broadcastFlag` entry; in that window the function loads a local backup map
 * directly from MySQL. Once a broadcast element has been stored, the backup map
 * is dropped.
 *
 * @param demoDescriptor descriptor of the broadcast state holding the mapping
 */
class DemoBroadcastFunction(demoDescriptor: MapStateDescriptor[String, String]) extends KeyedBroadcastProcessFunction[String, LogVO, Map[String, String], LogVO] {
  private val logger: Logger = LoggerFactory.getLogger(this.getClass)
  // Locally loaded fallback mapping; null whenever the broadcast state is in use.
  private var backupMap: Map[String, String] = null

  /**
   * Emits the element unchanged, after making sure a mapping (broadcast or backup) is available.
   *
   * @param value the incoming log record
   * @param ctx   read-only context giving access to the broadcast state
   * @param out   collector receiving the record
   */
  override def processElement(value: LogVO, ctx: KeyedBroadcastProcessFunction[String, LogVO, Map[String, String], LogVO]#ReadOnlyContext, out: Collector[LogVO]): Unit = {
    val broadcastState: ReadOnlyBroadcastState[String, String] = ctx.getBroadcastState(demoDescriptor)
    val broadcastFlag = broadcastState.get("broadcastFlag")
    // equalsIgnoreCase(null) is false, so a missing flag also takes the fallback path.
    if ("ready".equalsIgnoreCase(broadcastFlag)) {
      logger.info("==========> broadcast is ready")
      logger.info("==========> use broadcast")
    } else {
      logger.info("==========> broadcast not ready")
      updateBackupMap()
      logger.info("==========> use backupMap")
    }
    out.collect(value)
  }

  /**
   * Replaces the whole broadcast state with the received snapshot and marks it as ready.
   *
   * @param value the full uuid -> id snapshot emitted by the configuration source
   * @param ctx   context giving write access to the broadcast state
   * @param out   unused; this method emits nothing
   */
  override def processBroadcastElement(value: Map[String, String], ctx: KeyedBroadcastProcessFunction[String, LogVO, Map[String, String], LogVO]#Context, out: Collector[LogVO]): Unit = {
    val broadcastState: BroadcastState[String, String] = ctx.getBroadcastState(demoDescriptor)
    broadcastState.clear()
    value.foreach { case (uuid, id) => broadcastState.put(uuid, id) }
    // Marker entry telling processElement that the state has been populated.
    broadcastState.put("broadcastFlag", "ready")
    // The backup map is no longer needed.
    backupMap = null
  }

  /**
   * Loads the backup map from MySQL if it has not been loaded yet.
   *
   * The statement and connection are released in `finally` blocks so they are not
   * leaked when the query fails. `backupMap` is assigned only after the query
   * succeeded, so a failed load does not leave an empty map behind.
   */
  def updateBackupMap(): Unit = {
    if (null == backupMap) {
      val connection: Connection = MysqlUtil.getConnectionRead
      try {
        val ps: PreparedStatement = connection.prepareStatement("SELECT uuid, id FROM demo")
        try {
          var loaded: Map[String, String] = Map[String, String]()
          MysqlUtil.execQuery(connection, ps, (rs: ResultSet) => {
            loaded += (rs.getString("uuid") -> rs.getString("id"))
          })
          backupMap = loaded
        } finally {
          MysqlUtil.releaseResource(ps)
        }
      } finally {
        MysqlUtil.releaseResource(connection)
      }
    }
  }
}
2.4 测试
==========> broadcast not ready
==========> query db for init ...
==========> use backupMap
2> {"timestamp":"1578907111","logType":"30","dbName":"demo","version":"1.0.0","distribution":"0","distributionType":"0"}
==========> broadcast not ready
==========> use backupMap
2> {"timestamp":"1578907222","logType":"30","dbName":"demo","version":"1.0.0","distribution":"0","distributionType":"0"}
==========> broadcast is ready
==========> use broadcast
2> {"timestamp":"1578907333","logType":"30","dbName":"demo","version":"1.0.0","distribution":"0","distributionType":"0"}
==========> broadcast is ready
==========> use broadcast
2> {"timestamp":"1578907444","logType":"30","dbName":"demo","version":"1.0.0","distribution":"0","distributionType":"0"}
==========> broadcast is ready
==========> use broadcast
1> {"timestamp":"1578983555","logType":"30","dbName":"demo","version":"1.0.0","distribution":"0","distributionType":"0"}
==========> broadcast is ready
==========> use broadcast
1> {"timestamp":"1578983666","logType":"30","dbName":"demo","version":"1.0.0","distribution":"0","distributionType":"0"}
==========> broadcast is ready
==========> use broadcast
1> {"timestamp":"1578983777","logType":"30","dbName":"demo","version":"1.0.0","distribution":"0","distributionType":"0"}
==========> query db for update every 15min ...
==========> broadcast: ready
==========> use broadcast
1> {"timestamp":"1578983888","logType":"30","dbName":"demo","version":"1.0.0","distribution":"0","distributionType":"0"}
从测试结果可以看出:
- 每隔一段时间查询一次数据作为配置信息
- 每次的查询结果都会广播到各个Slot上,而不是每个Slot上都查询一次数据库
注意:
在 DemoBroadcastFunction 中,加入了 backupMap,用于在未获取到广播变量中的配置信息时,自行查询数据库获取配置信息。
这样做的原因:
程序刚启动时,processBroadcastElement 方法可能尚未被调用(广播数据还没有到达),此时 processElement 方法中读取到的广播变量为空,最先处理的那部分数据就获取不到配置信息,因此必须自行查询数据库获取配置信息作为后备。
网友评论