在数据的项目里,不管是中台或数仓,都涉及到数据的接入和数据的开放,这中间必然会遇到到各个平台数据交互的问题(以数据接入为例:接入的数据,上游数据何时到达,下游何时采集,以及采集后的反馈)。本文主要举例其中一个解决方法(算是一个比较重的也是较常规的方式)—— 以数据库表,作为跨平台之间的沟通介质。下面进行详细介绍
设计思路
- 以数据库表作为中间介质,上游平台通过写入一行数据方式来进行触发数据是否达到。
- 下游轮询该表数据,读取到数据,则触发采集流程,拉取完数据后,进行更新数据表中的状态,作为反馈。
- 轮询脚本主要通过 python + shell进行实现。
- 插件化设计思路,实现可插拔。
设计流程以及代码demo
- 交互表设计
create table rcv_leap_platform_interact (
id bigint(20) NOT NULL AUTO_INCREMENT comment '自增id',
sys_code varchar(50) not null comment '来源业务系统',
sys_name varchar(200) not null comment '来源系统业务名称',
db_name varchar(20) not null comment '库名',
table_name varchar(200) not null comment '表名称',
status int default 1 comment '状态码 推送完毕: 1; 读取完毕: 2;',
data_time varchar(20) comment '数据业务时间yyyymmdd/hh',
data_cnt bigint comment '数据条数',
create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
update_time timestamp NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
status字段为枚举值: 1为上游平台数据已到达标识; 2为下游数据已拉取完毕标识;
-
脚本结构如下
脚本结构
- 具体实现
- leapPlatform.sh 为主启动脚本,主要用于插件化配置。
#!/bin/bash
python ./leapPlatform.py "$@"
- leapPlatform.py 为主程序入口,主要使用optparse.OptionParser实现了命令行参数将前置交互和后置交互使用两个参数来区分
- -g: 获取记录数,读取到符合条件的记录则校验通过,启动后续操作
- -p: 推送记录数,数据推送完毕后,向中间库写入一条记录作为标记,让后续其他平台读取应用
- -u: 拉取数据后,更新数据库表中status状态
# coding=utf-8
import os,sys,time
from utils.execDb import ExecDB
from optparse import OptionParser
from utils.commonFunc import getCurrTime
from setting import *
def get_option_parser():
"""
命令行选项解析器
:return: parser
"""
hstr = "%prog config"
parser = OptionParser(hstr, description='Solve data interaction', version='%prog 1.0')
parser.add_option("-p", "--put",
dest="putInfo2Db",
help="write Info to DB; The params: (period platform outputTabs)",
metavar="putInfo",
action="store_true")
parser.add_option("-g", "--get",
dest="getInfo2Db",
help="read Info from DB; The params: (period platform inputTabs)",
metavar="getInfo",
action="store_true")
parser.add_option("-u", "--update",
dest="updateStatus",
help="update info status; The params: (period platform inputTabs)",
metavar="updateStatus",
action="store_true")
return parser
if __name__ == '__main__':
g_tm_ymd = '20210602'
g_tm_ymdh = os.getenv('g_tm_ymdh')
parser = get_option_parser()
options, args = parser.parse_args(sys.argv[1:])
if args[0].lower() == 'day':
exec_db = ExecDB(g_tm_ymd)
elif args[1].lower() == 'hour':
exec_db = ExecDB(g_tm_ymdh)
else:
raise Exception("日期格式暂不支持~~")
# -p 执行put数据至数据库,将数据结果输出到数据库;
if options.putInfo2Db and len(args) > 0:
print("传递参数:{0}".format(args[1:]))
exec_db.write_db(args[1], args[2:])
# -g 执行get数据,查询数据结果,进行判断上游依赖表数据是否到位
if options.getInfo2Db and len(args) > 0:
print("------ {0} -------------------------------预执行处理---------------------------------------------".format(getCurrTime()))
# print("get~~")
nums = len(args[1:])
print("传递参数:{0}".format(args[1:]))
row_nums = exec_db.read_db(args[1], args[2:])
_max_poll_count = MAX_POLL_COUNT
_poll_retries_count = 0
_wait_timeout = WAIT_TIMEOUT
while True and _poll_retries_count < _max_poll_count:
if nums - 1 == row_nums:
print("校验成功~~")
# break
sys.exit(0)
else:
print("校验不成功,休眠10min~~")
time.sleep(_wait_timeout)
if args[0].lower() == 'day':
exec_db = ExecDB(g_tm_ymd)
elif args[1].lower() == 'hour':
exec_db = ExecDB(g_tm_ymdh)
else:
raise Exception("日期格式暂不支持~~")
print("轮询重试中:{}".format(_poll_retries_count))
row_nums = exec_db.read_db(args[1], args[2:])
_poll_retries_count += 1
# 超时异常退出
print("轮询超时异常退出")
sys.exit(1)
# -u 执行完成任务后,更新交互记录数状态,将1更新为2,标识已读
if options.updateStatus and len(args) > 0:
print("------ {0} -------------------------------末执行处理---------------------------------------------".format(getCurrTime()))
nums = len(args[1:])
if nums <= 0:
raise Exception("参数说明~~")
print("传递参数:{0}".format(args[1:]))
exec_db.update_status(args[1], args[2:])
- setting.py 为配置信息
#coding=utf-8
DB_PATH='xxxxxxxx\db.ini'
# 数据库的重试配置
MAX_RETRIES_COUNT = 5
CONN_TIMEOUT = 10
############# 交互表信息
ENV_NAME = "rcvedw_dlk"
INTER_TAB = "rcvedw.rcv_leap_platform_interact"
# 扫描轮询次数和时长
MAX_POLL_COUNT = 5
WAIT_TIMEOUT = 600
- utils 目录中主要程序中用到公共函数、链接数据库、数据库CURD操作
getDbInfo.py 链接数据库
# coding=utf-8
import pymysql, configparser, time
from leapPlatformRely.setting import *
from urllib.parse import urlparse
class GetDbInfo:
def __init__(self):
# 读取数据库配置文件
self.conf = configparser.ConfigParser()
self.conf.read(DB_PATH)
# 链接数据库
def link_db(self, env_name):
dbtype = self.conf.get(env_name, 'dbtype')
jdbc = self.conf.get(env_name, 'jdbcurl')
result = urlparse(urlparse(jdbc).path)
host = result.netloc.split(':')[0]
port = result.netloc.split(':')[1]
if dbtype == 'mysql':
# 数据库连接重试和链接超时功能实现
_conn_status = True
_max_retries_count = MAX_RETRIES_COUNT # 最大重试次数
_conn_retries_count = 0 # 初始重试次数
_conn_timeout = CONN_TIMEOUT
while _conn_status and _conn_retries_count < _max_retries_count:
try:
# 链接数据库
db = pymysql.connect(
host=host,
user=self.conf.get(env_name, 'username'),
password=self.conf.get(env_name, 'password'),
port=int(port),
# database=db_name,
charset='utf8'
)
_conn_status = False
# print(db.get_host_info())
return db
except:
_conn_retries_count += 1
print("链接重试中:{}".format(_conn_retries_count))
# 休眠重试
time.sleep(_conn_timeout)
else:
print("database not support~~")
if __name__ == '__main__':
get_db_info = GetDbInfo()
info = get_db_info.link_db('xxxxx')
print(info)
execDb.py 数据库CURD操作
# coding=utf-8
from leapPlatformRely.setting import ENV_NAME
from leapPlatformRely.utils.getDbInfo import GetDbInfo
from leapPlatformRely.utils.sentence import select_sql, get_cnt_sql, write_sql,delete_sql
from leapPlatformRely.utils.sentence import update_sql
class ExecDB:
def __init__(self,check_date):
get_db_info = GetDbInfo()
self.db = get_db_info.link_db(ENV_NAME)
self.check_date = check_date
# 写执行
def write_db(self, plate_form, tab_names: list):
cursor = self.db.cursor()
for tab_name in tab_names:
cnt_sql = get_cnt_sql(plate_form, tab_name,self.check_date)
del_sql = delete_sql(plate_form, tab_name,self.check_date)
print("查询记录数sql: {0}".format(cnt_sql))
# 执行写入数据库(新获取记录数、预删除数据再进行写入信息)
try:
# 查询数据量大小
cursor.execute(cnt_sql)
cnt = cursor.fetchall()
# 预删除
cursor.execute(del_sql)
print("预删除操作~~")
print("删除sql: {0}".format(del_sql))
# 将数据写入交互表
wt_sql = write_sql(plate_form, tab_name, cnt[0][0],self.check_date)
print("插入sql: {0}".format(wt_sql))
cursor.execute(wt_sql)
except Exception as e:
print(f"Exception:{e}")
else:
print("Execute is ok~~")
# 提交事务
self.finall_exec()
# 读执行
def read_db(self, plate_form, tab_name: list):
sql = select_sql(plate_form, tab_name,self.check_date)
print("读取sql: {0}".format(sql))
cursor = self.db.cursor()
# 执行读取数据库
try:
cursor.execute(sql)
row_nums = cursor.rowcount
except Exception as e:
print(f"Exception:{e}")
else:
print("Execute is ok~~")
return row_nums
finally:
self.db.commit()
self.db.close()
# 更新执行
def update_status(self,plate_form,tab_name):
sql = update_sql(plate_form, tab_name, self.check_date)
print("更新sql: {0}".format(sql))
cursor = self.db.cursor()
# 执行读取数据库
try:
cursor.execute(sql)
except Exception as e:
print(f"Exception:{e}")
else:
print("Execute is ok~~")
finally:
self.db.commit()
self.db.close()
def finall_exec(self):
self.db.commit()
self.db.close()
详细code可通过gitee获取 木讷DATA
网友评论