先说说spark 2.1的SparkSession,原来的SparkContext已经并入SparkSession,所以需要这样开始:
import org.apache.spark.sql.SparkSession
val sc = SparkSession
.builder()
.appName("reas csv to label data ")
.getOrCreate()
下面是连接数据库,并把rdd写入mysql的语句:
val url="jdbc:mysql://192.168.0.28:3306/database"
val prop = new java.util.Properties
prop.setProperty("driver", "com.mysql.jdbc.Driver")
prop.setProperty("user", jdbcUsername)
prop.setProperty("password", jdbcPassword)
val cols = "ip"::"source"::"hour"::"count"::Nil
val df = data.toDF(cols:_*)
df.write.mode("append").jdbc(url, "table", prop)
这里介绍下toDF,函数的参数是一个(colNames:String),叫变参,而我传入的cols是个String的list,list通过cols:_转为变参,你学会了吗?而这个是你数据库的fields,如果不告诉toDF列名,那么编号从_1,_2,…,_N开始,就会出现Unkonwn column '_1' in 'field list'的错误:
网友评论