这部分内容有些高级,我目前也只能把代码跑起来,具体细节还有猜的成分~
一,代码
demo.scala
package org.bbk.flink
import java.text.SimpleDateFormat
import java.{lang, util}
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.runtime.state.memory.MemoryStateBackend
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.collector.selector.OutputSelector
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.functions.{AssignerWithPeriodicWatermarks, AssignerWithPunctuatedWatermarks, ProcessFunction}
import org.apache.flink.streaming.api.scala.{ConnectedStreams, DataStream, OutputTag, SplitStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
import org.apache.flink.util.Collector
/** Pairs a word with a count. NOTE(review): not referenced by the code visible in this file — confirm whether it is still needed. */
case class CountWord(word: String, count: Long)
/**
 * Event-time tumbling-window demo with punctuated watermarks and a side output
 * for late records.
 *
 * Each socket line is split on a single space: the first field is used as the
 * key and the second is parsed as a Long and used as the event timestamp
 * (formatted as a date for logging, i.e. treated as epoch milliseconds).
 */
object Demo {

  /**
   * Builds and runs the streaming job.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val environment = StreamExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._

    // Checkpointing / state-backend configuration, currently disabled.
    //environment.enableCheckpointing(1000)
    //environment.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    //environment.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
    //environment.getCheckpointConfig.setCheckpointTimeout(6000)
    //environment.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
    //environment.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    //environment.setStateBackend(new MemoryStateBackend())

    environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    environment.setParallelism(1)

    val sourceStream = environment.socketTextStream("192.168.1.111", 9000)

    // "<key> <timestamp>" -> (key, timestamp)
    val inputMap: DataStream[(String, Long)] = sourceStream.map(line => {
      val arr = line.split(" ")
      (arr(0), arr(1).toLong)
    })

    val waterMarkStream: DataStream[(String, Long)] = inputMap
      .assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks[(String, Long)] {
        // Largest event timestamp seen so far by this assigner instance.
        var currentMaxTimestamp = 0L
        // The watermark trails the largest seen timestamp by this many milliseconds.
        val watermarkDiff: Long = 10000L
        // Fixed: the month field must be "MM"; lowercase "mm" is minute-of-hour and
        // made the logged date show the minutes where the month belongs.
        val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")

        /** Emits a watermark of (max timestamp seen so far - watermarkDiff) for every element. */
        override def checkAndGetNextWatermark(t: (String, Long), l: Long): Watermark = {
          new Watermark(currentMaxTimestamp - watermarkDiff)
        }

        /** Uses the tuple's second field as the event timestamp and logs the current state. */
        override def extractTimestamp(element: (String, Long), l: Long): Long = {
          val eventTime = element._2
          currentMaxTimestamp = Math.max(eventTime, currentMaxTimestamp)
          val id = Thread.currentThread().getId
          // Same value checkAndGetNextWatermark will produce; computed once for logging.
          val watermarkTs = currentMaxTimestamp - watermarkDiff
          println("currentThreadId: " + id +
            ",key: " + element._1 +
            ", eventTime: [" + element._2 + "|" + sdf.format(element._2) +
            "],currentMaxTimestamp: [" + currentMaxTimestamp + "|" + sdf.format(currentMaxTimestamp) +
            "], watermark: [" + watermarkTs + "|" + sdf.format(watermarkTs) + "]")
          eventTime
        }
      })

    // Records arriving after their window has been closed are routed to this tag.
    val outputTag: OutputTag[(String, Long)] = new OutputTag[(String, Long)]("late_data")

    val outputWindow: DataStream[String] = waterMarkStream
      .keyBy(0)
      .window(TumblingEventTimeWindows.of(Time.seconds(3)))
      .sideOutputLateData(outputTag)
      .apply(new MyWindowFunction)

    val sideOutput: DataStream[(String, Long)] = outputWindow.getSideOutput(outputTag)
    sideOutput.print()
    outputWindow.print()

    environment.execute()
  }
}
MyWindowFunction.scala
package org.bbk.flink
import scala.util.Sorting
import java.text.SimpleDateFormat
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import scala.collection.mutable.ArrayBuffer
/**
 * Window function that emits one summary string per fired window: the key,
 * the number of elements, the smallest and largest timestamps among the
 * elements, and the window's start and end, all formatted as
 * `yyyy-MM-dd HH:mm:ss.SSS`.
 */
class MyWindowFunction extends WindowFunction[(String, Long), String, Tuple, TimeWindow] {

  /**
   * Summarises the contents of a single window.
   *
   * @param key    key of the window, rendered via `toString`
   * @param window the time window being evaluated
   * @param input  elements assigned to the window; the second tuple field is the timestamp
   * @param out    collector that receives the summary string
   */
  override def apply(key: Tuple, window: TimeWindow, input: Iterable[(String, Long)], out: Collector[String]): Unit = {
    val keyText = key.toString

    // Pull out the timestamps and order them ascending.
    val timestamps: Array[Long] = input.iterator.map(_._2).toArray
    Sorting.quickSort(timestamps)

    val formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")

    val summary = new StringBuilder()
      .append("聚合数据的key为:").append(keyText)
      .append(",窗口中数据的条数为:").append(timestamps.length)
      .append(",窗口中第一条数据为:").append(formatter.format(timestamps.head))
      .append(", 窗口中最后一条数据为:").append(formatter.format(timestamps.last))
      .append(",窗口起始时间为:").append(formatter.format(window.getStart))
      .append(",窗口结束时间为:").append(formatter.format(window.getEnd))
      .append("!!!看到这个结果,就证明窗口已经运行了")
      .toString

    out.collect(summary)
  }
}
二,测试数据
001 1569895882000
001 1569895885000
001 1569895888000
001 1569895890000
001 1569895891000
001 1569895895000
001 1569895898000
001 1569895900000
001 1569895911000
001 1569895948000
001 1569895945000
001 1569895947000
001 1569895950000
001 1569895960000
001 1569895949000
三,运行

网友评论