美文网首页
浅谈实时流同步Hive方案

浅谈实时流同步Hive方案

作者: 少年_7b60 | 来源:发表于2020-03-07 21:33 被阅读0次

    实时流同步hive

    同步实时流数据时,首先要看数据类型,如果是append流,则比较简单,
    如果数据流存在根据id更新,删除的情况,则同步到es,hbase等分布书数据库比较简单

    同步append流

    思路

    每天写一个分区,每小时消费一次,使用hdfs的append模式往同一个目录写,这样可能出现每天分区中的小文件较多的情况,可以使用hive查询进行文件merge。

    以kafka+sparkstreaming+hive为例

    1. sparkstreaming消费kafka数据写hdfs
    override def handle(ssc: StreamingContext): Unit = {
        val conf = ssc.sparkContext.getConf
    
        val source = new KafkaDirectSource[String, String](ssc)
        val lines: DStream[ActualTraceData] = source.getDStream(_.value()).map(RealTimeData.parse[ActualTraceData]).map(_.body)
    
        val spark = SparkSession.builder.config(ssc.sparkContext.getConf).getOrCreate()
        import spark.implicits._
    
        lines.foreachRDD(rdd => {
        val df: DataFrame = rdd.toDF()
            .withColumn("date", lit(DateTimeUtil.currentDateStr()))
        df.write.mode(SaveMode.Append)
            .option("path", conf.get("spark.trace.hive.output"))
            .partitionBy("date", "datatype")
            .option("delimiter", "\t")
            .format("csv").save()
        })
    }
    
    1. 创建hive外部表
    CREATE EXTERNAL TABLE `trace.online`(
    `id` string COMMENT 'id',
    `data` string COMMENT '数据',
    `key1` string COMMENT '索引1',
    `key2` string COMMENT '索引2',
    `key3` string COMMENT '索引3',
    `key4` string COMMENT '索引4',
    `key5` string COMMENT '索引5',
    `ts` bigint COMMENT '时间戳'
    ) PARTITIONED BY (
    `date` string COMMENT '日期',
    `datatype` string COMMENT '数据分类')
    ROW FORMAT DELIMITED
    FIELDS TERMINATED BY '\t'
    STORED AS TEXTFILE
    location 'hdfs:///hive/trace';
    
    1. 每天刷hive分区,这里有两种办法,执行ddl或是hive 自动检测修复分区

    执行ddl添加分区,ddl如下

    alter table `trace.online` add PARTITION (date='20200306', datatype='trace_test_input') 
    location 'hdfs:///hive/trace/date=20200306/datatype=trace_test_input';
    

    执行MSCK REPAIR TABLE trace.online;

    同步普通数据流

    思路

    由于hive底层是hdfs,没有按主键更新记录的功能。假设数据按天的频率同步,则数据可按天分区,每个分区都是数据的一个快照,消费数据时,将拿到的数据和前一个分区的数据进行join和过滤,再将其存到当天的分区中,此分区即是数据流的最新快照了。

    相关文章

      网友评论

          本文标题:浅谈实时流同步Hive方案

          本文链接:https://www.haomeiwen.com/subject/qykkdhtx.html