实时流同步hive
同步实时流数据时,首先要看数据类型,如果是append流,则比较简单,
如果数据流存在根据id更新,删除的情况,则同步到es,hbase等分布书数据库比较简单
同步append流
思路
每天写一个分区,每小时消费一次,使用hdfs的append模式往同一个目录写,这样可能出现每天分区中的小文件较多的情况,可以使用hive查询进行文件merge。
以kafka+sparkstreaming+hive为例
- sparkstreaming消费kafka数据写hdfs
override def handle(ssc: StreamingContext): Unit = {
val conf = ssc.sparkContext.getConf
val source = new KafkaDirectSource[String, String](ssc)
val lines: DStream[ActualTraceData] = source.getDStream(_.value()).map(RealTimeData.parse[ActualTraceData]).map(_.body)
val spark = SparkSession.builder.config(ssc.sparkContext.getConf).getOrCreate()
import spark.implicits._
lines.foreachRDD(rdd => {
val df: DataFrame = rdd.toDF()
.withColumn("date", lit(DateTimeUtil.currentDateStr()))
df.write.mode(SaveMode.Append)
.option("path", conf.get("spark.trace.hive.output"))
.partitionBy("date", "datatype")
.option("delimiter", "\t")
.format("csv").save()
})
}
- 创建hive外部表
CREATE EXTERNAL TABLE `trace.online`(
`id` string COMMENT 'id',
`data` string COMMENT '数据',
`key1` string COMMENT '索引1',
`key2` string COMMENT '索引2',
`key3` string COMMENT '索引3',
`key4` string COMMENT '索引4',
`key5` string COMMENT '索引5',
`ts` bigint COMMENT '时间戳'
) PARTITIONED BY (
`date` string COMMENT '日期',
`datatype` string COMMENT '数据分类')
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE
location 'hdfs:///hive/trace';
- 每天刷hive分区,这里有两种办法,执行ddl或是hive 自动检测修复分区
执行ddl添加分区,ddl如下
alter table `trace.online` add PARTITION (date='20200306', datatype='trace_test_input')
location 'hdfs:///hive/trace/date=20200306/datatype=trace_test_input';
执行MSCK REPAIR TABLE trace.online;
同步普通数据流
思路
由于hive底层是hdfs,没有按主键更新记录的功能。假设数据按天的频率同步,则数据可按天分区,每个分区都是数据的一个快照,消费数据时,将拿到的数据和前一个分区的数据进行join和过滤,再将其存到当天的分区中,此分区即是数据流的最新快照了。
网友评论