Event definition: specifies the schema of the DataSource (a sketch of such an event type is shown right after this list).
BeamFactory definition: this is where the required configuration is declared, for example:
the ZooKeeper address, used for service discovery
the dimensions
the rollup aggregators: count, sum, max, min, etc. (note there is no avg; averages are computed at query time from sum and count)
the segment time granularity
the window period (windowPeriod)
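For concreteness, here is a hypothetical event type; the field names (bar as the single dimension, baz as the metric summed by LongSumAggregatorFactory) are illustrative and simply mirror the factory code below:

import org.joda.time.DateTime

// Hypothetical event type for this walkthrough; timestamp is a Joda DateTime, as DruidBeams expects.
case class SimpleEvent(timestamp: DateTime, bar: String, baz: Long)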
class SimpleEventBeamFactory extends BeamFactory[SimpleEvent]
{
lazy val makeBeam: Beam[SimpleEvent] = {
// Tranquility uses ZooKeeper (through Curator framework) for coordination.
val curator = CuratorFrameworkFactory.newClient(
"localhost:2181",
new BoundedExponentialBackoffRetry(100, 3000, 5)
)
curator.start()
val indexService = "druid/overlord" // Your overlord's druid.service, with slashes replaced by colons.
val discoveryPath = "/druid/discovery" // Your overlord's druid.discovery.curator.path
val dataSource = "foo"
val dimensions = IndexedSeq("bar")
val aggregators = Seq(new LongSumAggregatorFactory("baz", "baz"))
val isRollup = true
// Expects simpleEvent.timestamp to return a Joda DateTime object.
DruidBeams
.builder((simpleEvent: SimpleEvent) => simpleEvent.timestamp)
.curator(curator)
.discoveryPath(discoveryPath)
.location(DruidLocation.create(indexService, dataSource))
.rollup(DruidRollup(SpecificDruidDimensions(dimensions), aggregators, QueryGranularities.MINUTE, isRollup))
.tuning(
ClusteredBeamTuning(
segmentGranularity = Granularity.HOUR,
windowPeriod = new Period("PT10M"),
partitions = 1,
replicants = 1
)
)
.buildBeam()
}
}
A custom BeamFactory extends BeamFactory and overrides makeBeam.
DruidBeams.buildBeam() builds a ClusteredBeam.
ClusteredBeamTuning defines the beam's tuning parameters (segment granularity, window period, partitions, replicants). A sketch of wiring the factory into a Flink job follows.
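A minimal wiring sketch, assuming the tranquility-flink connector's BeamSink (the stream construction and job name are illustrative):

import org.apache.flink.streaming.api.scala._
import org.joda.time.DateTime
import com.metamx.tranquility.flink.BeamSink

object DruidSinkJob {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Illustrative source; in practice this would be a Kafka source or similar.
    val druidEvents: DataStream[SimpleEvent] = env.fromElements(
      SimpleEvent(new DateTime(), "some-bar", 1L)
    )
    // BeamSink builds the beam from the factory and pushes every record through it.
    druidEvents.addSink(new BeamSink[SimpleEvent](new SimpleEventBeamFactory))
    env.execute("druid-sink-example")
  }
}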
The Flink Druid sink operator initializes a Tranquilizer:
val tranquilizer = Tranquilizer.create(beamFactory.makeBeam)
Then Tranquilizer.start() is called:
@LifecycleStart
def start(): Unit = {
started = true
sendThread.start()
}
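A usage sketch, assuming the tranquilizer created and started above: send() appends the record to the internal buffer and returns a Twitter Future that resolves once the beam has acknowledged or dropped the message.

import com.twitter.util.{Return, Throw}
import com.metamx.tranquility.tranquilizer.{MessageDroppedException, Tranquilizer}

def sendOne(tranquilizer: Tranquilizer[SimpleEvent], event: SimpleEvent): Unit = {
  tranquilizer.send(event) respond {
    case Return(_)                         => // accepted by a Druid indexing task
    case Throw(_: MessageDroppedException) => // event fell outside the window period and was dropped
    case Throw(e)                          => throw new RuntimeException("Failed to send event", e)
  }
}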
sendThread.start() starts the Tranquilizer's send thread. Records are appended to an internal buffer in a loop; when the buffer reaches its threshold, Tranquilizer.sendBuffer is triggered:
private def sendBuffer(myIndex: Long, myBuffer: Buffer[MessageAndPromise]): Unit = {
if (log.isDebugEnabled) {
log.debug(s"Sending buffer with ${myBuffer.size} messages.")
}
val futureResults: Seq[Future[SendResult]] = try {
// This actually delegates to ClusteredBeam.sendAll
beam.sendAll(myBuffer.map(_.message))
}
// ... (exception handling and completion of the per-message promises are omitted here)
}
Now let's look at ClusteredBeam.sendAll():
override def sendAll(events: Seq[EventType]): Seq[Future[SendResult]] = {
val now = timekeeper.now.withZone(DateTimeZone.UTC)
// Events, grouped and ordered by truncated timestamp, with their original indexes remembered
// (i.e. group the data by time and sort the groups)
val eventsWithPromises = Vector() ++ events.map(event => (event, Promise[SendResult]()))
val grouped: Seq[(DateTime, IndexedSeq[(EventType, Promise[SendResult])])] = (eventsWithPromises groupBy {
case (event, promise) =>
tuning.segmentBucket(timestamper(event)).start
}).toSeq.sortBy(_._1.getMillis)
// Possibly warm up future beams
def toBeWarmed(dt: DateTime, end: DateTime): List[DateTime] = {
if (dt <= end) {
dt :: toBeWarmed(tuning.segmentBucket(dt).end, end)
} else {
Nil
}
}
val latestEventTimestamp: Option[DateTime] = grouped.lastOption map { case (truncatedTimestamp, group) =>
val event: EventType = group.maxBy(tuple => timestamper(tuple._1).getMillis)._1
timestamper(event)
}
val warmingBeams: Future[Seq[Beam[EventType]]] = Future.collect(
for (
latest <- latestEventTimestamp.toList;
tbwTimestamp <- toBeWarmed(latest, latest + tuning.warmingPeriod) if tbwTimestamp > latest
) yield {
// Create beam asynchronously
beam(tbwTimestamp, now)
}
)
// Send data
// ... (the code that actually sends the data is omitted here)
}
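To make the grouping step concrete, here is a toy illustration in plain Joda-Time (not Tranquility code): with HOUR segment granularity, events are keyed by the start of the hour bucket their timestamp falls into, and the groups are sorted by bucket start.

import org.joda.time.{DateTime, DateTimeZone}

val events = Seq("2020-04-23T03:05:00Z", "2020-04-23T03:40:00Z", "2020-04-23T04:10:00Z")
  .map(s => new DateTime(s, DateTimeZone.UTC))

val grouped = events
  .groupBy(_.hourOfDay.roundFloorCopy)   // stand-in for tuning.segmentBucket(timestamper(event)).start
  .toSeq
  .sortBy(_._1.getMillis)

grouped.foreach { case (bucketStart, es) => println(s"$bucketStart -> ${es.size} event(s)") }
// 2020-04-23T03:00:00.000Z -> 2 event(s)
// 2020-04-23T04:00:00.000Z -> 1 event(s)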
So the events are grouped by time and sorted, the latest event timestamp is taken, and beam(tbwTimestamp, now) is called for each bucket that needs warming. This method creates the beams before the data is sent:
private[this] def beam(timestamp: DateTime, now: DateTime): Future[Beam[EventType]] = {
val futureBeamOption = beams.get(timestamp.getMillis) match {
case _ if !open => Future.value(None)
case Some(x) if windowInterval.overlaps(bucket) => Future.value(Some(x))
case Some(x) => Future.value(None)
case None if timestamp <= localLatestCloseTime => Future.value(None)
case None if !creationInterval.overlaps(bucket) => Future.value(None)
case None =>
// We may want to create new merged beam(s). Acquire the zk mutex and examine the situation.
// This could be more efficient, but it's happening infrequently so it's probably not a big deal.
data.modify {
prev =>
val prevBeamDicts = prev.beamDictss.getOrElse(timestamp.getMillis, Nil)
// Check whether the existing beams already cover the configured number of partitions; one partition corresponds to one beam
if (prevBeamDicts.size >= tuning.partitions) {
log.info(
"Merged beam already created for identifier[%s] timestamp[%s], with sufficient partitions (target = %d, actual = %d)",
identifier,
timestamp,
tuning.partitions,
prevBeamDicts.size
)
prev
// Check whether the timestamp of the would-be new task is later than the close time of previously finished tasks
} else if (timestamp <= prev.latestCloseTime) {
log.info(
"Global latestCloseTime[%s] for identifier[%s] has moved past timestamp[%s], not creating merged beam",
prev.latestCloseTime,
identifier,
timestamp
)
prev
} else {
// The timestamp is later than the latest close time and the existing beams do not yet cover all partitions, so create new beams
assert(prevBeamDicts.size < tuning.partitions)
assert(timestamp > prev.latestCloseTime)
// We might want to cover multiple time segments in advance.
val numSegmentsToCover = tuning.minSegmentsPerBeam +
rand.nextInt(tuning.maxSegmentsPerBeam - tuning.minSegmentsPerBeam + 1)
val intervalToCover = new Interval(
timestamp.getMillis,
tuning.segmentGranularity.increment(timestamp, numSegmentsToCover).getMillis,
ISOChronology.getInstanceUTC
)
val timestampsToCover = tuning.segmentGranularity.getIterable(intervalToCover).asScala.map(_.start)
// OK, create them where needed.
val newInnerBeamDictsByPartition = new mutable.HashMap[Int, Dict]
val newBeamDictss: Map[Long, Seq[Dict]] = (prev.beamDictss filterNot {
case (millis, beam) =>
// Expire old beamDicts
tuning.segmentGranularity.increment(new DateTime(millis)) + tuning.windowPeriod < now
}) ++ (for (ts <- timestampsToCover) yield {
val tsPrevDicts = prev.beamDictss.getOrElse(ts.getMillis, Nil)
log.info(
"Creating new merged beam for identifier[%s] timestamp[%s] (target = %d, actual = %d)",
identifier,
ts,
tuning.partitions,
tsPrevDicts.size
)
val tsNewDicts = tsPrevDicts ++ ((tsPrevDicts.size until tuning.partitions) map {
partition =>
newInnerBeamDictsByPartition.getOrElseUpdate(
partition, {
// Create sub-beams and then immediately close them, just so we can get the dict representations.
// Close asynchronously, ignore return value.
beamMaker.newBeam(intervalToCover, partition).withFinally(_.close()) {
beam =>
val beamDict = beamMaker.toDict(beam)
log.info("Created beam: %s", objectMapper.writeValueAsString(beamDict))
beamDict
}
}
)
})
(ts.getMillis, tsNewDicts)
})
// ... (the remainder of data.modify, which persists the new beam dicts and builds the merged beam from them, is omitted here)
beamMaker.newBeam creates a new beam:
override def newBeam(interval: Interval, partition: Int) = {
require(
beamTuning.segmentGranularity.widen(interval) == interval,
"Interval does not match segmentGranularity[%s]: %s" format(beamTuning.segmentGranularity, interval)
)
val baseFirehoseId = DruidBeamMaker.generateBaseFirehoseId(
location.dataSource,
beamTuning.segmentGranularity,
interval.start,
partition
)
val availabilityGroup = DruidBeamMaker.generateAvailabilityGroup(location.dataSource, interval.start, partition)
val futureTasks = for (replicant <- 0 until beamTuning.replicants) yield {
val firehoseId = "%s-%04d" format(baseFirehoseId, replicant)
indexService.submit(taskBytes(interval, availabilityGroup, firehoseId, partition, replicant)) map {
taskId =>
TaskPointer(taskId, firehoseId)
}
}
val tasks = Await.result(Future.collect(futureTasks))
// DruidBeam sends the data to the indexing tasks running on the MiddleManager workers
new DruidBeam(
interval,
partition,
tasks,
location,
config,
taskLocator,
indexService,
emitter,
objectWriter
)
}
indexService.submit(taskBytes(interval, availabilityGroup, firehoseId, partition, replicant)) map {
taskId =>
TaskPointer(taskId, firehoseId)
}
taskBytes is the serialized task specification. indexService.submit talks to the Overlord, POSTing the task specification to the /druid/indexer/v1/task endpoint to create the indexing task.
DruidBeam.sendAll
override def sendAll(messages: Seq[A]): Seq[Future[SendResult]] = {
val messagesWithPromises = Vector() ++ messages.map(message => (message, Promise[SendResult]()))
// Messages grouped into chunks
val messagesChunks: List[(Array[Byte], IndexedSeq[(A, Promise[SendResult])])] = messagesWithPromises
.grouped(config.firehoseChunkSize)
.map(xs => (objectWriter.batchAsBytes(xs.map(_._1)), xs))
.toList
for ((messagesChunkBytes, messagesChunk) <- messagesChunks) {
// Try to send to all tasks, return "sent" if any of them accepted it.
val taskResponses: Seq[Future[(TaskPointer, SendResult)]] = for {
task <- tasks
client <- clients.get(task) if client.active
} yield {
val messagePost = HttpPost(
"/druid/worker/v1/chat/%s/push-events" format
(location.environment.firehoseServicePattern format task.serviceKey)
)
// ... (the code that POSTs messagePost to the task is omitted)
}
// ... (chunkResult is derived from taskResponses; that code is omitted too)
// Avoid become(chunkResult), for some reason it creates massive promise chains.
chunkResult respond {
case Return(result) => messagesChunk.foreach(_._2.setValue(result))
case Throw(e) => messagesChunk.foreach(_._2.setException(e))
}
}
messagesWithPromises.map(_._2)
}
/druid/worker/v1/chat/%s/push-events is the HTTP endpoint on the worker (the indexing task's event-receiver firehose) that accepts the pushed data.
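As an illustration of how that path is resolved (the service pattern below is Tranquility's common default and the id is one of the firehoseIds from the ZooKeeper metadata shown further down; both are assumptions for this example):

// Illustration only, not library code.
val firehoseServicePattern = "druid:firehose:%s"   // assumed default pattern
val serviceKey = "logdata-003-0000-0000"           // a firehoseId taken from the ZK metadata below
val path = "/druid/worker/v1/chat/%s/push-events" format (firehoseServicePattern format serviceKey)
// => /druid/worker/v1/chat/druid:firehose:logdata-003-0000-0000/push-events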
The role of ClusteredBeam:
1. It calls beamMaker.newBeam to create the DruidBeams that actually send the data.
2. Based on the event time, the configured partitions, and the beams that already exist, it decides whether new beams need to be created.
3. Inside DruidBeamMaker.newBeam you can add logic that fetches the latest schema every time a new beam is created, enabling dynamic schemas (see the sketch after this list).
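A minimal sketch of that dynamic-schema idea, assuming a caller-supplied fetchLatestDimensions hook (not a Tranquility API); the aggregator and granularity simply mirror the factory example above:

import com.metamx.tranquility.druid.{DruidRollup, SpecificDruidDimensions}
import io.druid.granularity.QueryGranularities
import io.druid.query.aggregation.LongSumAggregatorFactory

// Because newBeam runs each time a fresh set of indexing tasks is created, re-resolving the
// dimensions at that point means schema changes take effect with the next beam/tasks.
def rollupWithLatestSchema(fetchLatestDimensions: () => IndexedSeq[String]): DruidRollup =
  DruidRollup(
    SpecificDruidDimensions(fetchLatestDimensions()),
    Seq(new LongSumAggregatorFactory("baz", "baz")),
    QueryGranularities.MINUTE,
    true
  )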
Tranquility maintains a ClusteredBeamMeta for every beam. It is persisted in ZooKeeper under the beam's path, which has mutex and data child nodes; by default the metadata itself is stored in the data node:
[zk: localhost:2181(CONNECTED) 2] get /druid/tranquility/beams/druid:tranquility:indexer/server_brand/data
{
"latestTime":"2020-04-23T03:00:00.000Z",
"latestCloseTime":"2020-04-23T01:00:00.000Z",
"beams":{
"2020-04-23T02:00:00.000Z":[
{
"interval":"2020-04-23T02:00:00.000Z/2020-04-23T03:00:00.000Z",
"partition":0,
"tasks":Array[1],
"timestamp":"2020-04-23T02:00:00.000Z"
},
{
"interval":"2020-04-23T02:00:00.000Z/2020-04-23T03:00:00.000Z",
"partition":1,
"tasks":Array[1],
"timestamp":"2020-04-23T02:00:00.000Z"
}
],
"2020-04-23T03:00:00.000Z":[
{
"interval":"2020-04-23T03:00:00.000Z/2020-04-23T04:00:00.000Z",
"partition":0,
"tasks":[
{
"id":"index_realtime_logdata_2020-04-23T03:00:00.000Z_0_0",
"firehoseId":"logdata-003-0000-0000"
}
],
"timestamp":"2020-04-23T03:00:00.000Z"
},
{
"interval":"2020-04-23T03:00:00.000Z/2020-04-23T04:00:00.000Z",
"partition":1,
"tasks":[
{
"id":"index_realtime_logdata_2020-04-23T03:00:00.000Z_1_0",
"firehoseId":"logdata-003-0001-0000"
}
],
"timestamp":"2020-04-23T03:00:00.000Z"
}
]
}
}