美文网首页
flink广播变量案例

flink广播变量案例

作者: 万州客 | 来源:发表于2022-05-13 08:04 被阅读0次

    有些实用性质了,书上的代码关联数据还没有下载下来,等下载下来再调试。

    求取订单对应的商品,将订单和商品数据合并成一条数据

    一,代码

    package org.bbk.flink
    
    import java.util
    
    import org.apache.flink.api.common.functions.RichMapFunction
    import org.apache.flink.api.scala.ExecutionEnvironment
    import org.apache.flink.configuration.Configuration
    
    import scala.collection.mutable
    
    object Demo {
      def main(args:Array[String]):Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        import org.apache.flink.api.scala._
        //获取订单数据
        val productData: DataSet[String] = env.readTextFile("file:///D:\\tmp" +
          "" +
          "\\adult.txt")
        val productMap = new mutable.HashMap[String, String]()
        val productMapSet: DataSet[mutable.HashMap[String, String]] = productData
          .map(x => {
            val strings: Array[String] = x.split(",")
            productMap.put(strings(0), x)
            productMap
          })
        //获取商品数据
        val orderDataSet: DataSet[String] = env.readTextFile("file:///D:\\tmp" +
          "" +
          "\\count.txt")
        val resultLine: DataSet[String] = orderDataSet.map(new RichMapFunction[String, String] {
          var listData: util.List[Map[String, String]] = null
          var allMap = Map[String, String]()
    
          override def open(parameters: Configuration): Unit = {
            this.listData = getRuntimeContext.getBroadcastVariable[Map[String,String]]("productBroadCast")
            val listResult: util.Iterator[Map[String, String]] = listData.iterator()
            while (listResult.hasNext) {
              allMap = allMap.++(listResult.next())
            }
          }
          override def map(eachOrder: String): String = {
            val str: String = allMap.getOrElse(eachOrder.split(",")(2), "暂时没有值")
            eachOrder + "," + str
          }
        }).withBroadcastSet(productMapSet, "productBroadCast")
        resultLine.print()
        env.execute()
      }
    }
    
    
    
    
    

    二,测试数据

    三,输出

    相关文章

      网友评论

          本文标题:flink广播变量案例

          本文链接:https://www.haomeiwen.com/subject/poncurtx.html