一、问题引入
val realResultRDD: RDD[CategoryTop10SessionTop10] = resultMapRDD.flatMap(list=>list)
val driver = "com.mysql.jdbc.Driver"
val url = "jdbc:mysql://linux1:3306/sparkmall-190311"
val userName = "root"
val passWd = "000000"
Class.forName(driver)
val connection: Connection = DriverManager.getConnection(url, userName, passWd)
val sql = "insert into category_top10_session_count ( taskId, categoryId, sessionId, clickCount) values (?, ?, ?, ?)"
val statement: PreparedStatement = connection.prepareStatement(sql)
realResultRDD.foreach{
obj=>{
statement.setObject(1, obj.taskId)
statement.setObject(2, obj.categoryId)
statement.setObject(3, obj.sessionId)
statement.setObject(4, obj.clickCount)
statement.executeUpdate()
}
}
statement.close()
connection.close()
写入时出现Task not serializable错误,原因是,因为这里调用的是SparkRDD的算子,执行在Excutor端,所以jdbc的创建需要在Excutor端 。
PrepareStatement对象,这个对象无法序列化,而传入map中的对象是需要分布式传送到各个节点Excutor上,传送前先序列化,到达相应节点Excutor上后再反序列化。PrepareStatement是个Java类,如果一个java类想(反)序列化,必须实现Serialize接口,PrepareStatement并没有实现这个接口,对象PrepareStatement在driver端,realResultRDD调用foreach算子后在Excutor端执行,PrepareStatement对象无法序列化,报错。
为什么对象PrepareStatement无法序列化,如下图所示,会出现数据库安全问题。
data:image/s3,"s3://crabby-images/fd18f/fd18f42512b54295295a7eb0a413d2e916afe3fa" alt=""
二、问题解决
将preparedStatement在Excutor端创建。
val realResultRDD: RDD[CategoryTop10SessionTop10] = resultMapRDD.flatMap(list=>list)
realResultRDD.foreachPartition(datas => {
/** 因为这里调用的是SparkRDD的算子,执行在Excutor端,所以jdbc的创建需要在Excutor端 */
val driver = "com.mysql.jdbc.Driver"
val url = "jdbc:mysql://hadoop102:3306/sparkmall190311"
val userName = "root"
val passWd = "111111"
Class.forName(driver)
val connection: Connection = DriverManager.getConnection(url,userName,passWd)
val sql = "insert into category_top10_session_count(taskId, categoryId, sessionId, clickCount) values (?,?,?,?)"
val preparedStatement: PreparedStatement = connection.prepareStatement(sql)
datas.foreach{
obj => {
preparedStatement.setObject(1,obj.taskId)
preparedStatement.setObject(1,obj.categoryId)
preparedStatement.setObject(1,obj.sessionId)
preparedStatement.setObject(1,obj.clickCount)
preparedStatement.executeUpdate()
}
}
preparedStatement.close()
connection.close()
})
网友评论