import org.apache.flink.api.scala.ExecutionEnvironment
import scala.collection.mutable.ListBuffer
import org.apache.flink.api.scala._
/**
 * Small demo of the join-style transformations of the Flink DataSet API:
 * inner join, left outer join, full outer join and cross (Cartesian product).
 *
 * Each helper builds its DataSets from in-memory collections and prints the
 * result. In the outer joins, the side that has no match is rendered as "-".
 */
object TransformationApp {

  /**
   * Entry point: prepares two keyed sample collections and runs one of the
   * demo transformations (the others are left commented out for easy switching).
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // (id, name) records; ids 1 and 2 also exist in data2, id 3 does not.
    val data1 = ListBuffer[(Int, String)]((1, "weiwei"), (2, "法法"), (3, "gogo"))
    // (id, city) records; id 4 has no counterpart in data1.
    val data2 = ListBuffer[(Int, String)]((1, "北京"), (2, "上海"), (4, "南京"))

    // innerJoinFunction(data1, data2, env)
    // leftOutJoinFunction(data1, data2, env)
    // fullOutJoinFunction(data1, data2, env)
    crossJoinFunction(env)
  }

  /**
   * Inner join of the two collections on their first tuple field, printing
   * `(id, leftValue, rightValue)` for every matching pair.
   *
   * @param data1 left-hand records
   * @param data2 right-hand records
   * @param env   execution environment used to create the DataSets
   */
  def innerJoinFunction(data1: ListBuffer[(Int, String)], data2: ListBuffer[(Int, String)], env: ExecutionEnvironment): Unit = {
    val leftSet = env.fromCollection(data1)
    val rightSet = env.fromCollection(data2)

    leftSet
      .join(rightSet)
      .where(0)   // key position in the left tuples
      .equalTo(0) // key position in the right tuples
      .apply { (l, r) =>
        // Shape of each output record.
        (l._1, l._2, r._2)
      }
      .print()
  }

  /**
   * Left outer join on the first tuple field, printing
   * `(id, leftValue, rightValue)`; `rightValue` is "-" when the left record
   * has no match.
   *
   * @param data1 left-hand records (all of them appear in the output)
   * @param data2 right-hand records
   * @param env   execution environment used to create the DataSets
   */
  def leftOutJoinFunction(data1: ListBuffer[(Int, String)], data2: ListBuffer[(Int, String)], env: ExecutionEnvironment): Unit = {
    val leftSet = env.fromCollection(data1)
    val rightSet = env.fromCollection(data2)

    leftSet
      .leftOuterJoin(rightSet)
      .where(0)
      .equalTo(0)
      .apply { (l, r) =>
        // The right side is null for unmatched left records; reading r._2
        // unguarded fails with a NullPointerException.
        if (r == null) (l._1, l._2, "-")
        else (l._1, l._2, r._2)
      }
      .print()
  }

  /**
   * Full outer join on the first tuple field, printing
   * `(id, leftValue, rightValue)`; whichever side has no match is rendered
   * as "-".
   *
   * @param data1 left-hand records
   * @param data2 right-hand records
   * @param env   execution environment used to create the DataSets
   */
  def fullOutJoinFunction(data1: ListBuffer[(Int, String)], data2: ListBuffer[(Int, String)], env: ExecutionEnvironment): Unit = {
    val leftSet = env.fromCollection(data1)
    val rightSet = env.fromCollection(data2)

    leftSet
      .fullOuterJoin(rightSet)
      .where(0)
      .equalTo(0)
      .apply { (l, r) =>
        if (l == null) {
          // Only the right record exists: take the key from it.
          (r._1, "-", r._2)
        } else if (r == null) {
          // Only the left record exists.
          (l._1, l._2, "-")
        } else {
          (l._1, l._2, r._2)
        }
      }
      .print()
  }

  /**
   * Cartesian product of two small string collections, printed as pairs.
   *
   * @param env execution environment used to create the DataSets
   */
  def crossJoinFunction(env: ExecutionEnvironment): Unit = {
    val leftSet = env.fromCollection(List("004", "003"))
    val rightSet = env.fromCollection(List("001", "002"))

    leftSet.cross(rightSet).print()
  }
}
// Reader comments (web-page footer text captured with the listing; not part of the program)