Joining data streams from multiple Kafka topics in Flink over a time window is quite practical. It would arguably be simpler with Flink SQL (a sketch is included after the code for comparison), but writing it out in code makes the underlying mechanism easier to understand.
一, Code
package org.bbk.flink

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.flink.util.Collector
import org.apache.kafka.clients.consumer.ConsumerConfig

case class Order2(orderId: String, userId: String, gdsId: String, amount: Double, addressId: String, time: Long)
case class Address(addressId: String, userId: String, address: String, time: Long)
case class RsInfo(orderId: String, userId: String, gdsId: String,
                  amount: Double, addressId: String, address: String)

object Demo {
  def main(args: Array[String]): Unit = {
    val envStream = StreamExecutionEnvironment.getExecutionEnvironment
    // drive the join by the event time carried in the records, emitting watermarks every 5s
    envStream.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    envStream.getConfig.setAutoWatermarkInterval(5000L)
    envStream.setParallelism(1)

    val kafkaConfig = new Properties()
    kafkaConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.111:9092")
    //kafkaConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "test1")

    val orderConsumer = new FlinkKafkaConsumer011[String]("topic1", new SimpleStringSchema, kafkaConfig)
    val addressConsumer = new FlinkKafkaConsumer011[String]("topic2", new SimpleStringSchema, kafkaConfig)

    // orders from topic1: parse the CSV line, extract event-time timestamps with a 10s
    // out-of-orderness bound, and key by addressId so the two streams can be joined
    val orderStream = envStream.addSource(orderConsumer)
      .map(x => {
        val a = x.split(",")
        Order2(a(0), a(1), a(2), a(3).toDouble, a(4), a(5).toLong)
      })
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Order2](Time.seconds(10)) {
        override def extractTimestamp(element: Order2): Long = element.time
      })
      .keyBy(_.addressId)

    // addresses from topic2, parsed and keyed the same way
    val addressStream = envStream.addSource(addressConsumer)
      .map(x => {
        val a = x.split(",")
        Address(a(0), a(1), a(2), a(3).toLong)
      })
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Address](Time.seconds(10)) {
        override def extractTimestamp(element: Address): Long = element.time
      })
      .keyBy(_.addressId)

    // interval join: an order matches every address with the same addressId whose
    // event time lies in [order.time + 1s, order.time + 5s]
    orderStream.intervalJoin(addressStream)
      .between(Time.seconds(1), Time.seconds(5))
      .process(new ProcessJoinFunction[Order2, Address, RsInfo] {
        override def processElement(left: Order2, right: Address,
                                    ctx: ProcessJoinFunction[Order2, Address, RsInfo]#Context,
                                    out: Collector[RsInfo]): Unit = {
          println("=== got two records with the same key here ===")
          println("left: " + left)
          println("right: " + right)
          // emit the joined record downstream
          out.collect(RsInfo(left.orderId, left.userId, left.gdsId, left.amount, left.addressId, right.address))
        }
      })
      .print()
    envStream.execute()
  }
}
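As a point of comparison with the introduction's remark, the same interval join can be expressed through Flink SQL. The following is only a minimal sketch: it assumes a Flink release around 1.10 whose Table API provides StreamTableEnvironment.create and createTemporaryView, it reuses orderStream and addressStream from the code above (a KeyedStream is still a DataStream with timestamps and watermarks assigned), and the table names Orders and Addresses are chosen here purely for illustration.

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala._

val tableEnv = StreamTableEnvironment.create(envStream)

// expose the `time` field of each stream as an event-time (rowtime) attribute
tableEnv.createTemporaryView("Orders", orderStream,
  'orderId, 'userId, 'gdsId, 'amount, 'addressId, 'time.rowtime)
tableEnv.createTemporaryView("Addresses", addressStream,
  'addressId, 'userId, 'address, 'time.rowtime)

// the BETWEEN predicate plays the role of between(Time.seconds(1), Time.seconds(5))
val result = tableEnv.sqlQuery(
  """
    |SELECT o.orderId, o.userId, o.gdsId, o.amount, o.addressId, a.address
    |FROM Orders o, Addresses a
    |WHERE o.addressId = a.addressId
    |  AND a.`time` BETWEEN o.`time` + INTERVAL '1' SECOND
    |                   AND o.`time` + INTERVAL '5' SECOND
    |""".stripMargin)

// interval joins produce append-only results, so the table can be printed as a stream
result.toAppendStream[RsInfo].print()

The BETWEEN predicate on the two rowtime columns is what lets the planner treat this as a bounded interval join rather than an unbounded regular join.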
二, Test data
kafka-console-producer.sh --broker-list 192.168.1.111:9092 --topic topic1
order01,userId01,gds01,100,addressId01,1573054200000

kafka-console-producer.sh --broker-list 192.168.1.111:9092 --topic topic2
addressId01,userId01,beijing,1573054203000
addressId01,userId01,beijing,1573054206000
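The sample records can also be sent from code instead of being pasted into the console producers. A minimal sketch follows, assuming the plain kafka-clients producer API is on the classpath; the object name SampleProducer is made up for this example.

package org.bbk.flink

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

object SampleProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.111:9092")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    val producer = new KafkaProducer[String, String](props)

    // the order record goes to topic1, the address records to topic2
    producer.send(new ProducerRecord[String, String]("topic1", "order01,userId01,gds01,100,addressId01,1573054200000"))
    producer.send(new ProducerRecord[String, String]("topic2", "addressId01,userId01,beijing,1573054203000"))
    producer.send(new ProducerRecord[String, String]("topic2", "addressId01,userId01,beijing,1573054206000"))

    producer.flush()
    producer.close()
  }
}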
三, Output
With the sample data above, the address record at 1573054203000 falls inside the order's join window [1573054200000 + 1s, 1573054200000 + 5s], so the job should print that matched pair once; the address record at 1573054206000 lies outside the window and produces no match.