示例原地址:https://help.aliyun.com/document_detail/65839.html?spm=a2c4g.11186623.6.788.619a7fd2VLJkh4
一、电商实时PVUV
- 源表结构
表 1. 日志源表
字段名 | 数据类型 | 详情 |
---|---|---|
account_id | VARCHAR | 用户ID |
client_ip | VARCHAR | 客户端IP |
client_info | VACHAR | 设备机型信息 |
platform | VARHCAR | 系统版本信息 |
imei | VARCHAR | 设备唯一标识 |
version | BIGINT | 版本号 |
action | BIGINT | 页面跳转描述 |
gpm | VARCHAR | 埋点链路 |
c_time | VARCHAR | 请求时间 |
target_type | VARCHAR | 目标类型 |
target_id | VARCHAR | 目标ID |
udata | VARCHAR | 扩展信息 |
session_id | VARHCAR | 会话ID |
product_id_chain | VARHCAR | 商品ID串 |
cart_product_id_chain | VARCHAR | 加购商品ID |
tag | VARCHAR | 特殊标记 |
position | VARCHAR | 位置信息 |
network | VARCHAR | 网络使用情况 |
p_dt | VARCHAR | 时间分区 |
p_platform | VARCHAR | 系统版本信息 |
表 2. 数据库RDS结果表
字段名 | 数据类型 | 详情 |
---|---|---|
summary_date | BIGINT | 统计日期 |
summary_min | VARCHAR | 统计分钟 |
pv | BIGINT | 单击量 |
uv | BIGINT | 访客量 说明: 一天内同个访客多次访问仅计算一个UV。 |
currenttime | TIMESTAMP | 当前时间 |
- 建流表
--数据的订单源表
CREATE TABLE source_ods_fact_log_track_action (
account_id VARCHAR,--用户ID
client_ip VARCHAR,--客户端IP
client_info VARCHAR,--设备机型信息
platform VARCHAR,--系统版本信息
imei VARCHAR,--设备唯一标识
`version` VARCHAR,--版本号
`action` VARCHAR,--页面跳转描述
gpm VARCHAR,--埋点链路
c_time VARCHAR,--请求时间
target_type VARCHAR,--目标类型
target_id VARCHAR,--目标ID
udata VARCHAR,--扩展信息,JSON格式
session_id VARCHAR,--会话ID
product_id_chain VARCHAR,--商品ID串
cart_product_id_chain VARCHAR,--加购商品ID
tag VARCHAR,--特殊标记
`position` VARCHAR,--位置信息
network VARCHAR,--网络使用情况
p_dt VARCHAR,--时间分区天
p_platform VARCHAR--系统版本信息
) WITH (
type='datahub',
endPoint='yourEndpointURL',
project='yourProjectName',
topic='yourTopicName',
accessId='yourAccessId',
accessKey='yourAccessSecret',
batchReadSize='1000'
);
CREATE TABLE result_cps_total_summary_pvuv_min (
summary_date bigint,--统计日期
summary_min varchar,--统计分钟
pv bigint,--单击量
uv bigint,--一天内同个访客多次访问仅计算一个UV
currenttime timestamp,--当前时间
primary key (summary_date,summary_min)
) WITH (
type= 'rds',
url = 'yourRDSDatabaseURL',
userName = 'yourDatabaseUserName',
password = 'yourDatabasePassword',
tableName = 'yourTableName'
);
难点解析
为了方便理解结构化代码和代码维护,我们推荐使用View(数据视图概念)把业务逻辑差分成二个模块。
-
模块一
CREATE VIEW result_cps_total_summary_pvuv_min_01 AS select cast(p_dt as bigint) as summary_date, --时间分区 count(client_ip) as pv, --客户端的IP count(distinct client_ip) as uv,--客户端去重 cast(max(c_time) as TIMESTAMP) as c_time--请求的时间 from source_ods_fact_log_track_action group by p_dt;
- 查出客户的IP访问网站的单击次数作为PV。对客户的IP去重作为UV。
-
cast(max(c_time) as TIMESTAMP)
,记录最后请求的时间。 - 这段业务逻辑根据
p_dt
(时间分区,以天
为单位)的时间来做分组,以max(c_time)
表示一段时间的最后访问时间为截止,向数据库插入一条PV/UV的数量。结果如下表。
p_dt | pv | uv | max(c_time) |
---|---|---|---|
2017-12-12 | 1000 | 100 | 2017-12-12 9:00:00 |
2017-12-12 | 1500 | 120 | 2017-12-12 9:01:00 |
2017-12-12 | 2200 | 200 | 2017-12-12 9:02:00 |
2017-12-12 | 3300 | 320 | 2017-12-12 9:03:00 |
模块二
INSERT into result_cps_total_summary_pvuv_min
select
a.summary_date,--时间分区,天为单位
cast(DATE_FORMAT(c_time,'HH:mm') as varchar) as summary_min,
--取出小时分钟级别的时间
a.pv,
a.uv,
CURRENT_TIMESTAMP as currenttime--当前时间
from result_cps_total_summary_pvuv_min_01 AS a
把模块一的数据精确到小时分钟级别取出,最后根据数据得出PV、UV的增长曲线。如图所示。
image.png二、电商订单地域统计
image.pngSQL语句:
订单
CREATE TABLE source_order (
id VARCHAR,-- 订单ID
seller_id VARCHAR, --卖家ID
account_id VARCHAR,--买家ID
receive_address_id VARCHAR,--收货地址ID
total_price VARCHAR,--订单金额
pay_time VARCHAR --订单支付时间
) WITH (
type='datahub',
endPoint='http://dh-cn-hangzhou.aliyun-inc.com',
project='yourProjectName',--您的project
topic='yourTopicName',--您的topic
roleArn='yourRoleArn',--您的roleArn
batchReadSize='500'
);
订单地址
CREATE TABLE source_order_receive_address (
id VARCHAR,--收货地址ID
full_name VARCHAR,--收货人全名
mobile_number VARCHAR,--收货人手机号
detail_address VARCHAR,--收货详细地址
province VARCHAR,--收货省份
city_id VARCHAR,--收货城市
create_time VARCHAR --创建时间
) WITH (
type='datahub',
endPoint='http://dh-cn-hangzhou.aliyun-inc.com',
project='yourProjectName',--您的project
topic='yourTopicName',--您的topic
roleArn='yourRoleArn',--您的roleArn
batchReadSize='500'
);
城市表
CREATE TABLE dim_city (
city_id varchar,
city_name varchar,--城市名
province_id varchar,--所属省份ID
zip_code varchar,--邮编
lng varchar,--经度
lat varchar,--纬度
PRIMARY KEY (city_id),
PERIOD FOR SYSTEM_TIME --定义为维表
) WITH (
type= 'rds',
url = 'yourDatabaseURL',--您的数据库url
tableName = 'yourTableName',--您的表名
userName = 'yourDatabaseName',--您的用户名
password = 'yourDatabasePassword'--您的密码
);
按日统计不同地域订单(总销售额)的分布。
CREATE TABLE result_order_city_distribution (
summary_date varchar,--统计日期
city_id bigint,--城市ID
city_name varchar,--城市名
province_id bigint,--所属省份ID
gmv double,--总销售额
lng varchar,--经度
lat varchar,--纬度
primary key (summary_date,city_id)
) WITH (
type= 'rds',
url = 'yourDatabaseURL',--您的数据库url
tableName = 'yourTableName',--您的表名
userName = 'yourDatabaseName',--您的用户名
password = 'yourDatabasePassword'--您的密码
);
编辑业务逻辑
insert into result_order_city_distribution
select
d.summary_date
,cast(d.city_id as BIGINT)
,e.city_name
,cast(e.province_id as BIGINT)
,d.gmv
,e.lng
,e.lat
from
(
select
DISTINCT
DATE_FORMAT(a.pay_time,'yyyyMMdd') as summary_date
,b.city_id as city_id
,round(sum(cast(a.total_price as double)),2) as gmv
from source_order as a
join source_order_receive_address as b on a.receive_address_id =b.id
group by DATE_FORMAT(a.pay_time,'yyyyMMdd'),b.city_id
--双流join,并根据日期和城市ID得到销售额分布
)d join dim_city FOR SYSTEM_TIME AS OF PROCTIME() as e on d.city_id = e.city_id
-- join维表,补齐城市地理信息,得到最终结果
三、工业互联网传感器统计
业务描述
该工业客户拥有1千多台设备,分布在不同城市的多个厂区,每个设备上有10个不同种类传感器,这些传感器,大概每5秒采集并上传一份数据到日志服务(Log/SLS),每个采集点格式如下。
s_id s_value s_ts
传感器ID 传感器当前值 发送时间
同时,上述传感器分布在多个设备、多个厂区,用户在RDS还记录如下传感器、设备、厂区的分布维表,如下:
s_id | s_type | device_id | factory_id |
---|---|---|---|
传感器ID | 传感器监控类型 | 设备ID | 厂区ID |
上述信息存放在RDS上,用户希望传感器上传的数据能够和上述数据关联,并将传感器数据按照设备归类每1分钟打平为一张宽表,如下:
ts | device_id | factory_id | device_temp | device_pres |
---|---|---|---|---|
时间 | 设备ID | 工厂ID | 设备温度 | 设备压力 |
为了简化不必要的逻辑,我们假定仅有两种类型的监控传感器,即温度和压力,以方便后续的计算,后续计算逻辑如下:
筛选指定温度大于80的设备,并向下游触发告警。用户选择使用MQ作为消息触发源,也就是实时计算将温度大于80的设备过滤并投递给MQ,触发下游的用户定义的告警系统。
将数据写出到在线OLAP系统中,这里用户选择了阿里云hybriddb for mysql (原petadata)。下游用户开发一整套BI系统对接PetaData进行多维度展示。
经过传感器上传的数据进入Log,当行数据格式如下:
{
"sid": "t_xxsfdsad",
"s_value": "85.5",
"s_ts": "1515228763"
}
定义Log源表为s_sensor_data,结构如下:
CREATE TABLE s_sensor_data (
s_id VARCHAR,
s_value VARCHAR,
s_ts VARCHAR,
ts AS CAST(FROM_UNIXTIME(CAST(s_ts AS BIGINT)) AS TIMESTAMP),
WATERMARK FOR ts AS withOffset(ts, 10000)
) WITH (
TYPE='sls',
endPoint ='http://cn-hangzhou-corp.sls.aliyuncs.com',
accessId ='xxxxxxxxxxx',
accessKey ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
project ='ali-cloud-streamtest',
logStore ='stream-test',
);
定义传感器和设备关联RDS维表为d_sensor_device_data,结构如下:
``` sql
CREATE TABLE d_sensor_device_data (
s_id VARCHAR,
s_type VARCHAR,
device_id BIGINT,
factory_id BIGINT,
PERIOD FOR SYSTEM_TIME,
PRIMARY KEY(s_id)
) WITH (
TYPE='RDS',
url='',
tableName='test4',
userName='test',
password='XXXXXX'
);
定义触发告警逻辑MQ表为r_monitor_data, 结构如下:
CREATE TABLE r_monitor_data (
ts VARCHAR,
device_id BIGINT,
factory_id BIGINT,
device_TEMP DOUBLE,
device_PRES DOUBLE
) WITH (
TYPE='MQ'
);
定义存储结果数据的HybridDB表定义为r_device_data,结构如下:
CREATE TABLE r_device_data (
ts VARCHAR,
device_id BIGINT,
factory_id BIGINT,
device_temp DOUBLE,
device_pres DOUBLE,
PRIMARY KEY(ts, device_id)
) WITH (
TYPE='HybridDB'
);
先考虑将传感器数据按分钟级别进行汇总,打平为一个宽表。为了更加结构化代码方便后续代码维护,使用View:
--先获取每个传感器对应的设备、厂区
CREATE VIEW v_sensor_device_data
AS
SELECT
s.ts,
s.s_id,
s.s_value,
d.s_type,
d.device_id,
d.factory_id
FROM
s_sensor_data s
JOIN
d_sensor_device_data FOR SYSTEM_TIME AS OF PROCTIME() as d
ON
s.s_id = d.s_id;
--打平为一张宽表。
CREATE VIEW v_device_data
AS
SELECT
--使用滚窗的起始时间作为该条记录的时间
CAST(TUMBLE_START(v.ts, INTERVAL '1' MINUTE) AS VARCHAR) as ts,
v.device_id,
v.factory_id,
CAST(SUM(IF(v.s_type = 'TEMP', v.s_value, 0)) AS DOUBLE)/CAST(SUM(IF(v.s_type = 'TEMP', 1, 0)) AS DOUBLE) device_temp, --这里用于计算这一分钟的温度平均值
CAST(SUM(IF(v.s_type = 'PRES', v.s_value, 0)) AS DOUBLE)/CAST(SUM(IF(v.s_type = 'PRES', 1, 0)) AS DOUBLE) device_pres --这里用于计算这一分钟的压力平均值
FROM
v_sensor_device_data v
GROUP BY
TUMBLE(v.ts, INTERVAL '1' MINUTE), v.device_id, v.factory_id;
上述是核心计算逻辑,将这一分钟内分别统计关于温度和压力的平均值,作为这一分钟的温度值、压力值。由于使用的是Tumbling Window,也就意味着数据将在每分钟结束时候产出一份。接下来就将数据过滤写出到MQ和HybridDB,如下:
--过滤温度大于80摄氏度的传感器,并写出到MQ触发告警。
INSERT INTO r_monitor_data
SELECT
ts,
device_id,
factory_id,
device_temp,
device_pres
FROM
v_device_data
WHERE
device_temp > 80.0;
--将数据写出到HybridDB,用于后续的分析。
INSERT INTO r_device_data
SELECT
ts,
device_id,
factory_id,
device_temp,
device_pres
FROM
v_device_data;
网友评论