The interval join mechanism in Flink

Author: 万州客 | Published 2022-05-16 15:22

Joining data streams from multiple Kafka topics within a time interval is a practical use of Flink. It would be simpler in Flink SQL (a sketch follows below), but writing it out in code makes the underlying mechanism easier to understand.
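For reference, the SQL version of this join is a single interval predicate. A minimal sketch, assuming a Flink version where StreamTableEnvironment.create is available (1.9+) and assuming the two parsed streams have been registered as views named Orders and Addresses whose event-time attributes are exposed as rowtime (these names are illustrative, not from the original post):

import org.apache.flink.table.api.scala._

val tableEnv = StreamTableEnvironment.create(envStream)
// the same [+1s, +5s] bound as the intervalJoin code below
val joined = tableEnv.sqlQuery(
  """
    |SELECT o.orderId, o.userId, o.gdsId, o.amount, a.addressId, a.address
    |FROM Orders o, Addresses a
    |WHERE o.addressId = a.addressId
    |  AND a.rowtime BETWEEN o.rowtime + INTERVAL '1' SECOND
    |                    AND o.rowtime + INTERVAL '5' SECOND
    |""".stripMargin)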

1. Code

package org.bbk.flink


import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.flink.util.Collector
import org.apache.kafka.clients.consumer.ConsumerConfig

case class Order2(orderId: String, userId: String, gdsId: String, amount: Double, addressId: String, time: Long)
case class Address(addressId: String, userId: String, address: String, time: Long)
case class RsInfo(orderId: String, userId: String, gdsId: String,
                  amount: Double, addressId: String, address: String)
object Demo {
  def main(args:Array[String]):Unit = {
    val envStream = StreamExecutionEnvironment.getExecutionEnvironment
    // run on event time; emit watermarks every 5 seconds
    envStream.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    envStream.getConfig.setAutoWatermarkInterval(5000L)
    envStream.setParallelism(1)

    val kafkaConfig = new Properties()
    kafkaConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.111:9092")
    //kafkaConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "test1")

    // one consumer per topic: orders arrive on topic1, addresses on topic2
    val orderConsumer = new FlinkKafkaConsumer011[String]("topic1", new SimpleStringSchema, kafkaConfig)
    val addressConsumer = new FlinkKafkaConsumer011[String]("topic2", new SimpleStringSchema, kafkaConfig)
    val orderStream = envStream.addSource(orderConsumer)
      .map(x => {
        // parse CSV: orderId,userId,gdsId,amount,addressId,time
        val a = x.split(",")
        Order2(a(0), a(1), a(2), a(3).toDouble, a(4), a(5).toLong)
      })
      // extract event-time timestamps, tolerating 10s of out-of-orderness
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Order2](Time.seconds(10)) {
        override def extractTimestamp(element: Order2): Long = element.time
      })
      // key both streams by addressId so the interval join can pair them
      .keyBy(_.addressId)

    val addressStream = envStream.addSource(addressConsumer)
      .map(x => {
        // parse CSV: addressId,userId,address,time
        val a = x.split(",")
        Address(a(0), a(1), a(2), a(3).toLong)
      })
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Address](Time.seconds(10)) {
        override def extractTimestamp(element: Address): Long = element.time
      })
      .keyBy(_.addressId)

    orderStream.intervalJoin(addressStream)
      // for an order with timestamp t, join addresses with timestamps
      // in [t + 1s, t + 5s]; both bounds are inclusive
      .between(Time.seconds(1), Time.seconds(5))
      .process(new ProcessJoinFunction[Order2, Address, RsInfo] {
        override def processElement(left: Order2, right: Address,
                                    ctx: ProcessJoinFunction[Order2, Address, RsInfo]#Context,
                                    out: Collector[RsInfo]): Unit = {
          println("=== matched a pair of records with the same key ===")
          println("left: " + left)
          println("right: " + right)
          // emit the joined result so the job has a real output
          out.collect(RsInfo(left.orderId, left.userId, left.gdsId,
            left.amount, left.addressId, right.address))
        }
      })
      .print()

    envStream.execute()
  }
}
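Conceptually, what intervalJoin does under the hood is simple: each side buffers its elements in keyed state indexed by timestamp, probes the opposite buffer when an element arrives, emits matches immediately, and relies on watermarks to evict entries that can no longer join. A simplified sketch of that bookkeeping (illustrative pseudocode only, not Flink's actual IntervalJoinOperator):

import scala.collection.mutable

// Illustrative sketch: a simplification of the idea behind Flink's
// IntervalJoinOperator, not the operator's actual code.
class IntervalJoinSketch[L, R](lower: Long, upper: Long) {
  // per-key buffers of elements, indexed by event timestamp
  private val leftBuffer  = mutable.TreeMap[Long, List[L]]()
  private val rightBuffer = mutable.TreeMap[Long, List[R]]()

  def onLeft(ts: Long, l: L)(emit: (L, R) => Unit): Unit = {
    leftBuffer(ts) = l :: leftBuffer.getOrElse(ts, Nil)
    // a left element at ts joins right timestamps in [ts+lower, ts+upper]
    rightBuffer.range(ts + lower, ts + upper + 1)
      .valuesIterator.flatten.foreach(r => emit(l, r))
  }

  def onRight(ts: Long, r: R)(emit: (L, R) => Unit): Unit = {
    rightBuffer(ts) = r :: rightBuffer.getOrElse(ts, Nil)
    // inverted bound: it joins left timestamps in [ts-upper, ts-lower]
    leftBuffer.range(ts - upper, ts - lower + 1)
      .valuesIterator.flatten.foreach(l => emit(l, r))
  }

  def onWatermark(wm: Long): Unit = {
    // a left element at t only joins right timestamps up to t + upper,
    // so once the watermark passes t + upper it can be evicted
    leftBuffer.keys.toList.takeWhile(_ + upper < wm).foreach(leftBuffer.remove)
    // a right element at t only joins left timestamps up to t - lower
    rightBuffer.keys.toList.takeWhile(_ - lower < wm).foreach(rightBuffer.remove)
  }
}

To a first approximation, the real operator keeps these buffers in keyed MapState and performs the same eager probe plus watermark-driven cleanup, which is why matches print as soon as both sides have arrived rather than at a window boundary.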

2. Test data

kafka-console-producer.sh --broker-list 192.168.1.111:9092 --topic topic1
order01,userId01,gds01,100,addressId01,1573054200000

kafka-console-producer.sh --broker-list 192.168.1.111:9092 --topic topic2
addressId01,userId01,beijing,1573054203000
addressId01,userId01,beijing,1573054206000
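With between(Time.seconds(1), Time.seconds(5)), the order at timestamp 1573054200000 joins addresses whose timestamps fall in [1573054201000, 1573054205000]. The first address record (1573054203000, order time + 3s) is inside that interval and matches; the second (1573054206000, + 6s) is outside and produces no match.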

3. Output

[Screenshots in the original post: a WindTerm session on 192.168.1.111 and a MessageCenterUI window showing the job's console output]
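Based on the interval arithmetic above, a sketch of what the console output should look like (only the +3s address record matches; the trailing line is the .print() of the collected RsInfo):

=== matched a pair of records with the same key ===
left: Order2(order01,userId01,gds01,100.0,addressId01,1573054200000)
right: Address(addressId01,userId01,beijing,1573054203000)
RsInfo(order01,userId01,gds01,100.0,addressId01,beijing)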
