美文网首页
阿里云-流计算典型示例场景

阿里云-流计算典型示例场景

作者: fat32jin | 来源:发表于2020-03-12 06:56 被阅读0次

    示例原地址:https://help.aliyun.com/document_detail/65839.html?spm=a2c4g.11186623.6.788.619a7fd2VLJkh4

    一、电商实时PVUV

    - 源表结构

    表 1. 日志源表

    字段名 数据类型 详情
    account_id VARCHAR 用户ID
    client_ip VARCHAR 客户端IP
    client_info VACHAR 设备机型信息
    platform VARHCAR 系统版本信息
    imei VARCHAR 设备唯一标识
    version BIGINT 版本号
    action BIGINT 页面跳转描述
    gpm VARCHAR 埋点链路
    c_time VARCHAR 请求时间
    target_type VARCHAR 目标类型
    target_id VARCHAR 目标ID
    udata VARCHAR 扩展信息
    session_id VARHCAR 会话ID
    product_id_chain VARHCAR 商品ID串
    cart_product_id_chain VARCHAR 加购商品ID
    tag VARCHAR 特殊标记
    position VARCHAR 位置信息
    network VARCHAR 网络使用情况
    p_dt VARCHAR 时间分区
    p_platform VARCHAR 系统版本信息

    表 2. 数据库RDS结果表

    字段名 数据类型 详情
    summary_date BIGINT 统计日期
    summary_min VARCHAR 统计分钟
    pv BIGINT 单击量
    uv BIGINT 访客量 说明: 一天内同个访客多次访问仅计算一个UV。
    currenttime TIMESTAMP 当前时间
    • 建流表
    --数据的订单源表
    
    CREATE TABLE source_ods_fact_log_track_action (
        account_id                        VARCHAR,--用户ID
        client_ip                         VARCHAR,--客户端IP
        client_info                       VARCHAR,--设备机型信息
        platform                          VARCHAR,--系统版本信息
        imei                              VARCHAR,--设备唯一标识
        `version`                         VARCHAR,--版本号
        `action`                          VARCHAR,--页面跳转描述
        gpm                               VARCHAR,--埋点链路
        c_time                            VARCHAR,--请求时间
        target_type                       VARCHAR,--目标类型
        target_id                         VARCHAR,--目标ID
        udata                             VARCHAR,--扩展信息,JSON格式
        session_id                        VARCHAR,--会话ID
        product_id_chain                  VARCHAR,--商品ID串
        cart_product_id_chain             VARCHAR,--加购商品ID
        tag                               VARCHAR,--特殊标记
        `position`                        VARCHAR,--位置信息
        network                           VARCHAR,--网络使用情况
        p_dt                              VARCHAR,--时间分区天
        p_platform                        VARCHAR--系统版本信息
    
    
    ) WITH (
        type='datahub',
          endPoint='yourEndpointURL',
        project='yourProjectName',
        topic='yourTopicName',
        accessId='yourAccessId',
        accessKey='yourAccessSecret',
        batchReadSize='1000'
    );
    
    CREATE TABLE result_cps_total_summary_pvuv_min (
        summary_date              bigint,--统计日期
        summary_min               varchar,--统计分钟
        pv                        bigint,--单击量
        uv                        bigint,--一天内同个访客多次访问仅计算一个UV
        currenttime               timestamp,--当前时间
        primary key (summary_date,summary_min)
    ) WITH (
        type= 'rds',
        url = 'yourRDSDatabaseURL',
        userName = 'yourDatabaseUserName',
        password = 'yourDatabasePassword',
        tableName = 'yourTableName'
    );
    

    难点解析

    为了方便理解结构化代码和代码维护,我们推荐使用View(数据视图概念)把业务逻辑差分成二个模块。

    • 模块一

      CREATE VIEW result_cps_total_summary_pvuv_min_01 AS
      select 
      cast(p_dt as bigint) as summary_date, --时间分区
      count(client_ip) as pv, --客户端的IP
      count(distinct client_ip) as uv,--客户端去重
      cast(max(c_time) as TIMESTAMP) as c_time--请求的时间
      from source_ods_fact_log_track_action
      group by p_dt;                                
      
      • 查出客户的IP访问网站的单击次数作为PV。对客户的IP去重作为UV。
      • cast(max(c_time) as TIMESTAMP),记录最后请求的时间。
      • 这段业务逻辑根据p_dt(时间分区,以为单位)的时间来做分组,以max(c_time)表示一段时间的最后访问时间为截止,向数据库插入一条PV/UV的数量。结果如下表。
    p_dt pv uv max(c_time)
    2017-12-12 1000 100 2017-12-12 9:00:00
    2017-12-12 1500 120 2017-12-12 9:01:00
    2017-12-12 2200 200 2017-12-12 9:02:00
    2017-12-12 3300 320 2017-12-12 9:03:00

    模块二

    INSERT into  result_cps_total_summary_pvuv_min
    select 
    a.summary_date,--时间分区,天为单位
    cast(DATE_FORMAT(c_time,'HH:mm')  as varchar) as summary_min,
    --取出小时分钟级别的时间
    a.pv,
    a.uv,
    CURRENT_TIMESTAMP  as currenttime--当前时间
    from result_cps_total_summary_pvuv_min_01 AS a                          
    

    把模块一的数据精确到小时分钟级别取出,最后根据数据得出PV、UV的增长曲线。如图所示。

    image.png

    二、电商订单地域统计

    image.png

    SQL语句:

    订单

    CREATE TABLE source_order (
        id VARCHAR,-- 订单ID
        seller_id VARCHAR, --卖家ID
        account_id VARCHAR,--买家ID
        receive_address_id VARCHAR,--收货地址ID
        total_price VARCHAR,--订单金额
        pay_time VARCHAR --订单支付时间
    ) WITH (
        type='datahub',
        endPoint='http://dh-cn-hangzhou.aliyun-inc.com',
        project='yourProjectName',--您的project
        topic='yourTopicName',--您的topic
        roleArn='yourRoleArn',--您的roleArn
        batchReadSize='500'
    );
    订单地址
    CREATE TABLE source_order_receive_address ( 
         id VARCHAR,--收货地址ID 
         full_name VARCHAR,--收货人全名 
         mobile_number VARCHAR,--收货人手机号 
         detail_address VARCHAR,--收货详细地址 
         province VARCHAR,--收货省份 
         city_id VARCHAR,--收货城市 
         create_time VARCHAR --创建时间 
     ) WITH ( 
         type='datahub', 
         endPoint='http://dh-cn-hangzhou.aliyun-inc.com', 
         project='yourProjectName',--您的project 
         topic='yourTopicName',--您的topic 
         roleArn='yourRoleArn',--您的roleArn 
         batchReadSize='500' 
     );
                                    
    城市表
    CREATE TABLE dim_city ( 
         city_id varchar, 
         city_name varchar,--城市名 
         province_id varchar,--所属省份ID 
         zip_code varchar,--邮编 
         lng varchar,--经度 
         lat varchar,--纬度 
      PRIMARY KEY (city_id), 
      PERIOD FOR SYSTEM_TIME --定义为维表 
     ) WITH ( 
         type= 'rds', 
         url = 'yourDatabaseURL',--您的数据库url 
         tableName = 'yourTableName',--您的表名 
         userName = 'yourDatabaseName',--您的用户名 
         password = 'yourDatabasePassword'--您的密码 
     );
    按日统计不同地域订单(总销售额)的分布。
    CREATE TABLE result_order_city_distribution ( 
         summary_date varchar,--统计日期 
         city_id bigint,--城市ID 
         city_name varchar,--城市名 
         province_id bigint,--所属省份ID 
         gmv double,--总销售额 
         lng varchar,--经度 
         lat varchar,--纬度 
         primary key (summary_date,city_id) 
        ) WITH ( 
            type= 'rds', 
            url = 'yourDatabaseURL',--您的数据库url 
            tableName = 'yourTableName',--您的表名 
            userName = 'yourDatabaseName',--您的用户名 
            password = 'yourDatabasePassword'--您的密码 
        );
    编辑业务逻辑
     insert into result_order_city_distribution 
     select 
     d.summary_date 
     ,cast(d.city_id as BIGINT) 
     ,e.city_name 
     ,cast(e.province_id as BIGINT) 
     ,d.gmv 
     ,e.lng 
     ,e.lat 
     from 
     ( 
             select 
             DISTINCT 
             DATE_FORMAT(a.pay_time,'yyyyMMdd') as summary_date 
             ,b.city_id as city_id 
             ,round(sum(cast(a.total_price as double)),2) as gmv 
             from source_order as a 
             join source_order_receive_address as b on a.receive_address_id =b.id 
             group by DATE_FORMAT(a.pay_time,'yyyyMMdd'),b.city_id 
             --双流join,并根据日期和城市ID得到销售额分布 
     )d join dim_city FOR SYSTEM_TIME AS OF PROCTIME() as e on d.city_id = e.city_id 
     -- join维表,补齐城市地理信息,得到最终结果 
    

    三、工业互联网传感器统计

    业务描述
    该工业客户拥有1千多台设备,分布在不同城市的多个厂区,每个设备上有10个不同种类传感器,这些传感器,大概每5秒采集并上传一份数据到日志服务(Log/SLS),每个采集点格式如下。

    s_id s_value s_ts
    传感器ID 传感器当前值 发送时间
    同时,上述传感器分布在多个设备、多个厂区,用户在RDS还记录如下传感器、设备、厂区的分布维表,如下:

    s_id s_type device_id factory_id
    传感器ID 传感器监控类型 设备ID 厂区ID

    上述信息存放在RDS上,用户希望传感器上传的数据能够和上述数据关联,并将传感器数据按照设备归类每1分钟打平为一张宽表,如下:

    ts device_id factory_id device_temp device_pres
    时间 设备ID 工厂ID 设备温度 设备压力

    为了简化不必要的逻辑,我们假定仅有两种类型的监控传感器,即温度和压力,以方便后续的计算,后续计算逻辑如下:

    筛选指定温度大于80的设备,并向下游触发告警。用户选择使用MQ作为消息触发源,也就是实时计算将温度大于80的设备过滤并投递给MQ,触发下游的用户定义的告警系统。

    将数据写出到在线OLAP系统中,这里用户选择了阿里云hybriddb for mysql (原petadata)。下游用户开发一整套BI系统对接PetaData进行多维度展示。

    经过传感器上传的数据进入Log,当行数据格式如下:

    {
    "sid": "t_xxsfdsad",
    "s_value": "85.5",
    "s_ts": "1515228763"
    }
    定义Log源表为s_sensor_data,结构如下:

    CREATE TABLE s_sensor_data (
        s_id    VARCHAR,
        s_value VARCHAR,
        s_ts    VARCHAR,
        ts      AS CAST(FROM_UNIXTIME(CAST(s_ts AS BIGINT)) AS TIMESTAMP),
        WATERMARK FOR ts AS withOffset(ts, 10000)
    ) WITH (
        TYPE='sls',
        endPoint ='http://cn-hangzhou-corp.sls.aliyuncs.com',
        accessId ='xxxxxxxxxxx',
        accessKey ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
        project ='ali-cloud-streamtest',
        logStore ='stream-test',
    );
    
    定义传感器和设备关联RDS维表为d_sensor_device_data,结构如下:
    ``` sql
    CREATE TABLE d_sensor_device_data (
        s_id    VARCHAR,
        s_type  VARCHAR,
        device_id BIGINT,
        factory_id BIGINT,
        PERIOD FOR SYSTEM_TIME,
        PRIMARY KEY(s_id)
    ) WITH (
        TYPE='RDS',
        url='',
        tableName='test4',
        userName='test',
        password='XXXXXX'
    );
    

    定义触发告警逻辑MQ表为r_monitor_data, 结构如下:

    CREATE TABLE r_monitor_data (
        ts  VARCHAR,
        device_id   BIGINT,
        factory_id  BIGINT,
        device_TEMP DOUBLE,
        device_PRES DOUBLE
    ) WITH (
        TYPE='MQ'
    );
    

    定义存储结果数据的HybridDB表定义为r_device_data,结构如下:

    CREATE TABLE r_device_data (
        ts  VARCHAR,
        device_id BIGINT,
        factory_id BIGINT,
        device_temp DOUBLE,
        device_pres DOUBLE,
        PRIMARY KEY(ts, device_id)
    ) WITH (
        TYPE='HybridDB'
    );
    

    先考虑将传感器数据按分钟级别进行汇总,打平为一个宽表。为了更加结构化代码方便后续代码维护,使用View:

    --先获取每个传感器对应的设备、厂区
    CREATE VIEW v_sensor_device_data
    AS
    SELECT
        s.ts,
        s.s_id,
        s.s_value,
        d.s_type,
        d.device_id,
        d.factory_id
    FROM
        s_sensor_data s
    JOIN
        d_sensor_device_data FOR SYSTEM_TIME AS OF PROCTIME() as d
    ON
        s.s_id = d.s_id;
    
    --打平为一张宽表。
    CREATE VIEW v_device_data
    AS
    SELECT
        --使用滚窗的起始时间作为该条记录的时间
        CAST(TUMBLE_START(v.ts, INTERVAL '1' MINUTE) AS VARCHAR) as ts,
        v.device_id,
        v.factory_id,
        CAST(SUM(IF(v.s_type = 'TEMP', v.s_value, 0)) AS DOUBLE)/CAST(SUM(IF(v.s_type = 'TEMP', 1, 0)) AS DOUBLE) device_temp, --这里用于计算这一分钟的温度平均值
        CAST(SUM(IF(v.s_type = 'PRES', v.s_value, 0)) AS DOUBLE)/CAST(SUM(IF(v.s_type = 'PRES', 1, 0)) AS DOUBLE) device_pres --这里用于计算这一分钟的压力平均值
    FROM
        v_sensor_device_data v
    GROUP BY
        TUMBLE(v.ts, INTERVAL '1' MINUTE), v.device_id, v.factory_id;
    上述是核心计算逻辑,将这一分钟内分别统计关于温度和压力的平均值,作为这一分钟的温度值、压力值。由于使用的是Tumbling Window,也就意味着数据将在每分钟结束时候产出一份。接下来就将数据过滤写出到MQ和HybridDB,如下:
    
    --过滤温度大于80摄氏度的传感器,并写出到MQ触发告警。
    INSERT INTO r_monitor_data
    SELECT
        ts,
        device_id,
        factory_id,
        device_temp,
        device_pres
    FROM
        v_device_data
    WHERE
        device_temp > 80.0;
        
    --将数据写出到HybridDB,用于后续的分析。
    INSERT INTO r_device_data
    SELECT
        ts,
        device_id,
        factory_id,
        device_temp,
        device_pres
    FROM
        v_device_data;  
    

    相关文章

      网友评论

          本文标题:阿里云-流计算典型示例场景

          本文链接:https://www.haomeiwen.com/subject/aefcjhtx.html