// Build (or, via getOrCreate, reuse) a Spark session that runs locally on all
// available cores. The builder settings are independent, so their order is irrelevant.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("Jointest")
  .getOrCreate()
// Session-scoped implicits: encoders for createDataset/map/toDF and the $"col" syntax.
import spark.implicits._
// Raw user records, one comma-separated line each: "id,name,nation".
val lines: Dataset[String] = spark.createDataset(List("1,laozhao,china","2,laoduan,Usa","3,yaoyang,jp"))
// Parse each record into an (id, name, nation) tuple.
// NOTE(review): assumes every line has at least three fields and a numeric id;
// a malformed line fails the job (ArrayIndexOutOfBoundsException / NumberFormatException).
val tpDs: Dataset[(Long, String, String)] = lines.map { line =>
  val fields = line.split(",")
  (fields(0).toLong, fields(1), fields(2))
}
// Column names must be exact: the previous "id " (trailing space) could not be
// referenced later as $"id" or col("id").
val df1 = tpDs.toDF("id", "name", "nation")
// Nation lookup records, one "englishName,chineseName" line each.
val nations: Dataset[String] = spark.createDataset(List("china,中国","Usa,美国"))
// Split each record into an (ename, cname) pair.
val nata = nations.map { record =>
  val parts = record.split(",")
  (parts(0), parts(1))
}
// Lookup table keyed by the English nation name (ename) for the join below.
val df2 = nata.toDF("ename", "cname")
// Approach 1 (SQL): register temporary views and join them with an inner JOIN.
// df1.createTempView("v_users")
// df2.createTempView("v_nations")
// val n :DataFrame= spark.sql("SELECT name ,cname FROM v_users JOIN v_nations ON nation=ename")
// n.show()

// Approach 2 (DataFrame API): a left outer join. Users whose nation has no match in
// df2 (e.g. "jp") are kept, with null ename/cname columns.
val onNation = $"nation" === $"ename"
val r = df1.join(df2, onNation, "left_outer")
r.show()
spark.stop()
// Note: Hive historically did not support non-equi join conditions (support was added
// in Hive 2.2); verify against the Hive version in use.
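// Observation from the original write-up: when run in spark-shell, this join was executed
// as a SortMergeJoin (在Spark-shell上执行的join操作 以SortMergeJoin执行的join操作).
// Confirm with r.explain(): the chosen strategy depends on input sizes and
// spark.sql.autoBroadcastJoinThreshold, so small inputs may use a broadcast hash join instead.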