美文网首页
Maxcompute数据清洗和

Maxcompute数据清洗和

作者: zishen | 来源:发表于2019-07-10 15:05 被阅读0次

上手dataworks

关于maxcompute

  • maxcompute原名是odps(open-data-processing-server)就是阿里提供的一个大数据分布式计算服务系统,分布式系统用来提升企业处理海量数据能力,可能是新改的名字很多借口都用odps命名。阿里官方文档.
  • 架构如下 image

    集成了很多计算框架,也很方便接口开发。

关于dataworks

  • dataworks就是基于maxcompute的PaaS平台,可以简单理解为dataworks是一个web形式的开发管理工具,而maxcompute就是一台云端的很强的大数据计算主机。你可以通过dataworks来用maxcompute,也可以通过maxcompute的sdk(下节会说)。
  • dataworks提供了可视化的开发流程,一个“业务流程”就相当于一个项目工程(业务流程和数据库表命名规则)
  • 业务流程里面的“数据集成”模块主要用来同步多端数据,“数据开发”模块可以可视化建立节点进行开发。可以通过dataworks在maxcompute中建表上传数据。具体操作入门可以看dataworks文档

PyODPS连接maxcompute:

  • PyODPS是Maxcompute的Python版的SDK,它提供了对MaxCompute对象的基本操作,同时提供DataFrame框架,可以在MaxCompute上进行数据分析。同时兼容Python2.7和3.x,很方便。

安装调试pyodps:

  • 如果是用的anaconda的环境的话直接pip install pyodps就能安装了。具体参考:pyodps安装指南

  • 在本地Python环境里面连接maxcompute的时候官方文档没说明白。安装好pyodps后导入包。

from odps import ODPS
#实例化maxcompute的对象
o = ODPS(access_id='xxxxxx'
                    , access_secret='xxxxxx'
                    , project='工作空间名'
                    ,end_point='https://service.odps.aliyun.com/api')                        
  • accesskey主账号设置了发到邮箱,end_point取决于网络环境和maxcompute服务器地址,按照上面就可以,如果是外网或者vpc参考:end_point设置。连上后就可以像在dataworks上的pyodps节点一样在本地对远端maxcompute进行操作了。

jupyter-notebook利用pyodps和pyecharts统计可视化简单例子:

  • pyechart我之前做股票分析的时候发现的是百度开发的js包echarts的py版本,自从用了他,再也没用过matplotlib,图标都是可交互的还能自定义主题也可以集成到Django、flask里面很方便。PyEcharts文档
from odps import ODPS
from odps.df import DataFrame

o = ODPS(access_id='  ', access_secret='  ', project='  ', end_point='https://service.odps.aliyun.com/api')

t = o.get_table('data_product')
df = DataFrame(t)
deepdraw = df[df['source']=='_deepdraw']
leycloud = df[df['source']=='_leycloud']
source_agg = df.groupby(df.source).agg(count=df.count())
print(source_agg)

from pyecharts.charts import Pie
from pyecharts import options as opts
source_pie =(Pie()
             .add("产品数",[('_leycloud',15588349),('_deepdraw',1035428)])
             .set_global_opts(title_opts=opts.TitleOpts(title="source分布"))
             )
source_pie.render_notebook()
pyecharts可视化效果

Jupyter的交互增强和日志服务

对Maxcompute中的表进行操作

建表

  • 在datawork里面就可以建表,在这里直接建表,也可以在业务流程中建表。建的表都是存在Maxcompute里面
    工作空间建表
    业务流程建表
    在DDL模式中用SQL建表,- MaxcomputeSQL概述,- SQL规范 dataworks创建表
  • 也可以通过PyODPS创建和操作表pyodps基本操作

maxcompute表的设计

数据中台表

各字段对应的英文名称

字段表格图

示例tops表代码

CREATE TABLE `wzs_tops` (
    `uuid` string COMMENT '全局唯一uuid,unique key,有索引',
    `key_word` string COMMENT '分类的关键词',
    `title` string COMMENT '标题',
    `images` string COMMENT '图片url,json字段',
    `price` bigint COMMENT '价格',
    `comments` bigint COMMENT '全局唯一uuid,unique key,有索引',
    `brand` string COMMENT '品牌品',
    `create_date` datetime,
    `source_pictures` string COMMENT '所属图片的source id,有索引',
    `product_name` string COMMENT '商品名称',
    `style` string COMMENT '风格',
    `craft` string COMMENT '工艺',
    `color_pattern` string COMMENT '色彩花纹',
    `main_fabric` string COMMENT '主面料',
    `source` string COMMENT '信息来源',

    `model` string COMMENT '版型',
    `profile` string COMMENT '廓形',
    `coat_length` string COMMENT '衣长',
    `collar_design` string COMMENT '领型',
    `sleeve_length` string COMMENT '袖长',
    `sleeve_design` string COMMENT '袖型',
    `placket_design` string COMMENT '门襟类型',
    `hem_design` string COMMENT '下摆设计'
    
) ;

对ODPS的DataFrame使用自定义函数

  • apply(axis = 0,reduce = False)调用自定义函数

axis = 0的时候为对每一行调用自定义函数,默认直接传入collection的一行,函数处理返回后再传入下一行。reduce = False时返回的是sequence,否则返回的是collection,reduce为False时,也可以使用yield关键字来返回多行结果。

  • 每一次对有odps表生成的dataframe进行聚合、数据变换或自定义函数等操作时都会在dataworks中生成一张对应变换dataframe的临时表(生命周期为1天),可以在表管理中找到也可以到数据地图(Meta)中对表进行操作,如修改生命周期,可以将运行生成的结果保存方便调用。


    生成的临时表
  • 也可以在运行出结果后用.persist方法将返回的新dataframe保存为odps表。
  • 导入包尽量在函数内导入,自定义函数中调用其他函数要写成闭包的形式
    用@output指定自定义函数返回的字段名称和类型
@output(['uuid','title','brand','create_date','source','source_picture','key_word',
         'product_name','images','price','comments','style', 'craft', 'color_pattern', 
         'main_fabric', 'model', 'profile','coat_length','collar_design', 'sleeve_length',
         'sleeve_design','placket_design', 'hem_design']
        ,['string','string','string','datetime','string','string','string','string','string'
          ,'string','string','string','string','string','string','string','string','string'
          ,'string','string','string','string','string'])
def df_clean(row):
    import json
    import pandas as pd
    import sys
    reload(sys) 
    sys.setdefaultencoding('utf-8')
  • map_reduce调用自定义函数

df.map_reduce(mapper = df_clean)#等价于
df.apply(df_clean,axis = 0, reduce =False)

在odps上使用第三方包:

odps上只有numpy一个第三方包,如果想用pandas或其他包就得上传包和依赖包到odps资源。可以通过jupyter上pyodps的接口上传,但是在上传数据较大的包如pandas有20mb会出现timeout报错。解决办法是在dataworks上“业务流程”——“资源”——“新建Archive资源”中上传,上传时打钩“上传为ODPS资源”就可以在当前工作空间中使用资源,也可以将小型的包的源码上传为新建Python资源中就可以在odps中进行引用。也可以将需要调用的文件上传到File资源中进行调用。文档和包

  • 包要找适配Linux和Python2.7的


    上传csv文件
  • 注意每一个第三方包要改成.zip或.tar.gz的后缀


    包后缀

在代码中调用第三方包和文件:

from odps import options

options.sql.settings = { 'odps.isolation.session.enable': True }
options.df.libraries = ['pandas.zip','pytz.zip','dateutil.zip','six.tar.gz'] #导入资源库中的pandas包和依赖
resource = o.get_resource('category.csv')
with resource.open('r') as fp:
    category = pd.read_csv(fp)     #在dataworks中运行要先通过resource打开文件,如果本地运行直接打开

相关文章

网友评论

      本文标题:Maxcompute数据清洗和

      本文链接:https://www.haomeiwen.com/subject/eqxaoqtx.html