美文网首页语言玩转大数据程序员
记录完整spark.hive.sql处理过程

记录完整spark.hive.sql处理过程

作者: 热血沸腾 | 来源:发表于2017-09-08 16:38 被阅读154次

    需求:

    从App启动日志中读取所需数据,用来得到用户的常用启动时间点,常活动的地理位置,并更新到用户画像表中。
    

    背景:

    device_id 设备id
    start_time App启动时间
    latitude 上传纬度
    longitude 上传精度
    

    实现

    原始数据

    DataFrame数据形式是结构化数据,如下
    
        device_id                                   start_time                  latitude                longitude
    
    0   ad3bb7b86f9ad9ff4786f767094f31f4835cf167    2017-07-26 00:18:55.985     39.69992988280924       116.5083292779666
    1   7e98a8401070c6bcb46a9ef32efd0e6a7066a220    2017-07-26 00:04:37.532     39.82559758010746       116.4472136767569
    2   865675021801835                             2017-07-26 00:19:28.821     0.000000                0.000000
    

    通过
    schemeRdd.rdd变成普通rdd

    结构变为
    [(device_id,start_time,latitude,longitude),(device_id,start_time,latitude,longitude),...]
    
    1. 经纬度处理成geohash6 合并到一起生成的RDD数据结构
    
        rdd.map(lambda x: (x['device_id'], x['start_time'],geohash.encode(x[lat],x[lng]))   
    
        得到数据结构是:
        [('device_id',start_time,geohash6)), (('device_id',start_time,geohash6))]
    
     
    2. 拆分开成2个rdd ,分别是时间和GeoHash的,分别处理
    
        timerdd = rdd.map(lambda x: (x[0],x[1]))
        timerdd_group=timerdd.groupByKey()
        得到数据结构
        [('device_id1',['2017-09-01 12:34:00','2017-06-30 01:01:12']),('device_id2',['2017-09-05 11:15:32','2017-07-30 07:01:08'])]
    
        geohashrdd=rdd.map(lambda x: (x[0],x[2]))
        geohashrdd_group=geohashrdd.groupByKey()
        得到数据结构
        [('device_id1',['EXXEQ6','OKWQE9','SDEDD1','SDQGW2']),('device_id2',['LLSJW0','YWQDX7'])]
    
        2.1 得到用0-24来表示的常使用的时间点 , 得到最近使用时间到日期  (注意时间list也许没数据,则设置默认时间)
            result_time_rdd= timerdd_group.mapValues(lambda x: use_time(x))     #该函数生成一个元祖 ('often_use_time','last_use_time')
    
            得到结构为
            [('device_id1',(12,'2017-09-01')),('device_id2',(9,'2017-06-05')),('device_id3',(0,'1970-01-01'))]
    
        2.2 得到最常去的2个geohash的值
            result_geohash_rdd = geohashrdd_group.mapValues(lambda x: activie_geohash(x) )
    
            得到结构为
            [('device_id1',('EWEQ1','SDWQ3'),('device_id2',('UYWIE8','']),('device_id3',['','']]
    
    
    1. 根据结果数据结构,组拼更新SQL,更新数据
        结果2个rdd
    
        result_time_rdd    =  [('device_id1',(12,'2017-09-01')),('device_id2',(9,'2017-06-05')),('device_id3',(0,'1970-01-01'))]
    
        result_geohash_rdd =  [('device_id1',('EWEQ1','SDWQ3'),('device_id2',('UYWIE8','']),('device_id3',['','']]
    
        通过cogroup作得到
    
        [('device_id1',([12,'2017-09-01'],['EWEQ1','SDWQ3']),('device_id2',['UYWIE8','POKE5']),('device_id3',['MVBD5','ZCXV4']]
    

    更新到表中

    
     UPDATE recommend.user_tag SET open_app_period=x[1],open_app_last=x[2],active_loc_1=x[3],active_loc_2=x[4] WHERE device_id=%s
    

    结束语

    全文使用RDD的API来实现,RDD的API十分强大
    目前建议使用DataFrame来实现

    相关文章

      网友评论

        本文标题:记录完整spark.hive.sql处理过程

        本文链接:https://www.haomeiwen.com/subject/mqxnjxtx.html