时态表

作者: 知识海洋中的淡水鱼 | 来源:发表于2020-03-28 15:47 被阅读0次

    TemporalTable

    关于时态表的介绍,可以参考 Flink 中文社区的文章《Flink SQL 如何实现数据流的 Join?》,以及博文《Flink Table & SQL 时态表 Temporal Table》。
    本例用 append 表(追加表)关联时态表数据,进行流 join 操作(使用时态表可以减少 join 过程中需要保存的状态)。

    import java.util.Properties
    
    import com.alibaba.fastjson.{JSON, JSONObject}
    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.api.common.time.Time
    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
    import org.apache.flink.table.api.scala.StreamTableEnvironment
    import org.apache.flink.table.api.scala._
    import org.apache.flink.table.functions.TemporalTableFunction
    
    /**
      * Event-time temporal table join demo.
      *
      * Reads JSON messages from the Kafka topics "order" and "deal", registers both
      * streams as tables with a rowtime attribute, exposes the deal table as a temporal
      * table function keyed by `sale_order_no`, joins every order against that function
      * evaluated at the order's rowtime, and prints the joined rows.
      */
    object FlinkTemporalTable {

      /** Builds the Kafka consumer settings shared by both source topics. */
      private def consumerProperties(): Properties = {
        val settings = new Properties()
        settings.setProperty("bootstrap.servers", "xxx.xxx.xxx.xxx:9092")
        settings.setProperty("auto.offset.reset", "latest") // consumption start point: earliest or latest
        settings.setProperty("group.id", "local_consumer")
        settings
      }

      /**
        * Parses one order message into (order_id, order_no, date_create, date_update).
        *
        * Example message:
        * {"order_id":"order_01","order_no":"0001","date_create":"1584497993601","date_update":"1584497993601"}
        */
      private def parseOrder(raw: String): (String, String, Long, Long) = {
        val fields: JSONObject = JSON.parseObject(raw)
        val orderId = fields.getString("order_id")
        val orderNo = fields.getString("order_no")
        val createdAt = fields.getLongValue("date_create")
        val updatedAt = fields.getLongValue("date_update")
        (orderId, orderNo, createdAt, updatedAt)
      }

      /**
        * Parses one deal message into
        * (car_deal_id, sale_order_no, car_attribute_id, vin, date_create, date_update).
        *
        * Example message:
        * {"car_deal_id":"car_01","sale_order_no":"0001","car_attribute_id":"A0001","vin":"a1000","date_create":"1584497993601","date_update":"1584497993601"}
        */
      private def parseDeal(raw: String): (String, String, String, String, Long, Long) = {
        val fields: JSONObject = JSON.parseObject(raw)
        val dealId = fields.getString("car_deal_id")
        val saleOrderNo = fields.getString("sale_order_no")
        val attributeId = fields.getString("car_attribute_id")
        val vin = fields.getString("vin")
        val createdAt = fields.getLongValue("date_create")
        val updatedAt = fields.getLongValue("date_update")
        (dealId, saleOrderNo, attributeId, vin, createdAt, updatedAt)
      }

      /**
        * Job entry point: wires the Kafka sources, the temporal table function and the
        * join query together, then submits the job.
        *
        * @param args command-line arguments (not used)
        */
      def main(args: Array[String]): Unit = {
        // Stream execution environment, switched to event-time semantics.
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        // Table environment built on top of the stream environment.
        val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)

        // One Properties instance is shared by both consumers.
        val kafkaSettings = consumerProperties()

        val orderTopic = "order"
        val dealTopic = "deal"

        // Order side: raw JSON strings -> tuples, with date_update (4th field) as the event timestamp.
        val rawOrders: DataStream[String] =
          env.addSource(new FlinkKafkaConsumer[String](orderTopic, new SimpleStringSchema(), kafkaSettings))
        val orders: DataStream[(String, String, Long, Long)] =
          rawOrders
            .map(raw => parseOrder(raw))
            .assignAscendingTimestamps(_._4)

        // Deal side: raw JSON strings -> tuples, with date_update (6th field) as the event timestamp.
        val rawDeals: DataStream[String] =
          env.addSource(new FlinkKafkaConsumer[String](dealTopic, new SimpleStringSchema(), kafkaSettings))
        val deals: DataStream[(String, String, String, String, Long, Long)] =
          rawDeals
            .map(raw => parseDeal(raw))
            .assignAscendingTimestamps(_._6)

        // Register both streams as tables; the trailing field is the rowtime attribute.
        tableEnv.registerDataStream("fo", orders, 'order_id, 'order_no, 'date_create, 'date_update, 'foRowtime.rowtime)
        tableEnv.registerDataStream("fcdv2", deals, 'car_deal_id, 'sale_order_no, 'car_attribute_id, 'vin, 'date_create, 'date_update, 'fcdv2Rowtime.rowtime)

        // Temporal table function over the deal table: time attribute fcdv2Rowtime, primary key sale_order_no.
        val dealVersions: TemporalTableFunction =
          tableEnv.scan("fcdv2").createTemporalTableFunction("fcdv2Rowtime", "sale_order_no")
        // Make the function callable from SQL.
        tableEnv.registerFunction("FCDV2_TEMPORAL_FUNCTION", dealVersions)

        // Join each order with the temporal table function evaluated at the order's rowtime.
        val sql =
          """
            |SELECT fo.`order_no`            AS `order_no`
            |     , fo.`date_create`         AS `fo_date_create`
            |     , fo.`date_update`         AS `fo_date_update`
            |     , fcdv2.`car_attribute_id` AS `fcdv2_car_attribute_id`
            |     , fcdv2.`vin`              AS `fcdv2_vin`
            |     , fcdv2.`date_create`      AS `fcdv2_date_create`
            |     , fcdv2.`date_update`      AS `fcdv2_date_update`
            |FROM fo
            |    , LATERAL TABLE(FCDV2_TEMPORAL_FUNCTION(fo.foRowtime)) as fcdv2
            |WHERE fo.order_no = fcdv2.sale_order_no
            |""".stripMargin

        val joined = tableEnv.sqlQuery(sql)

        // Idle state retention (1 minute / 7 minutes), used together with the join for state cleanup.
        tableEnv.getConfig.setIdleStateRetentionTime(Time.minutes(1L), Time.minutes(7L))

        // Convert the result to an append stream and print every joined row.
        val joinedRows = tableEnv.toAppendStream[(String, Long, Long, String, String, Long, Long)](joined)
        joinedRows.print()

        // Submit the job, named after this object's runtime class.
        tableEnv.execute(FlinkTemporalTable.getClass.getSimpleName)
      }
    }
    

    input

    -- order
    {"order_id":"order_01","order_no":"0001","date_create":"1584497993601","date_update":"1584497993601"}
    {"order_id":"order_02","order_no":"0002","date_create":"1584497994602","date_update":"1584497994602"}
    {"order_id":"order_03","order_no":"0003","date_create":"1584497995603","date_update":"1584497995603"}
    {"order_id":"order_04","order_no":"0004","date_create":"1584497997604","date_update":"1584497997604"}
    {"order_id":"order_05","order_no":"0005","date_create":"1584497998605","date_update":"1584497998605"}
    {"order_id":"order_06","order_no":"0006","date_create":"1584497998606","date_update":"1584497998606"}
    {"order_id":"order_07","order_no":"0007","date_create":"1584497998607","date_update":"1584497999899"}
    
    -- deal
    {"car_deal_id":"car_01","sale_order_no":"0001","car_attribute_id":"A0001","vin":"a1000","date_create":"1584497993601","date_update":"1584497993601"}
    {"car_deal_id":"car_02","sale_order_no":"0002","car_attribute_id":"A0002","vin":"b1010","date_create":"1584497994602","date_update":"1584497994602"}
    {"car_deal_id":"car_03","sale_order_no":"0003","car_attribute_id":"A0003","vin":"c1011","date_create":"1584497995603","date_update":"1584497995603"}
    {"car_deal_id":"car_04","sale_order_no":"0004","car_attribute_id":"A0004","vin":"d1110","date_create":"1584497997604","date_update":"1584497997604"}
    {"car_deal_id":"car_05","sale_order_no":"0005","car_attribute_id":"A0005","vin":"e1111","date_create":"1584497998605","date_update":"1584497998605"}
    {"car_deal_id":"car_05","sale_order_no":"0005","car_attribute_id":"A0005","vin":"f000","date_create":"1584497998605","date_update":"1584497999788"}
    {"car_deal_id":"car_04","sale_order_no":"0004","car_attribute_id":"A0004","vin":"d0001","date_create":"1584497997604","date_update":"1584497999799"}
    

    output

    注意:此处使用的时间语义为 event_time,也就是由 watermark 来驱动表的 join;并且 append 表只能 join 上时间戳小于等于自己的时态表数据。

    temporal_table.png

    相关文章

      网友评论

          本文标题:时态表

          本文链接:https://www.haomeiwen.com/subject/bkfsuhtx.html