有些实用性质了,书上的代码关联数据还没有下载下来,等下载下来再调试。
求取订单对应的商品,将订单和商品数据合并成一条数据
一,代码
package org.bbk.flink
import java.util
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration
import scala.collection.mutable
object Demo {
def main(args:Array[String]):Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._
//获取订单数据
val productData: DataSet[String] = env.readTextFile("file:///D:\\tmp" +
"" +
"\\adult.txt")
val productMap = new mutable.HashMap[String, String]()
val productMapSet: DataSet[mutable.HashMap[String, String]] = productData
.map(x => {
val strings: Array[String] = x.split(",")
productMap.put(strings(0), x)
productMap
})
//获取商品数据
val orderDataSet: DataSet[String] = env.readTextFile("file:///D:\\tmp" +
"" +
"\\count.txt")
val resultLine: DataSet[String] = orderDataSet.map(new RichMapFunction[String, String] {
var listData: util.List[Map[String, String]] = null
var allMap = Map[String, String]()
override def open(parameters: Configuration): Unit = {
this.listData = getRuntimeContext.getBroadcastVariable[Map[String,String]]("productBroadCast")
val listResult: util.Iterator[Map[String, String]] = listData.iterator()
while (listResult.hasNext) {
allMap = allMap.++(listResult.next())
}
}
override def map(eachOrder: String): String = {
val str: String = allMap.getOrElse(eachOrder.split(",")(2), "暂时没有值")
eachOrder + "," + str
}
}).withBroadcastSet(productMapSet, "productBroadCast")
resultLine.print()
env.execute()
}
}
网友评论