1.应用场景
之前数据清洗常做的操作是,从MySQL中读取数据集导出CSV数据集,然后用pandas读取数据,然后做数据报告。用pymysql模块可方便很多。对于一般数据分析来说,一般生产库只给查询权限,因此需要用pymysql模块迁移一个新数据仓库,供PowerBI的仪表盘使用。
在此分享一个小案例。
2.先写好SQL查询语句,并且在数据仓库建表
在生产库中,写好查询语句;
在数据仓库,即自己有权限的数据库中,建一张表,等会用于查询语句的迁入。
CREATE TABLE `counts_order` (
`houseid` int(11) DEFAULT NULL COMMENT '房源id',
`order_code` varchar(50) DEFAULT NULL COMMENT '订单号',
`merchant_name` varchar(64) DEFAULT NULL,
`check_in_time` datetime DEFAULT NULL COMMENT '入住时间',
`order_data` date DEFAULT NULL COMMENT '订单日期',
`group_name` varchar(50) DEFAULT NULL COMMENT '分组名称',
`order_count` int(11) DEFAULT NULL COMMENT '订单数',
`order_gmv` decimal(10,2) DEFAULT NULL COMMENT '订单GMV',
`order_nights` int(11) DEFAULT NULL COMMENT '间夜数',
`order_count_qx` int(11) DEFAULT NULL COMMENT '取消订单数',
`order_gmv_qx` decimal(10,2) DEFAULT NULL COMMENT '取消订单GMV',
`order_nights_qx` int(11) DEFAULT NULL COMMENT '取消间夜数',
`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
3.写Python脚本,将查询数据迁入到新数据仓库。
#!/usr/bin/python
# -*- coding: UTF-8 -*-
import traceback
import pymysql
lis = []
def main():
#修改SQL
insert_sql = '''
select
a.house_id,a.order_code,merchant.merchant_name,a.check_in_time, DATE(a.order_time) order_data,hg.group_name,
mark_zc as order_count,
a.GMV*a.mark_zc as order_gmv,
a.jianye*a.mark_zc as order_nights,
mark_qx as order_count_qx,
a.GMV*a.mark_qx as order_gmv_qx,
a.jianye*a.mark_qx as order_nights_qx
from (
select house_id,order_code,merchant_id,order_status,type,inventory,order_time,check_in_time,
ifnull(total_money,0)+ifnull(clean_money,0) as GMV,
TIMESTAMPDIFF(day,check_in_time,check_out_time)*inventory as jianye,
if(order_status IN ('WAITING_CHECKIN','CHECKIN','CHECKOUT'),1,0) as mark_zc,
if(order_status ='CANCELED',1,0) as mark_qx
from tbl_biz_order
where type = 'normal'
) a
left join tbl_info_house house on a.house_id = house.houseid
left join house_group_rel hgr on hgr.house_id = house.houseid
left join house_group hg on hg.id = hgr.house_group_id
left join merchant on merchant.id = a.merchant_id;
'''
# print("sql:" + inquire_sql)
#从生产库查数据
try:
conn = pymysql.connect(
host='******',
port=3306,
user='******',
passwd='******',
db='******',
charset='UTF8'
)
cmd = conn.cursor()
cmd.execute(query=insert_sql)
#fetchall遍历sql读取的数据集
for i in cmd.fetchall():
lis.append(i)
except Exception:
print("处理异常:" + traceback.format_exc())
finally:
conn.close()
"""
adm写入层
"""
def MySQL_Insert(lis):
#需要插入的表格
insert_sql = 'insert into counts_order (houseid,order_code,merchant_name,check_in_time,order_data,group_name,order_count,order_gmv,order_nights, order_count_qx,order_gmv_qx,order_nights_qx) value (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)'
#将查询数据插入到新数据仓库,刚才建的表里面
try:
conn = pymysql.connect(
host='********',
port=3306,
user='********',
passwd='*******',
db='******',
charset='UTF8'
)
cmd = conn.cursor()
#对应修改的sql表
cmd.execute(query="truncate counts_order")
cmd.executemany(insert_sql, lis)
conn.commit()
except Exception:
print("处理异常:" + traceback.format_exc())
finally:
conn.close()
if __name__ == '__main__':
print("start")
main()
print("result:" )
print(lis[0])
MySQL_Insert(lis)
print("end")
4.将上面的脚本放在脚本服务器上,让这个脚本的定时刷新,那么数据仓库中的数据就会根据现有数据库而更新,保证数据的实时性。
5.PowerBI展示数据
接下来,可以用PowerBI连接刚才的数据。
image.png
在此要输入服务器,数据库名称等。前面有文章介绍过数据库连接了,在此不重复。
上面重新插入的数据为每日流水数据,接下来简单计算下度量值。然后按照筛选条件筛选即可。
image.png
接下来将这个仪表盘发布。主页-发布。
之后登陆PowerBI的个人服务区,为刚才发布文件的数据集设置网关,数据源凭证,计划刷新时间。之前有文章介绍过,在此不做重复说明。 image.png
最后,一份会定时刷新的PowerBI仪表盘就制作完毕。
https://app.powerbi.cn/view?r=eyJrIjoiNjI0NTAxODktNmVmMS00OWQ1LTg2ZDUtYTNkZWEyYTg0NGE5IiwidCI6ImViNmFiZTZkLTg2OTQtNGE5YS04OWZjLWY3ZDU2MTc2NmUyMSJ9&pageName=ReportSectionfe67b81ea608d1618c40
网友评论