1 定义数据库连接
Class.forName("com.mysql.jdbc.Driver").newInstance()
val DB_URL_R = "jdbc:mysql://10.1.11.18/medical_waste?useSSL=false&characterEncoding=utf8"
val PROPERTIES = new java.util.Properties()
PROPERTIES.setProperty("user", "********")
PROPERTIES.setProperty("password", "********")
2 读取两个表
val rfidCardDF = spark.read.jdbc(DB_URL_R, "t_rfid_card", Array("card_type = 22"), PROPERTIES).select("card_id","card_label").cache()
val medicalWasteDF = spark.read.jdbc(DB_URL_R, "t_medical_waste", Array("YEAR(rec_ts) = 2017"), PROPERTIES).select("team_id","mw_weight").cache()
3 连接
val df2 = medicalWasteDF.join(rfidCardDF,medicalWasteDF("team_id") equalTo(rfidCardDF("card_id"))).drop("card_id").cache()
使用join,默认是left out join。条件判断是相等。然后删除掉一个重复的列card_id。
4 统计
df2.groupBy("card_label").agg(sum("mw_weight")).orderBy(col("sum(mw_weight)").desc).show()
group by操作,生成一个新的数据集,增加了一列sum操作,生成一个默认列名sum(mw_weight)的列,然后倒序排个序。
网友评论