美文网首页大数据
【案例-hadoop】hadoop和mysql实时流处理

【案例-hadoop】hadoop和mysql实时流处理

作者: X_Ran_0a11 | 来源:发表于2019-07-25 01:20 被阅读0次

    一、hadoop和mysql的配合使用

    几种hadoop组件的用法:

    hadoop的hdfs:分布式存储;
    hive:关系型数据库仓库;
    sqoop:hive和mysql和hdfs之间传递的简化工具;
    hbase:非关系型数据库;
    flume:日志/文件收集和传输器;
    kafka:数据传输器;
    spark:spark是类mr框架,但是Spark的中间数据放到内存中,对于迭代运算效率更高,适用于需要多次操作特定数据集的应用场合。需要反复操作的次数越多,所需读取的数据量越大,受益越大,数据量小但是计算密集度较大的场合,受益就相对较小(大数据库架构中这是是否考虑使用Spark的重要因素);
    spark-streaming:基于spark的流式批处理引擎,其基本原理是把输入数据以某一时间间隔批量的处理,当批处理间隔缩短到秒级时,便可以用于处理实时数据流。
    其他调度、协调工具等:oozie、zookeeper。。。

    1、mysql+hive+sqoop,离线处理

    hadoop本来是设计用于处理大量离线数据的,所以数据量小时,hive操作会比mysql更慢(mysql有索引机制,hive暴力mr)。所以:

    • 量小时考虑直接采用mysql进行处理;
    • 量大时将mysql数据导入hive进行处理;
    • 对hdfs的大量数据导入hive进行离线处理,处理完后的关键信息保存进mysql(这可能是海量数据离线处理最典型的场景了);
    2、flume+kafka+spark-streaming+hbase/hive,实时流处理

    flume和kafka两者其实很像,都是实现数据量传输的,但是flume适用多种接口,可以采集各种日志信息,kafka像一个广义意义的数据库,里面可以存放一定时间的数据,两者结合起来可以既适用多种接口,又有高容错信息。spark-steaming进行秒级处理时,可以处理实时数据流,从而把kafka保存的信息进行实时处理,最后保存进数据库。
    如果直接将flume信息写入HDFS时,由于线上数据生产快且具有突发性,可能会使得高峰时间hdfs数据写失败,所以一般数据先写到kafka,然后从kafka导入到hdfs上。
    当然也可以不用spark框架这一套,直接用mr进行操作,缺点就是处理更慢。

    二、实时数据的可视化展示

    1、直接将hive/hbase的数据提取进行可视化

    有组件可以直接对接hdfs实现,但是用于可视化展示的是关键数据(比如收集了海量淘宝用户的实时购买信息,关键数据是top10的热销产品),所以将数据导出至mysql,再进行可视化展示更实用。

    2、flume+kafka+spark-streaming+hbase/hive+mysql可视化

    spark流进行实时处理,然后应该使用前端接口提取hbase/hive中的数据,直接进行可视化展示。

    三、案例

    1、实时流处理

    采用的python定时任务+hive+sqoop这一套(因为没有装其他的组件,没研究到这么深啊)。
    原数据txt,包含两列,第一列为目录名,第二列为该目录名的搜索量:



    流程如下:

    • 数据上传至hdfs
    #启动hadoop
    cd /usr/local/Cellar/hadoop/3.1.2/sbin/
    ./start-all.sh
    
    #把创建好的淘宝目录csv放到hdfs的input文件夹
    
    cd $HADOOP_HOME 
    hadoop fs -mkdir /input   #创建input文件夹
    ranmo$ cd /Users/ranmo/Desktop/数据分析案例/hadoop淘宝目录实时可视化  #进入csv文件存放的位置
    open -e 淘宝目录.csv   #查看文本内容是否正确
    hadoop fs -put 淘宝目录.csv /input  #将文件上传至hdfs 
    hadoop fs -ls /input   #检查文本正确上传
    
    #python连接hdfs
    from pyhdfs import HdfsClient
    fs= HdfsClient(hosts='localhost:9870',user_name='ranmodeiMac')  #user_name必须要填,不然就采用$HADOOP_USER_NAME ,但是自己没配置的
    print(fs.listdir("/"))     #fs已经为hdfs端了,不用指定其他路径,该指令显示hdfs上的文件夹:['input', 'tmp', 'user']
    

    dir(fs)指令,或者官方文档查看相关的用法https://pyhdfs.readthedocs.io/en/latest/pyhdfs.html
    要更新列值,理论上是用python打开原txt,然后更新列值,但是由于是txt存储,不想mysql还能用update更新值,可能是需要hbase类的非关系型数据库来操作。这里直接考虑新建一个txt,生成199个数字(1~10000),在hive中导入两个文件,并不断更新列值。实际的py代码:

    #!/usr/bin/python 
    # -*- coding: utf-8 -*-
    
    from pyhdfs import HdfsClient
    import time
    import numpy as np
    
    fs= HdfsClient(hosts='localhost:9870',user_name='ranmodeiMac')  #user_name必须要填,不然就采用$HADOOP_USER_NAME ,但是自己没配置的
    
    
    def job():
        f=open('/Users/ranmo/Desktop/数据分析案例/hadoop淘宝目录实时可视化/value_realtime.csv','w+')
        for i in range(199):
            f.write(str(i))
            f.write(',')
            f.write(str(np.random.randint(1,10001)))
            f.write('\n')
        f.close()
    while True:
        job()
        fs.delete('/input/value_realtime.csv')
        fs.copy_from_local('/Users/ranmo/Desktop/数据分析案例/hadoop淘宝目录实时可视化/value_realtime.csv','/input/value_realtime.csv')
    #将文件上传至ndfs
        time.sleep(1)
        break
    
    
    • 执行脚本,每秒定时将数据导入hive,hive筛选点击量top10的数据导出至mysql
      首先是hive筛选:
    #执行hive
    hive
    
    #将input文件夹下,淘宝目录.txt和value_realtime.txt都导入到hive表中
    Drop table if exists taobao;
    create table taobao (id int,category string) row format delimited fields terminated by ',';
    load data inpath '/input/淘宝目录.csv' into table taobao;
    Drop table if exists taobao_value;
    create table taobao_value (id int,value int) row format delimited fields terminated by ',';
    load data inpath '/input/value_realtime.csv' into table taobao_value;
    
    #创建hive表,保存排行前十的种类,及其搜索量
    Drop table if exists top_value;
    create table top_value (category string,value int)row format delimited fields terminated by ',';
    Insert into table top_value (select category,value from taobao inner join taobao_value on taobao.id=taobao_value.id order by value desc limit 10);
    
    #退出
    quit;
    

    mysql中要存在对应的表:

    #打开mysql,创建top_value表
     /usr/local/mysql/bin/mysql -u root -p
    XXXXXX  #输入密码
    Use hive;
    Drop table if exists top_value;
    CREATE TABLE top_value (`category` VARCHAR(45) NULL,`value` INT NULL);
    quit
    

    最后传输至mysql

    #用sqoop把hive表传输至mysql
    sqoop export --connect jdbc:mysql://localhost:3306/hive?zeroDateTimeBehavior=EXCEPTION --username root --password xxxxxx --table top_value --export-dir /user/hive/warehouse/top_value
    

    mysql数据会一直更新。

    2、mysql数据可视化
    • 本来是想阿里云dataV连接本地数据库,可以实时刷新本地mysql数据,但是一直没有构建外部服务器成功,所以最后用tableau连接本地数据库,手动刷新数据- -。。
      状态1:


      image.png

      状态2:


      image.png

    三、脚本

    image.png
    image.png
    image.png
    image.png
    image.png

    最后还需要用定时器加一个反复执行的框架,我没加,直接手动执行,因为hadoopMR运行已经够慢了。。。

    五、总结

    • 中间是有一些问题的,比如说hdfs input文件夹里文件导入到hive后,会保存在user/hive//warehouse 里面,因此实际上每次只用对这个文件夹的数据进行更新就好了。但是我每次是反复导入到input文件夹,再把warehouse里面的表给删除,工作量上复杂了很多。
    • 在shell中登录mysql单独执行指令是没有问题,但是封装为sh执行的话,指令和mysql执行还有些区别,所以写脚本的时候还得改;
    • 这一套这么卡的原因可能一方面是只有sparking才适合执行实时流,还有一方面是因为我是增删数据,而不是更改数据,如果是更改数据的话速度应该会更快。

    最终结论就是mysql+hive+sqoop这一套只适合大量数据的离线处理。
    疑问?:是否可以用python操作hive数据,然后将同步更新mysql数据,这样会节省很多将hive数据导入到mysql的时间?

    相关文章

      网友评论

        本文标题:【案例-hadoop】hadoop和mysql实时流处理

        本文链接:https://www.haomeiwen.com/subject/zxdvlctx.html