美文网首页
广播变量

广播变量

作者: 焉知非鱼 | 来源:发表于2019-07-10 14:35 被阅读0次

    从 mysql 读取数据作为广播变量时, 虽然有 checkpoint 但是 kill 掉任务后,重启程序会失败。

      // Streaming job wiring: extracts NA drive-trip state from the source
      // DStream and persists it to HBase.
      class GacXs6Offline @Inject()( sparkConf            : SparkConfiguration,
                                   mysqlConf            : MysqlConfiguration,
                                   hbaseConf            : HbaseConfiguration,
                                   sparkContext         : EnterpriseSparkContext[SparkContext],
                                   source               : NationDStream[(String,NaSourceData)],
                                   naDriveTrip          : NaDriveTrip
                                 ) extends Serializable {
        val naDriveTripDS = naDriveTrip.exteact(source)
        // Fix: pass the extracted stream, not the NaDriveTrip extractor itself —
        // saveNaDriveTrip expects a DStream[(String, TripState)], which is what
        // `naDriveTripDS` holds (and it was otherwise unused).
        saveNaDriveTrip("drive_trip", naDriveTripDS)
      }
      
      // Serialization ERROR example (intentional): the foreachRDD closure reads
      // the enclosing class's `hbaseConf` field, so Spark must serialize the whole
      // outer object into the task closure — it is not serializable end-to-end,
      // which is why the job fails on checkpoint restore / restart.
      def saveNaDriveTrip(tableName: String, naDriveTrip: DStream[(String, TripState)]): Unit = {
        naDriveTrip.foreachRDD(rdd => {
          // Build the Hadoop JobConf on the driver for each micro-batch.
          val conf = HBaseConfiguration.create()
          val jobConf = new JobConf(conf)
          // This line is the culprit: `hbaseConf` is a field of the outer class,
          // so referencing it here captures the outer instance in the closure.
          jobConf.set("hbase.zookeeper.quorum", hbaseConf.hbaseUrl)
          jobConf.set("zookeeper.znode.parent", "/hbase")
          jobConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
          jobConf.setOutputFormat(classOf[TableOutputFormat])
          rdd.map(x => {
            // One HBase Put per (key, TripState) record; the ImmutableBytesWritable
            // key is a fresh empty instance (TableOutputFormat ignores it).
            (new ImmutableBytesWritable,  (new NaHbaseDao).putNaTripData(x._1, x._2))
          }).saveAsHadoopDataset(jobConf)
        })
      }
    
        // Serialization OK variant: a fresh HbaseConfiguration is constructed
        // inside the foreachRDD lambda, so the closure no longer references any
        // field of the enclosing class and nothing non-serializable is captured.
        def saveNaDriveTrip(tableName: String, naDriveTrip: DStream[(String, TripState)]): Unit = {
        naDriveTrip.foreachRDD(rdd => {
          // Per-batch JobConf setup on the driver.
          val conf = HBaseConfiguration.create()
          val jobConf = new JobConf(conf)
          // Key difference from the failing version: local construction instead
          // of capturing the outer class's `hbaseConf` field.
          val naHbaseConf =  new HbaseConfiguration
          jobConf.set("hbase.zookeeper.quorum", naHbaseConf.hbaseUrl)
          jobConf.set("zookeeper.znode.parent", "/hbase")
          jobConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
          jobConf.setOutputFormat(classOf[TableOutputFormat])
    
          rdd.map(x => {
            // Emit one Put per record; TableOutputFormat ignores the writable key.
            (new ImmutableBytesWritable,  (new NaHbaseDao).putNaTripData(x._1, x._2))
          }).saveAsHadoopDataset(jobConf)
        })
      }
    
      // Serialization OK variant using foreachPartition: all HBase objects are
      // created on the executor inside the partition closure, so nothing
      // non-serializable crosses the driver/executor boundary.
      def saveNaDriveTrip(tableName: String, naDriveTrip: DStream[(String, TripState)]): Unit = {
        naDriveTrip.foreachRDD(rdd => {
          rdd.foreachPartition(partitionRdd => {
            // Per-partition setup; these objects are never serialized.
            val hbaseConf = new HbaseConfiguration
            val hbase = new HbaseUtil(hbaseConf)
            val connection = hbase.getHbaseConn
            try {
              val table = connection.getTable(TableName.valueOf(tableName))
              try {
                // Materialize the partition once — the partition iterator is single-pass.
                val list = partitionRdd.map(data => {
                  NaHbaseDao.putNaTripData(data._1, data._2)
                }).toList
                // toList never returns null, so nonEmpty alone is the right guard.
                if (list.nonEmpty) {
                  NaHbaseDao.saveData(list, table)
                }
              } finally {
                table.close()
              }
            } finally {
              // Fix: the original leaked one connection (and table) per partition
              // per batch; always release HBase resources.
              connection.close()
            }
          })
        })
      }
    

    相关文章

      网友评论

          本文标题:广播变量

          本文链接:https://www.haomeiwen.com/subject/vklhkctx.html