美文网首页
Flink join 基础 demo 代码

Flink join 基础 demo 代码

作者: DuLaGong | 来源:发表于2019-05-16 20:33 被阅读0次

import org.apache.flink.api.scala.ExecutionEnvironment

import scala.collection.mutable.ListBuffer

import org.apache.flink.api.scala._

/**
  * Small demo of the join-style transformations offered by the Flink DataSet
  * (batch) Scala API: inner join, left outer join, full outer join and cross.
  *
  * Every demo method builds its DataSets from in-memory collections and prints
  * the result, which also triggers execution of the job.
  */
object TransformationApp {

  /**
    * Placeholder written into the output tuple for the side of an outer join
    * that had no matching record. Declared as a constant (`final val` without
    * a type annotation) so it is inlined into the join lambdas.
    */
  private final val MissingValue = "-"

  /**
    * Entry point. Builds two small keyed sample collections and runs one of
    * the demos; swap the commented-out calls to try the others.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // (id, name) — ids 1 and 2 have a match in data2, id 3 does not.
    val data1 = ListBuffer[(Int, String)]((1, "weiwei"), (2, "法法"), (3, "gogo"))

    // (id, city) — ids 1 and 2 have a match in data1, id 4 does not.
    val data2 = ListBuffer[(Int, String)]((1, "北京"), (2, "上海"), (4, "南京"))

    // innerJoinFunction(data1, data2, env)
    // leftOutJoinFunction(data1, data2, env)
    // fullOutJoinFunction(data1, data2, env)
    crossJoinFunction(env)
  }

  /**
    * Inner join of the two collections on their first tuple field; prints
    * `(key, leftValue, rightValue)` for every key present on both sides.
    *
    * @param data1 left-hand records as `(key, value)`
    * @param data2 right-hand records as `(key, value)`
    * @param env   execution environment used to create the DataSets
    */
  def innerJoinFunction(data1: ListBuffer[(Int, String)],
                        data2: ListBuffer[(Int, String)],
                        env: ExecutionEnvironment): Unit = {
    val left = env.fromCollection(data1)
    val right = env.fromCollection(data2)

    left
      .join(right)
      .where(0)   // key field of the left side
      .equalTo(0) // key field of the right side
      .apply((left, right) => (left._1, left._2, right._2)) // output record
      .print()
  }

  /**
    * Left outer join of the two collections on their first tuple field; prints
    * `(key, leftValue, rightValue)`, using "-" as `rightValue` for left
    * records without a match.
    *
    * @param data1 left-hand records as `(key, value)`
    * @param data2 right-hand records as `(key, value)`
    * @param env   execution environment used to create the DataSets
    */
  def leftOutJoinFunction(data1: ListBuffer[(Int, String)],
                          data2: ListBuffer[(Int, String)],
                          env: ExecutionEnvironment): Unit = {
    val left = env.fromCollection(data1)
    val right = env.fromCollection(data2)

    left
      .leftOuterJoin(right)
      .where(0)
      .equalTo(0)
      .apply { (left, right) =>
        // The right side is null when the left record has no match;
        // dereferencing it unconditionally throws a NullPointerException.
        if (right == null) {
          (left._1, left._2, MissingValue)
        } else {
          (left._1, left._2, right._2)
        }
      }
      .print()
  }

  /**
    * Full outer join of the two collections on their first tuple field; prints
    * `(key, leftValue, rightValue)`, using "-" for whichever side is missing.
    *
    * @param data1 left-hand records as `(key, value)`
    * @param data2 right-hand records as `(key, value)`
    * @param env   execution environment used to create the DataSets
    */
  def fullOutJoinFunction(data1: ListBuffer[(Int, String)],
                          data2: ListBuffer[(Int, String)],
                          env: ExecutionEnvironment): Unit = {
    val left = env.fromCollection(data1)
    val right = env.fromCollection(data2)

    left
      .fullOuterJoin(right)
      .where(0)
      .equalTo(0)
      .apply { (left, right) =>
        // Either side may be null in a full outer join; take the key from
        // whichever side is present.
        if (left == null) {
          (right._1, MissingValue, right._2)
        } else if (right == null) {
          // Fixed: this branch used "=" while every other branch used "-".
          (left._1, left._2, MissingValue)
        } else {
          (left._1, left._2, right._2)
        }
      }
      .print()
  }

  /**
    * Cartesian product: pairs every element of one small sample collection
    * with every element of another and prints the pairs.
    *
    * @param env execution environment used to create the DataSets
    */
  def crossJoinFunction(env: ExecutionEnvironment): Unit = {
    val ds1 = List("004", "003")
    val ds2 = List("001", "002")

    val left = env.fromCollection(ds1)
    val right = env.fromCollection(ds2)

    left.cross(right).print()
  }

}

相关文章

网友评论

      本文标题:Flink join 基础 demo 代码

      本文链接:https://www.haomeiwen.com/subject/fpmwaqtx.html