美文网首页
Maxcompute数据清洗和

Maxcompute数据清洗和

作者: zishen | 来源:发表于2019-07-10 15:05 被阅读0次

    上手dataworks

    关于maxcompute

    • maxcompute原名是odps(open-data-processing-server)就是阿里提供的一个大数据分布式计算服务系统,分布式系统用来提升企业处理海量数据能力,可能是新改的名字很多借口都用odps命名。阿里官方文档.
    • 架构如下 image

      集成了很多计算框架,也很方便接口开发。

    关于dataworks

    • dataworks就是基于maxcompute的PaaS平台,可以简单理解为dataworks是一个web形式的开发管理工具,而maxcompute就是一台云端的很强的大数据计算主机。你可以通过dataworks来用maxcompute,也可以通过maxcompute的sdk(下节会说)。
    • dataworks提供了可视化的开发流程,一个“业务流程”就相当于一个项目工程(业务流程和数据库表命名规则)
    • 业务流程里面的“数据集成”模块主要用来同步多端数据,“数据开发”模块可以可视化建立节点进行开发。可以通过dataworks在maxcompute中建表上传数据。具体操作入门可以看dataworks文档

    PyODPS连接maxcompute:

    • PyODPS是Maxcompute的Python版的SDK,它提供了对MaxCompute对象的基本操作,同时提供DataFrame框架,可以在MaxCompute上进行数据分析。同时兼容Python2.7和3.x,很方便。

    安装调试pyodps:

    • 如果是用的anaconda的环境的话直接pip install pyodps就能安装了。具体参考:pyodps安装指南

    • 在本地Python环境里面连接maxcompute的时候官方文档没说明白。安装好pyodps后导入包。

    from odps import ODPS
    #实例化maxcompute的对象
    o = ODPS(access_id='xxxxxx'
                        , access_secret='xxxxxx'
                        , project='工作空间名'
                        ,end_point='https://service.odps.aliyun.com/api')                        
    
    • accesskey主账号设置了发到邮箱,end_point取决于网络环境和maxcompute服务器地址,按照上面就可以,如果是外网或者vpc参考:end_point设置。连上后就可以像在dataworks上的pyodps节点一样在本地对远端maxcompute进行操作了。

    jupyter-notebook利用pyodps和pyecharts统计可视化简单例子:

    • pyechart我之前做股票分析的时候发现的是百度开发的js包echarts的py版本,自从用了他,再也没用过matplotlib,图标都是可交互的还能自定义主题也可以集成到Django、flask里面很方便。PyEcharts文档
    from odps import ODPS
    from odps.df import DataFrame
    
    o = ODPS(access_id='  ', access_secret='  ', project='  ', end_point='https://service.odps.aliyun.com/api')
    
    t = o.get_table('data_product')
    df = DataFrame(t)
    deepdraw = df[df['source']=='_deepdraw']
    leycloud = df[df['source']=='_leycloud']
    source_agg = df.groupby(df.source).agg(count=df.count())
    print(source_agg)
    
    from pyecharts.charts import Pie
    from pyecharts import options as opts
    source_pie =(Pie()
                 .add("产品数",[('_leycloud',15588349),('_deepdraw',1035428)])
                 .set_global_opts(title_opts=opts.TitleOpts(title="source分布"))
                 )
    source_pie.render_notebook()
    
    pyecharts可视化效果

    Jupyter的交互增强和日志服务

    对Maxcompute中的表进行操作

    建表

    • 在datawork里面就可以建表,在这里直接建表,也可以在业务流程中建表。建的表都是存在Maxcompute里面
      工作空间建表
      业务流程建表
      在DDL模式中用SQL建表,- MaxcomputeSQL概述,- SQL规范 dataworks创建表
    • 也可以通过PyODPS创建和操作表pyodps基本操作

    maxcompute表的设计

    数据中台表

    各字段对应的英文名称

    字段表格图

    示例tops表代码

    CREATE TABLE `wzs_tops` (
        `uuid` string COMMENT '全局唯一uuid,unique key,有索引',
        `key_word` string COMMENT '分类的关键词',
        `title` string COMMENT '标题',
        `images` string COMMENT '图片url,json字段',
        `price` bigint COMMENT '价格',
        `comments` bigint COMMENT '全局唯一uuid,unique key,有索引',
        `brand` string COMMENT '品牌品',
        `create_date` datetime,
        `source_pictures` string COMMENT '所属图片的source id,有索引',
        `product_name` string COMMENT '商品名称',
        `style` string COMMENT '风格',
        `craft` string COMMENT '工艺',
        `color_pattern` string COMMENT '色彩花纹',
        `main_fabric` string COMMENT '主面料',
        `source` string COMMENT '信息来源',
    
        `model` string COMMENT '版型',
        `profile` string COMMENT '廓形',
        `coat_length` string COMMENT '衣长',
        `collar_design` string COMMENT '领型',
        `sleeve_length` string COMMENT '袖长',
        `sleeve_design` string COMMENT '袖型',
        `placket_design` string COMMENT '门襟类型',
        `hem_design` string COMMENT '下摆设计'
        
    ) ;
    

    对ODPS的DataFrame使用自定义函数

    • apply(axis = 0,reduce = False)调用自定义函数

    axis = 0的时候为对每一行调用自定义函数,默认直接传入collection的一行,函数处理返回后再传入下一行。reduce = False时返回的是sequence,否则返回的是collection,reduce为False时,也可以使用yield关键字来返回多行结果。

    • 每一次对有odps表生成的dataframe进行聚合、数据变换或自定义函数等操作时都会在dataworks中生成一张对应变换dataframe的临时表(生命周期为1天),可以在表管理中找到也可以到数据地图(Meta)中对表进行操作,如修改生命周期,可以将运行生成的结果保存方便调用。


      生成的临时表
    • 也可以在运行出结果后用.persist方法将返回的新dataframe保存为odps表。
    • 导入包尽量在函数内导入,自定义函数中调用其他函数要写成闭包的形式
      用@output指定自定义函数返回的字段名称和类型
    @output(['uuid','title','brand','create_date','source','source_picture','key_word',
             'product_name','images','price','comments','style', 'craft', 'color_pattern', 
             'main_fabric', 'model', 'profile','coat_length','collar_design', 'sleeve_length',
             'sleeve_design','placket_design', 'hem_design']
            ,['string','string','string','datetime','string','string','string','string','string'
              ,'string','string','string','string','string','string','string','string','string'
              ,'string','string','string','string','string'])
    def df_clean(row):
        import json
        import pandas as pd
        import sys
        reload(sys) 
        sys.setdefaultencoding('utf-8')
    
    • map_reduce调用自定义函数

    df.map_reduce(mapper = df_clean)#等价于
    df.apply(df_clean,axis = 0, reduce =False)
    

    在odps上使用第三方包:

    odps上只有numpy一个第三方包,如果想用pandas或其他包就得上传包和依赖包到odps资源。可以通过jupyter上pyodps的接口上传,但是在上传数据较大的包如pandas有20mb会出现timeout报错。解决办法是在dataworks上“业务流程”——“资源”——“新建Archive资源”中上传,上传时打钩“上传为ODPS资源”就可以在当前工作空间中使用资源,也可以将小型的包的源码上传为新建Python资源中就可以在odps中进行引用。也可以将需要调用的文件上传到File资源中进行调用。文档和包

    • 包要找适配Linux和Python2.7的


      上传csv文件
    • 注意每一个第三方包要改成.zip或.tar.gz的后缀


      包后缀

    在代码中调用第三方包和文件:

    from odps import options
    
    options.sql.settings = { 'odps.isolation.session.enable': True }
    options.df.libraries = ['pandas.zip','pytz.zip','dateutil.zip','six.tar.gz'] #导入资源库中的pandas包和依赖
    resource = o.get_resource('category.csv')
    with resource.open('r') as fp:
        category = pd.read_csv(fp)     #在dataworks中运行要先通过resource打开文件,如果本地运行直接打开
    

    相关文章

      网友评论

          本文标题:Maxcompute数据清洗和

          本文链接:https://www.haomeiwen.com/subject/eqxaoqtx.html