从 mysql 读取数据作为广播变量时, 虽然有 checkpoint 但是 kill 掉任务后,重启程序会失败。
class GacXs6Offline @Inject()( sparkConf : SparkConfiguration,
mysqlConf : MysqlConfiguration,
hbaseConf : HbaseConfiguration,
sparkContext : EnterpriseSparkContext[SparkContext],
source : NationDStream[(String,NaSourceData)],
naDriveTrip : NaDriveTrip
) extends Serializable {
val naDriveTripDS = naDriveTrip.exteact(source)
saveNaDriveTrip("drive_trip", naDriveTrip)
}
// serializable error, because the use of hbaseConf
def saveNaDriveTrip(tableName: String, naDriveTrip: DStream[(String, TripState)]): Unit = {
naDriveTrip.foreachRDD(rdd => {
val conf = HBaseConfiguration.create()
val jobConf = new JobConf(conf)
jobConf.set("hbase.zookeeper.quorum", hbaseConf.hbaseUrl)
jobConf.set("zookeeper.znode.parent", "/hbase")
jobConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
jobConf.setOutputFormat(classOf[TableOutputFormat])
rdd.map(x => {
(new ImmutableBytesWritable, (new NaHbaseDao).putNaTripData(x._1, x._2))
}).saveAsHadoopDataset(jobConf)
})
}
// serializable ok, because we create a new hbaseConf
def saveNaDriveTrip(tableName: String, naDriveTrip: DStream[(String, TripState)]): Unit = {
naDriveTrip.foreachRDD(rdd => {
val conf = HBaseConfiguration.create()
val jobConf = new JobConf(conf)
val naHbaseConf = new HbaseConfiguration
jobConf.set("hbase.zookeeper.quorum", naHbaseConf.hbaseUrl)
jobConf.set("zookeeper.znode.parent", "/hbase")
jobConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
jobConf.setOutputFormat(classOf[TableOutputFormat])
rdd.map(x => {
(new ImmutableBytesWritable, (new NaHbaseDao).putNaTripData(x._1, x._2))
}).saveAsHadoopDataset(jobConf)
})
}
// serializable ok, because we create a new hbaseConf
def saveNaDriveTrip(tableName: String, naDriveTrip: DStream[(String, TripState)]): Unit = {
naDriveTrip.foreachRDD(rdd => {
rdd.foreachPartition(partitionRdd => {
val hbaseConf = new HbaseConfiguration
val hbase = new HbaseUtil(hbaseConf)
val connection = hbase.getHbaseConn
val table = connection.getTable(TableName.valueOf(tableName))
val list = partitionRdd.map(data => {
NaHbaseDao.putNaTripData(data._1, data._2)
}).toList
if (null != list && list.nonEmpty) {
NaHbaseDao.saveData(list, table)
}
})
})
}
网友评论