package SparkSql
import java.util.Properties

import org.apache.spark.sql.{DataFrame, SparkSession}
object JdbcDataSource {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName(this.getClass.getSimpleName)
      .master("local[*]")
      .getOrCreate()

    // Read the help_category table from MySQL over JDBC
    val logs: DataFrame = spark.read.format("jdbc").options(
      Map("url" -> "jdbc:mysql://localhost:3306/mysql",
        "driver" -> "com.mysql.jdbc.Driver",
        "dbtable" -> "help_category",
        "user" -> "root",
        "password" -> "root")
    ).load()
    // Filter with a typed lambda (requires importing Dataset and Row):
    // val filter: Dataset[Row] = logs.filter(r => {
    //   r.getAs[Int](2) <= 34
    // })
    // filter.show()

    // Filter using the $ column syntax
    import spark.implicits._
    val lo = logs.filter($"parent_category_id" <= 34)
    // Equivalent: val lop = logs.where($"parent_category_id" <= 34)
    // Select columns, multiplying parent_category_id by 10 into a new column
    val res = lo.select($"help_category_id", $"name", $"parent_category_id" * 10 as "new_age")
    // Write the query result to a new MySQL table
    val props = new Properties()
    props.put("user", "root")
    props.put("password", "root")
    // Save mode "ignore": if the target table already exists, do nothing (neither append nor overwrite)
    res.write.mode("ignore").jdbc("jdbc:mysql://localhost:3306/mysql", "logs1", props)
    // Save locally as plain text: only a single column is allowed, and it must be of string type
    // res.write.text("D:\\out\\sql")

    // Save locally as JSON, e.g. {"help_category_id":1,"name":"Geographic","new_age":0}
    // res.write.json("D:\\out\\sql")

    // Save locally as CSV; column names / header are not written by default
    // res.write.csv("D:\\out\\sql")

    // Save locally as Parquet
    res.write.parquet("D:\\out\\sql")
    spark.stop()
  }
}
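
To verify the results, the written data can be read back. Below is a minimal sketch assuming the same local MySQL instance, root/root credentials, and the D:\out\sql Parquet path from the example above; the JdbcReadBack object name is illustrative.

package SparkSql

import java.util.Properties

import org.apache.spark.sql.SparkSession

object JdbcReadBack {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName(this.getClass.getSimpleName)
      .master("local[*]")
      .getOrCreate()

    // Assumed: same MySQL connection details as in the example above
    val props = new Properties()
    props.put("user", "root")
    props.put("password", "root")

    // Read back the logs1 table that was written with mode("ignore"),
    // using the spark.read.jdbc(url, table, properties) overload
    val fromMysql = spark.read.jdbc("jdbc:mysql://localhost:3306/mysql", "logs1", props)
    fromMysql.show()

    // Read back the Parquet output; the schema is stored in the Parquet files themselves
    val fromParquet = spark.read.parquet("D:\\out\\sql")
    fromParquet.printSchema()
    fromParquet.show()

    spark.stop()
  }
}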