美文网首页
Flink维表关联系列2-全量加载方式

Flink维表关联系列2-全量加载方式

作者: LZhan | 来源:发表于2019-11-06 13:16 被阅读0次

    最近看了公众号[Flink 实战剖析]的部分文章,觉得其中维表关联系列的文章总结得挺全面,因此做一次搬运工,并进行一些总结。

    1.前言

    在维表关联时,定时全量加载的方式适用于:
    维表数据量较少并且业务对于维表数据变化的敏感程度较低
    注意点:
    <1> 全量加载有可能会比较耗时,所以必须是一个异步加载过程
    <2> 内存维表数据需要被流表数据关联读取、也需要被定时重新加载,这两个过程是不同线程执行,为了尽可能保证数据一致性,可使用原子引用变量包装内存维表数据对象即AtomicReference
    <3> 查内存维表数据非异步io过程

    2.代码示例
    class SideFlatMapFunction extends RichFlatMapFunction[AdData, AdData] {
    
      private var sideInfo: AtomicReference[java.util.Map[Int, Int]] = _
    
    
      override def open(parameters: Configuration): Unit = {
    
        sideInfo = new AtomicReference[java.util.Map[Int, Int]]()
    
        sideInfo.set(loadData)
    
        val executors=Executors.newSingleThreadScheduledExecutor()
    
        executors.scheduleAtFixedRate(new Runnable {
    
          override def run(): Unit = reload()
    
        },5,5, TimeUnit.MINUTES)
    
      }
    
    
      override def flatMap(value: AdData, out: Collector[AdData]): Unit = {
    
        val tid=value.tId
    
        val aid=sideInfo.get().get(tid)
    
        var newV=AdData(aid,value.tId,value.clientId,value.actionType,value.time)
    
        out.collect(newV)
    
      }
    
      def reload()={
    
        try{
    
          println("do reload~")
    
          val newData=loadData()
    
          sideInfo.set(newData)
    
          println("reload ok~")
    
        }catch {
    
          case e:Exception=>{
    
            e.printStackTrace()
    
          }
    
        }
    
      }
    
      //连接数据库,查询出广告位和广告主的关系
      def loadData(): util.Map[Int, Int] = {
    
        val data = new util.HashMap[Int, Int]()
    
        Class.forName("com.mysql.jdbc.Driver")
    
        val con = DriverManager.getConnection("jdbc:mysql://localhost:3306/paul", "root", "123456")
    
        val sql = "select aid,tid from ads"
    
        val statement = con.prepareStatement(sql)
    
        val rs = statement.executeQuery()
    
        while (rs.next()) {
    
          val aid = rs.getInt("aid")
    
          val tid = rs.getInt("tid")
    
          data.put(tid, aid)
    
        }
        con.close()
        data
      }
    
    }
    

    相关文章

      网友评论

          本文标题:Flink维表关联系列2-全量加载方式

          本文链接:https://www.haomeiwen.com/subject/qjlfbctx.html