美文网首页
tranquility 真绕

tranquility 真绕

作者: 专职掏大粪 | 来源:发表于2020-04-22 17:02 被阅读0次
    Event 定义,指定 DataSource 的 schema
    BeamFactory 定义,这个地方主要为了定义一些需要的信息。比如 :
        zk 地址,用来做服务发现
        dimension 指定
        Rollup 的聚合算子指定:count, sum, max, min 等,注意没有 avg
        segment 的时间粒度指定
        窗口大小指定
    
    /**
     * Example [[BeamFactory]] that builds a Druid beam for `SimpleEvent` records.
     *
     * The beam is created lazily on first access of `makeBeam`. It connects to ZooKeeper at
     * `localhost:2181`, writes to the `foo` datasource with the single dimension `bar`, and
     * rolls events up at minute granularity using a long-sum aggregator over `baz`.
     */
    class SimpleEventBeamFactory extends BeamFactory[SimpleEvent] {

      lazy val makeBeam: Beam[SimpleEvent] = {
        // Tranquility coordinates through ZooKeeper, accessed via the Curator framework.
        val zkClient = CuratorFrameworkFactory.newClient(
          "localhost:2181",
          new BoundedExponentialBackoffRetry(100, 3000, 5)
        )
        zkClient.start()

        // The overlord's druid.service name.
        // NOTE(review): the upstream comment says "with slashes replaced by colons", yet this
        // value contains a slash — confirm which form your Tranquility version expects.
        val overlordService = "druid/overlord"
        // The overlord's druid.discovery.curator.path.
        val curatorDiscoveryPath = "/druid/discovery"
        val targetDataSource = "foo"
        val dimensionNames = IndexedSeq("bar")
        val metricAggregators = Seq(new LongSumAggregatorFactory("baz", "baz"))
        val rollupEnabled = true

        val rollupSpec = DruidRollup(
          SpecificDruidDimensions(dimensionNames),
          metricAggregators,
          QueryGranularities.MINUTE,
          rollupEnabled
        )
        val beamTuning = ClusteredBeamTuning(
          segmentGranularity = Granularity.HOUR,
          windowPeriod = new Period("PT10M"),
          partitions = 1,
          replicants = 1
        )

        // The timestamper expects event.timestamp to return a Joda DateTime object.
        DruidBeams
          .builder((event: SimpleEvent) => event.timestamp)
          .curator(zkClient)
          .discoveryPath(curatorDiscoveryPath)
          .location(DruidLocation.create(overlordService, targetDataSource))
          .rollup(rollupSpec)
          .tuning(beamTuning)
          .buildBeam()
      }
    }
    

    自定义BeamFactory继承 BeamFactory,重写 makeBeam

    DruidBeams.buildBeam 使用 ClusteredBeam

    ClusteredBeamTuning 定义了beam 的参数

    flink druid sink 算子初始化 Tranquilizer

    // Wrap the beam produced by the factory in a Tranquilizer.
    val tranquilizer = Tranquilizer.create(beamFactory.makeBeam)
    

    Tranquilizer.start()

    /**
     * Lifecycle start hook: marks this instance as started, then starts the send thread.
     */
    @LifecycleStart
    def start(): Unit = {
      // The flag is set before the thread is launched.
      // NOTE(review): presumably the send loop reads `started` — confirm against the full source.
      started = true
      sendThread.start()
    }
    

    sendThread.start() 启动 Tranquilizer 线程

    循环将数据发送到buffer 达到buffer阈值 触发

    Tranquilizer.sendBuffer

    /**
     * Sends one buffer of pending messages through the underlying beam.
     *
     * Excerpt only: the handling of the `try` and of `futureResults` is not shown here.
     *
     * @param myIndex  index associated with this buffer (not used in the visible excerpt)
     * @param myBuffer buffered messages, each paired with the promise to complete for it
     */
    private def sendBuffer(myIndex: Long, myBuffer: Buffer[MessageAndPromise]): Unit = {
        if (log.isDebugEnabled) {
          log.debug(s"Sending buffer with ${myBuffer.size} messages.")
        }
    
        val futureResults: Seq[Future[SendResult]] = try {
          // In practice this runs ClusteredBeam's sendAll.
          beam.sendAll(myBuffer.map(_.message))
        }
        
    }
    

    看一下ClusteredBeam.sendAll()

    /**
     * Sends a batch of events, returning one future per event.
     *
     * Visible part of the excerpt: events are paired with promises, grouped by the start of
     * their segment bucket and sorted by time; beams for upcoming buckets are then created
     * ahead of time ("warmed"). The code that actually sends the data is omitted from this excerpt.
     *
     * @param events events to send
     */
    override def sendAll(events: Seq[EventType]): Seq[Future[SendResult]] = {
      val now = timekeeper.now.withZone(DateTimeZone.UTC)
      // Events, grouped and ordered by truncated timestamp, with their original indexes remembered
      // (i.e. group the data by time bucket and sort the groups).
      val eventsWithPromises = Vector() ++ events.map(event => (event, Promise[SendResult]()))
      val grouped: Seq[(DateTime, IndexedSeq[(EventType, Promise[SendResult])])] = (eventsWithPromises groupBy {
        case (event, promise) =>
          tuning.segmentBucket(timestamper(event)).start
      }).toSeq.sortBy(_._1.getMillis)
      // Possibly warm up future beams
      // toBeWarmed lists `dt` followed by each successive segment-bucket end, as long as the
      // value is not after `end`.
      def toBeWarmed(dt: DateTime, end: DateTime): List[DateTime] = {
        if (dt <= end) {
          dt :: toBeWarmed(tuning.segmentBucket(dt).end, end)
        } else {
          Nil
        }
      }
      // Timestamp of the latest event inside the last (most recent) group; None when there are no events.
      val latestEventTimestamp: Option[DateTime] = grouped.lastOption map { case (truncatedTimestamp, group) =>
        val event: EventType = group.maxBy(tuple => timestamper(tuple._1).getMillis)._1
        timestamper(event)
      }
      // Request a beam for every candidate timestamp strictly after the latest event and within
      // `tuning.warmingPeriod` of it.
      val warmingBeams: Future[Seq[Beam[EventType]]] = Future.collect(
        for (
          latest <- latestEventTimestamp.toList;
          tbwTimestamp <- toBeWarmed(latest, latest + tuning.warmingPeriod) if tbwTimestamp > latest
        ) yield {
          // Create beam asynchronously
          beam(tbwTimestamp, now)
        }
      )
      // Send data
      省略发送数据的代码
        }
    

    将数据按 segment 时间桶分组并排序,取最新一条事件的时间戳,对其之后 warmingPeriod 范围内的每个时间桶调用 beam(tbwTimestamp, now)

    用于在发送数据之前创建beam

    /**
     * Resolves the beam for `timestamp`, creating new merged beam(s) when none is cached and
     * creation is allowed.
     *
     * Excerpt only: `bucket`, `windowInterval` and `creationInterval` are defined outside the
     * visible code, and the tail of the method is not shown.
     *
     * @param timestamp timestamp whose beam is requested
     * @param now       current time, used to expire old beam dicts
     */
    private[this] def beam(timestamp: DateTime, now: DateTime): Future[Beam[EventType]] = {
     
      val futureBeamOption = beams.get(timestamp.getMillis) match {
        // Not open: never hand out a beam.
        case _ if !open => Future.value(None)
        // Cached beam whose bucket still overlaps the window: reuse it.
        case Some(x) if windowInterval.overlaps(bucket) => Future.value(Some(x))
        // Cached beam, but the bucket no longer overlaps the window.
        case Some(x) => Future.value(None)
        // No cached beam and the timestamp is not after the locally known latest close time.
        case None if timestamp <= localLatestCloseTime => Future.value(None)
        // No cached beam and the bucket does not overlap the creation interval.
        case None if !creationInterval.overlaps(bucket) => Future.value(None)
        case None =>
          // We may want to create new merged beam(s). Acquire the zk mutex and examine the situation.
          // This could be more efficient, but it's happening infrequently so it's probably not a big deal.
          data.modify {
            prev =>
              val prevBeamDicts = prev.beamDictss.getOrElse(timestamp.getMillis, Nil)
              // Check whether the existing beam dicts already reach the configured number of
              // partitions (one beam per partition).
              if (prevBeamDicts.size >= tuning.partitions) {
                log.info(
                  "Merged beam already created for identifier[%s] timestamp[%s], with sufficient partitions (target = %d, actual = %d)",
                  identifier,
                  timestamp,
                  tuning.partitions,
                  prevBeamDicts.size
                )
                prev
              // Check whether the timestamp that would trigger a new task is later than the
              // time at which the previous tasks were closed.
              } else if (timestamp <= prev.latestCloseTime) {
                log.info(
                  "Global latestCloseTime[%s] for identifier[%s] has moved past timestamp[%s], not creating merged beam",
                  prev.latestCloseTime,
                  identifier,
                  timestamp
                )
                prev
              } else {
                // The timestamp is past the last close time and the existing beams do not yet
                // cover every partition: create beams.
                assert(prevBeamDicts.size < tuning.partitions)
                assert(timestamp > prev.latestCloseTime)
    
                // We might want to cover multiple time segments in advance.
                val numSegmentsToCover = tuning.minSegmentsPerBeam +
                  rand.nextInt(tuning.maxSegmentsPerBeam - tuning.minSegmentsPerBeam + 1)
                val intervalToCover = new Interval(
                  timestamp.getMillis,
                  tuning.segmentGranularity.increment(timestamp, numSegmentsToCover).getMillis,
                  ISOChronology.getInstanceUTC
                )
                val timestampsToCover = tuning.segmentGranularity.getIterable(intervalToCover).asScala.map(_.start)
    
                // OK, create them where needed.
                // Dicts of newly created sub-beams, keyed by partition, so that each partition
                // gets at most one new sub-beam across all covered timestamps.
                val newInnerBeamDictsByPartition = new mutable.HashMap[Int, Dict]
                val newBeamDictss: Map[Long, Seq[Dict]] = (prev.beamDictss filterNot {
                  case (millis, beam) =>
                    // Expire old beamDicts
                    tuning.segmentGranularity.increment(new DateTime(millis)) + tuning.windowPeriod < now
                }) ++ (for (ts <- timestampsToCover) yield {
                  val tsPrevDicts = prev.beamDictss.getOrElse(ts.getMillis, Nil)
                  log.info(
                    "Creating new merged beam for identifier[%s] timestamp[%s] (target = %d, actual = %d)",
                    identifier,
                    ts,
                    tuning.partitions,
                    tsPrevDicts.size
                  )
                  // Keep the existing dicts and add one dict for every partition still missing.
                  val tsNewDicts = tsPrevDicts ++ ((tsPrevDicts.size until tuning.partitions) map {
                    partition =>
                      newInnerBeamDictsByPartition.getOrElseUpdate(
                        partition, {
                          // Create sub-beams and then immediately close them, just so we can get the dict representations.
                          // Close asynchronously, ignore return value.
                          beamMaker.newBeam(intervalToCover, partition).withFinally(_.close()) {
                            beam =>
                              val beamDict = beamMaker.toDict(beam)
                              log.info("Created beam: %s", objectMapper.writeValueAsString(beamDict))
                              beamDict
                          }
                        }
                      )
                  })
                  (ts.getMillis, tsNewDicts)
                })
    

    beamMaker.newBeam 创建新的beam

    /**
     * Creates a [[DruidBeam]] for one partition of the given interval.
     *
     * One task per replicant is submitted to the index service, and the call blocks until
     * every submission has returned its task id.
     *
     * @param interval  interval to cover; must already be aligned to the tuning's segment
     *                  granularity, otherwise `require` fails
     * @param partition partition number of the new beam
     * @return a `DruidBeam` bound to the submitted tasks
     */
    override def newBeam(interval: Interval, partition: Int) = {
      require(
        beamTuning.segmentGranularity.widen(interval) == interval,
        "Interval does not match segmentGranularity[%s]: %s" format(beamTuning.segmentGranularity, interval)
      )

      val firehosePrefix = DruidBeamMaker.generateBaseFirehoseId(
        location.dataSource,
        beamTuning.segmentGranularity,
        interval.start,
        partition
      )
      val groupName = DruidBeamMaker.generateAvailabilityGroup(location.dataSource, interval.start, partition)

      // Submit one task per replicant; each future yields a pointer to the created task.
      val pendingPointers = (0 until beamTuning.replicants).map { replicaIndex =>
        val replicaFirehoseId = "%s-%04d" format(firehosePrefix, replicaIndex)
        val payload = taskBytes(interval, groupName, replicaFirehoseId, partition, replicaIndex)
        indexService.submit(payload) map { submittedId =>
          TaskPointer(submittedId, replicaFirehoseId)
        }
      }
      val taskPointers = Await.result(Future.collect(pendingPointers))

      // Per the original note: the DruidBeam is what sends data to the MiddleManager's workers.
      new DruidBeam(
        interval,
        partition,
        taskPointers,
        location,
        config,
        taskLocator,
        indexService,
        emitter,
        objectWriter
      )
    }
    
    // Excerpt repeated from newBeam: submit the serialized task description and pair the
    // returned task id with the firehose id in a TaskPointer.
    indexService.submit(taskBytes(interval, availabilityGroup, firehoseId, partition, replicant)) map {
      taskId =>
        TaskPointer(taskId, firehoseId)
    }
    

    taskBytes 是任务的信息描述

    indexService.submit 与 overlord 通信,并携带任务的信息描述请求 druid/indexer/v1/task 接口来创建 task

    DruidBeam.sendAll

    /**
     * Sends messages to the beam's tasks in chunks, returning one future per message.
     *
     * Excerpt only: the code that posts each chunk and derives `chunkResult` is not shown.
     * In the visible code the `yield` body only builds the HTTP request.
     *
     * @param messages messages to send
     * @return the promises created for the messages, in the original message order
     */
    override def sendAll(messages: Seq[A]): Seq[Future[SendResult]] = {
      val messagesWithPromises = Vector() ++ messages.map(message => (message, Promise[SendResult]()))
    
      // Messages grouped into chunks
      // Each chunk holds at most `config.firehoseChunkSize` messages and is serialized once up front.
      val messagesChunks: List[(Array[Byte], IndexedSeq[(A, Promise[SendResult])])] = messagesWithPromises
        .grouped(config.firehoseChunkSize)
        .map(xs => (objectWriter.batchAsBytes(xs.map(_._1)), xs))
        .toList
    
      for ((messagesChunkBytes, messagesChunk) <- messagesChunks) {
        // Try to send to all tasks, return "sent" if any of them accepted it.
        // Only tasks that have a client, and whose client is active, are considered.
        val taskResponses: Seq[Future[(TaskPointer, SendResult)]] = for {
          task <- tasks
          client <- clients.get(task) if client.active
        } yield {
          // POST to the task's push-events chat endpoint.
          val messagePost = HttpPost(
            "/druid/worker/v1/chat/%s/push-events" format
              (location.environment.firehoseServicePattern format task.serviceKey)
          ) 
        }
    
        // Avoid become(chunkResult), for some reason it creates massive promise chains.
        // Instead, copy the chunk's outcome (value or exception) onto every message's promise.
        chunkResult respond {
          case Return(result) => messagesChunk.foreach(_._2.setValue(result))
          case Throw(e) => messagesChunk.foreach(_._2.setException(e))
        }
      }
    
      messagesWithPromises.map(_._2)
    }
    

    /druid/worker/v1/chat/%s/push-events 是 worker 的数据接收服务

    ClusteredBeam 的作用

    1.用于调用beamMaker.newBeam 创建DruidBeam 发送数据

    2.用于检测根据时间和分区及当前存在的beams 判断是否需要创建新的beam

    3.DruidBeamMaker.newBeam 里面可以实现在每次创建新的 beam 时获取最新 schema 的逻辑,用于动态 schema

    tranquility 为每一个 beam 维护 ClusteredBeamMeta,
    默认会写到 ZooKeeper 对应路径下的 data 节点

    [zk: localhost:2181(CONNECTED) 2] get /druid/tranquility/beams/druid:tranquility:indexer/server_brand/
    
    mutex   data
    
    
    {
        "latestTime":"2020-04-23T03:00:00.000Z",
        "latestCloseTime":"2020-04-23T01:00:00.000Z",
        "beams":{
            "2020-04-23T02:00:00.000Z":[
                {
                    "interval":"2020-04-23T02:00:00.000Z/2020-04-23T03:00:00.000Z",
                    "partition":0,
                    "tasks":Array[1],
                    "timestamp":"2020-04-23T02:00:00.000Z"
                },
                {
                    "interval":"2020-04-23T02:00:00.000Z/2020-04-23T03:00:00.000Z",
                    "partition":1,
                    "tasks":Array[1],
                    "timestamp":"2020-04-23T02:00:00.000Z"
                }
            ],
            "2020-04-23T03:00:00.000Z":[
                {
                    "interval":"2020-04-23T03:00:00.000Z/2020-04-23T04:00:00.000Z",
                    "partition":0,
                    "tasks":[
                        {
                            "id":"index_realtime_logdata_2020-04-23T03:00:00.000Z_0_0",
                            "firehoseId":"logdata-003-0000-0000"
                        }
                    ],
                    "timestamp":"2020-04-23T03:00:00.000Z"
                },
                {
                    "interval":"2020-04-23T03:00:00.000Z/2020-04-23T04:00:00.000Z",
                    "partition":1,
                    "tasks":[
                        {
                            "id":"index_realtime_logdata_2020-04-23T03:00:00.000Z_1_0",
                            "firehoseId":"logdata-003-0001-0000"
                        }
                    ],
                    "timestamp":"2020-04-23T03:00:00.000Z"
                }
            ]
        }
    }
    

    相关文章

      网友评论

          本文标题:tranquility 真绕

          本文链接:https://www.haomeiwen.com/subject/ygkwihtx.html