Using Flink Watermarks to Handle Out-of-Order and Late Data

Author: 万州客 | Published 2022-05-08 00:12

This is getting a bit advanced for me. I can get the code up and running, but for some of the finer details I'm still partly guessing~

1. Code

demo.scala

package org.bbk.flink

import java.text.SimpleDateFormat

import org.apache.flink.runtime.state.memory.MemoryStateBackend
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.scala.{DataStream, OutputTag, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

case class CountWord(word: String, count: Long)

object Demo {
  def main(args:Array[String]):Unit = {
    val environment = StreamExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._
    // Optional checkpointing setup, left disabled for this demo:
    //environment.enableCheckpointing(1000)
    //environment.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    //environment.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
    //environment.getCheckpointConfig.setCheckpointTimeout(6000)
    //environment.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
    //environment.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    //environment.setStateBackend(new MemoryStateBackend())
    // Use event time so windowing is driven by timestamps carried in the data
    environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // A single task makes the printed watermark progression easy to follow
    environment.setParallelism(1)
    val sourceStream = environment.socketTextStream("192.168.1.111", 9000)
    // Each input line is "key eventTimeMillis", e.g. "001 1569895882000"
    val inputMap: DataStream[(String, Long)] = sourceStream.map(line => {
      val arr = line.split(" ")
      (arr(0), arr(1).toLong)
    })

    // Punctuated watermarks: a new watermark is offered after every element,
    // trailing the largest event time seen so far by 10 seconds
    val waterMarkStream: DataStream[(String, Long)] = inputMap
      .assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks[(String, Long)] {
        var currentMaxTimestamp = 0L
        val watermarkDiff: Long = 10000L
        // Note: MM (month), not mm (minutes)
        val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")

        override def checkAndGetNextWatermark(t: (String, Long), l: Long): Watermark =
          new Watermark(currentMaxTimestamp - watermarkDiff)

        override def extractTimestamp(element: (String, Long), l: Long): Long = {
          val eventTime = element._2
          currentMaxTimestamp = Math.max(eventTime, currentMaxTimestamp)
          val id = Thread.currentThread().getId
          val watermark = currentMaxTimestamp - watermarkDiff
          println("currentThreadId: " + id +
            ", key: " + element._1 +
            ", eventTime: [" + eventTime + "|" + sdf.format(eventTime) +
            "], currentMaxTimestamp: [" + currentMaxTimestamp + "|" + sdf.format(currentMaxTimestamp) +
            "], watermark: [" + watermark + "|" + sdf.format(watermark) + "]")
          eventTime
        }
      })

    // Events that arrive after their window has already fired are routed to this side output
    val outputTag: OutputTag[(String, Long)] = new OutputTag[(String, Long)]("late_data")
    val outputWindow: DataStream[String] = waterMarkStream
      .keyBy(0)
      .window(TumblingEventTimeWindows.of(Time.seconds(3)))
      .sideOutputLateData(outputTag)
      .apply(new MyWindowFunction)

    val sideOutput: DataStream[(String, Long)] = outputWindow.getSideOutput(outputTag)
    sideOutput.print()
    outputWindow.print()
    environment.execute()
  }
}
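A side note: AssignerWithPunctuatedWatermarks has been deprecated since Flink 1.11. On newer versions the same 10-second bounded-out-of-orderness behaviour is usually written with WatermarkStrategy. A minimal sketch, assuming Flink 1.11+ and the same (String, Long) stream as above (this variant emits watermarks periodically rather than per record):

import java.time.Duration
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}

val modernStream: DataStream[(String, Long)] = inputMap.assignTimestampsAndWatermarks(
  WatermarkStrategy
    .forBoundedOutOfOrderness[(String, Long)](Duration.ofSeconds(10))
    .withTimestampAssigner(new SerializableTimestampAssigner[(String, Long)] {
      // The second tuple field carries the event time in epoch milliseconds
      override def extractTimestamp(element: (String, Long), recordTimestamp: Long): Long =
        element._2
    })
)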

MyWindowFunction.scala

package org.bbk.flink

import scala.util.Sorting
import java.text.SimpleDateFormat

import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.collection.mutable.ArrayBuffer

class MyWindowFunction extends WindowFunction[(String, Long), String, Tuple, TimeWindow] {
  override def apply(key: Tuple, window: TimeWindow, input: Iterable[(String, Long)], out: Collector[String]): Unit = {
    val keyStr = key.toString
    // Collect the event time of every element in the window, then sort them
    val arrBuf = ArrayBuffer[Long]()
    val ite = input.iterator
    while (ite.hasNext) {
      val tup2 = ite.next()
      arrBuf.append(tup2._2)
    }
    val arr = arrBuf.toArray
    Sorting.quickSort(arr)
    val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
    val result = "window key: " + keyStr +
      ", element count: " + arr.length +
      ", earliest event: " + sdf.format(arr.head) +
      ", latest event: " + sdf.format(arr.last) +
      ", window start: " + sdf.format(window.getStart) +
      ", window end: " + sdf.format(window.getEnd) +
      " -- seeing this line proves the window has fired"
    out.collect(result)
  }
}
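A related knob not used in this post: besides routing late records to a side output, Flink's windowed streams also support allowedLateness, which keeps a fired window's state around for a grace period so that late elements re-trigger the window instead of being dropped. A sketch of how the pipeline above could be extended, where the 2-second grace period is an arbitrary example value and waterMarkStream / outputTag are the values from demo.scala:

val windowWithLateness: DataStream[String] = waterMarkStream
  .keyBy(0)
  .window(TumblingEventTimeWindows.of(Time.seconds(3)))
  // Late elements within 2s of the watermark re-fire the window with updated
  // results; anything later still goes to the "late_data" side output
  .allowedLateness(Time.seconds(2))
  .sideOutputLateData(outputTag)
  .apply(new MyWindowFunction)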

2. Test Data

001 1569895882000
001 1569895885000
001 1569895888000
001 1569895890000
001 1569895891000
001 1569895895000
001 1569895898000
001 1569895900000
001 1569895911000
001 1569895948000
001 1569895945000
001 1569895947000
001 1569895950000
001 1569895960000
001 1569895949000
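A worked check of when the first window fires (my own arithmetic, not from the original post): tumbling event-time windows are aligned to the epoch, so the first event falls into [1569895881000, 1569895884000), and with a watermark that trails the maximum event time by 10 seconds, that window fires on the arrival of 001 1569895895000:

// Epoch-aligned 3s tumbling window for the first event:
val eventTime   = 1569895882000L
val windowSize  = 3000L
val windowStart = eventTime - (eventTime % windowSize) // 1569895881000
val windowEnd   = windowStart + windowSize             // 1569895884000
// The window fires once the watermark (max event time - 10000) passes windowEnd,
// i.e. when an event with timestamp >= 1569895894000 shows up: "001 1569895895000".

The same arithmetic explains the side output: by the time 001 1569895949000 arrives, 001 1569895960000 has already pushed the watermark to 1569895950000, so its window [1569895947000, 1569895950000) has fired and the record is emitted as late data.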

3. Running

(Screenshot of the console output: 2022-05-07 23_54_03-MessageCenterUI.png)
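One practical note for reproducing the run (implied by the code rather than stated in the post): socketTextStream only connects to an already-listening server, so something like `nc -lk 9000` must be running on 192.168.1.111 before the job starts; the test lines from section 2 are then typed or pasted into that terminal one at a time.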
