美文网首页
5.2 写入Mysql时出现的问题(Task not seria

5.2 写入Mysql时出现的问题(Task not seria

作者: yayooo | 来源:发表于2019-08-04 15:48 被阅读0次

一、问题引入

  val realResultRDD: RDD[CategoryTop10SessionTop10] = resultMapRDD.flatMap(list=>list)
   val driver = "com.mysql.jdbc.Driver"
        val url = "jdbc:mysql://linux1:3306/sparkmall-190311"
        val userName = "root"
        val passWd = "000000"

        Class.forName(driver)
        val connection: Connection = DriverManager.getConnection(url, userName, passWd)
        val sql = "insert into category_top10_session_count ( taskId, categoryId, sessionId, clickCount) values (?, ?, ?, ?)"
        val statement: PreparedStatement = connection.prepareStatement(sql)

        realResultRDD.foreach{
            obj=>{
                statement.setObject(1, obj.taskId)
                statement.setObject(2, obj.categoryId)
                statement.setObject(3, obj.sessionId)
                statement.setObject(4, obj.clickCount)
                statement.executeUpdate()
            }
        }
        statement.close()
        connection.close()

写入时出现Task not serializable错误,原因是,因为这里调用的是SparkRDD的算子,执行在Excutor端,所以jdbc的创建需要在Excutor端 。
PrepareStatement对象,这个对象无法序列化,而传入map中的对象是需要分布式传送到各个节点Excutor上,传送前先序列化,到达相应节点Excutor上后再反序列化。PrepareStatement是个Java类,如果一个java类想(反)序列化,必须实现Serialize接口,PrepareStatement并没有实现这个接口,对象PrepareStatement在driver端,realResultRDD调用foreach算子后在Excutor端执行,PrepareStatement对象无法序列化,报错。
为什么对象PrepareStatement无法序列化,如下图所示,会出现数据库安全问题。


二、问题解决
将preparedStatement在Excutor端创建。

val realResultRDD: RDD[CategoryTop10SessionTop10] = resultMapRDD.flatMap(list=>list)
realResultRDD.foreachPartition(datas => {
      /** 因为这里调用的是SparkRDD的算子,执行在Excutor端,所以jdbc的创建需要在Excutor端 */
      val driver = "com.mysql.jdbc.Driver"
      val url = "jdbc:mysql://hadoop102:3306/sparkmall190311"
      val userName = "root"
      val passWd = "111111"

      Class.forName(driver)
      val connection: Connection = DriverManager.getConnection(url,userName,passWd)
      val sql = "insert into category_top10_session_count(taskId, categoryId, sessionId, clickCount) values (?,?,?,?)"
      val preparedStatement: PreparedStatement = connection.prepareStatement(sql)


      datas.foreach{
        obj => {
          preparedStatement.setObject(1,obj.taskId)
          preparedStatement.setObject(1,obj.categoryId)
          preparedStatement.setObject(1,obj.sessionId)
          preparedStatement.setObject(1,obj.clickCount)
          preparedStatement.executeUpdate()
        }
      }

      preparedStatement.close()
      connection.close()
    })

相关文章

网友评论

      本文标题:5.2 写入Mysql时出现的问题(Task not seria

      本文链接:https://www.haomeiwen.com/subject/jgkydctx.html