美文网首页
flink join基础demo代码

flink join基础demo代码

作者: DuLaGong | 来源:发表于2019-05-16 20:33 被阅读0次

    import org.apache.flink.api.scala.ExecutionEnvironment

    import scala.collection.mutable.ListBuffer

    import org.apache.flink.api.scala._

    /**
      * Minimal demos of the Flink DataSet join operators: inner join, left outer join,
      * full outer join and cross (Cartesian product).
      *
      * Every demo builds its DataSets from small in-memory collections and prints the result.
      */
    object TransformationApp {

      /** Placeholder written into the output for the side of an outer join that has no match. */
      private final val Missing = "-"

      /**
        * Entry point. Builds two sample (id, value) collections and runs one of the demos.
        *
        * @param args command-line arguments (unused)
        */
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment

        // (id, name) records; id 3 has no counterpart in data2.
        val data1 = ListBuffer[(Int, String)]((1, "weiwei"), (2, "法法"), (3, "gogo"))

        // (id, city) records; id 4 has no counterpart in data1.
        val data2 = ListBuffer[(Int, String)]((1, "北京"), (2, "上海"), (4, "南京"))

        // Uncomment one of the following to run the corresponding join demo instead.
        //    innerJoinFunction(data1, data2, env)
        //    leftOutJoinFunction(data1, data2, env)
        //    fullOutJoinFunction(data1, data2, env)

        crossJoinFunction(env)
      }

      /**
        * Inner join of the two collections on their first tuple field; prints
        * (id, leftValue, rightValue) for every matching pair.
        *
        * @param data1 left-side (id, value) records
        * @param data2 right-side (id, value) records
        * @param env   execution environment used to create the DataSets
        */
      def innerJoinFunction(data1: ListBuffer[(Int, String)],
                            data2: ListBuffer[(Int, String)],
                            env: ExecutionEnvironment): Unit = {
        val leftDs = env.fromCollection(data1)
        val rightDs = env.fromCollection(data2)

        leftDs
          .join(rightDs)
          .where(0) // key position in the left tuple
          .equalTo(0) // key position in the right tuple
          .apply((l, r) => (l._1, l._2, r._2)) // shape of the emitted record
          .print()
      }

      /**
        * Left outer join of the two collections on their first tuple field; prints
        * (id, leftValue, rightValue), using "-" as rightValue when the left record has no match.
        *
        * @param data1 left-side (id, value) records
        * @param data2 right-side (id, value) records
        * @param env   execution environment used to create the DataSets
        */
      def leftOutJoinFunction(data1: ListBuffer[(Int, String)],
                              data2: ListBuffer[(Int, String)],
                              env: ExecutionEnvironment): Unit = {
        val leftDs = env.fromCollection(data1)
        val rightDs = env.fromCollection(data2)

        leftDs
          .leftOuterJoin(rightDs)
          .where(0)
          .equalTo(0)
          .apply { (l, r) =>
            // The right side is null for unmatched left records; dereferencing it directly
            // (l._1, l._2, r._2) fails with a NullPointerException, so wrap it in Option.
            (l._1, l._2, Option(r).map(_._2).getOrElse(Missing))
          }
          .print()
      }

      /**
        * Full outer join of the two collections on their first tuple field; prints
        * (id, leftValue, rightValue), using "-" for whichever side has no match.
        *
        * @param data1 left-side (id, value) records
        * @param data2 right-side (id, value) records
        * @param env   execution environment used to create the DataSets
        */
      def fullOutJoinFunction(data1: ListBuffer[(Int, String)],
                              data2: ListBuffer[(Int, String)],
                              env: ExecutionEnvironment): Unit = {
        val leftDs = env.fromCollection(data1)
        val rightDs = env.fromCollection(data2)

        leftDs
          .fullOuterJoin(rightDs)
          .where(0)
          .equalTo(0)
          .apply { (l, r) =>
            // Either side may be null here; take the key from whichever side is present.
            val id = Option(l).getOrElse(r)._1
            val leftValue = Option(l).map(_._2).getOrElse(Missing)
            val rightValue = Option(r).map(_._2).getOrElse(Missing)
            (id, leftValue, rightValue)
          }
          .print()
      }

      /**
        * Cartesian product: crosses two small string collections and prints every pair.
        *
        * @param env execution environment used to create the DataSets
        */
      def crossJoinFunction(env: ExecutionEnvironment): Unit = {
        val ds1 = List("004", "003")
        val ds2 = List("001", "002")

        val leftDs = env.fromCollection(ds1)
        val rightDs = env.fromCollection(ds2)

        leftDs.cross(rightDs).print()
      }

    }

    相关文章

      网友评论

          本文标题:flink join基础demo代码

          本文链接:https://www.haomeiwen.com/subject/fpmwaqtx.html