美文网首页
Spark的join使用

Spark的join使用

作者: 不愿透露姓名的李某某 | 来源:发表于2019-07-15 01:07 被阅读0次

    val spark = SparkSession

    .builder()

    .appName("Jointest")

    .master("local[*]")

    .getOrCreate()

    import spark.implicits._

    val lines: Dataset[String] = spark.createDataset(List("1,laozhao,china","2,laoduan,Usa","3,yaoyang,jp"))

    //对数据进行整理

        val tpDs: Dataset[(Long,String,String)] = lines.map(line => {

    val fields = line.split(",")

    val id = fields(0).toLong

    val name = fields(1)

    val nation = fields(2)

    (id, name, nation)

    })

    val df1 = tpDs.toDF("id ","name","nation")

    val nations: Dataset[String] = spark.createDataset(List("china,中国","Usa,美国"))

    //对数据进行整理

        val nata = nations.map(l => {

    val fie = l.split(",")

    val ename = fie(0)

    val cname = fie(1)

    (ename, cname)

    })

    val df2=nata.toDF("ename","cname")

    //第一种,创建视图

    //    df1.createTempView("v_users")

    //    df2.createTempView("v_nations")

    //  val n :DataFrame= spark.sql("SELECT name ,cname FROM v_users JOIN v_nations ON nation=ename")

    //    n.show()

        val r = df1.join(df2,$"nation"===$"ename","left_outer")

    r.show()

    spark.stop()

    //注释:Hive不支持非等值的join

    在Spark-shell上执行的join操作 以SortMergeJoin执行的join操作

    相关文章

      网友评论

          本文标题:Spark的join使用

          本文链接:https://www.haomeiwen.com/subject/fwgwkctx.html