美文网首页
01.数据交互(跨平台交互)—数据库表为介质

01.数据交互(跨平台交互)—数据库表为介质

作者: 木讷DATA | 来源:发表于2021-10-23 16:09 被阅读0次

在数据的项目里,不管是中台或数仓,都涉及到数据的接入和数据的开放,这中间必然会遇到到各个平台数据交互的问题(以数据接入为例:接入的数据,上游数据何时到达,下游何时采集,以及采集后的反馈)。本文主要举例其中一个解决方法(算是一个比较重的也是较常规的方式)—— 以数据库表,作为跨平台之间的沟通介质。下面进行详细介绍

设计思路

  1. 以数据库表作为中间介质,上游平台通过写入一行数据方式来进行触发数据是否达到。
  2. 下游轮询该表数据,读取到数据,则触发采集流程,拉取完数据后,进行更新数据表中的状态,作为反馈。
  3. 轮询脚本主要通过 python + shell进行实现。
  4. 插件化设计思路,实现可插拔。

设计流程以及代码demo

  1. 交互表设计
create table rcv_leap_platform_interact (
  id bigint(20) NOT NULL AUTO_INCREMENT comment '自增id',
  sys_code varchar(50) not null comment '来源业务系统', 
  sys_name varchar(200) not null comment '来源系统业务名称',
  db_name varchar(20) not null comment '库名',
  table_name varchar(200) not null comment '表名称',
  status int default 1 comment '状态码 推送完毕: 1; 读取完毕: 2;',
  data_time varchar(20) comment '数据业务时间yyyymmdd/hh',
  data_cnt bigint comment '数据条数',
  create_time  timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  update_time  timestamp NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

status字段为枚举值: 1为上游平台数据已到达标识; 2为下游数据已拉取完毕标识;

  1. 脚本结构如下


    脚本结构
  2. 具体实现
  • leapPlatform.sh 为主启动脚本,主要用于插件化配置。
#!/bin/bash

python ./leapPlatform.py "$@"
  • leapPlatform.py 为主程序入口,主要使用optparse.OptionParser实现了命令行参数将前置交互和后置交互使用两个参数来区分
    • -g: 获取记录数,读取到符合条件的记录则校验通过,启动后续操作
    • -p: 推送记录数,数据推送完毕后,向中间库写入一条记录作为标记,让后续其他平台读取应用
    • -u: 拉取数据后,更新数据库表中status状态
# coding=utf-8

import os,sys,time
from utils.execDb import ExecDB
from optparse import OptionParser
from utils.commonFunc import getCurrTime
from setting import *


def get_option_parser():
    """
    命令行选项解析器
    :return: parser
    """
    hstr = "%prog config"
    parser = OptionParser(hstr, description='Solve data interaction', version='%prog 1.0')
    parser.add_option("-p", "--put",
                      dest="putInfo2Db",
                      help="write Info to DB; The params: (period platform outputTabs)",
                      metavar="putInfo",
                      action="store_true")

    parser.add_option("-g", "--get",
                      dest="getInfo2Db",
                      help="read Info from DB; The params: (period platform inputTabs)",
                      metavar="getInfo",
                      action="store_true")

    parser.add_option("-u", "--update",
                      dest="updateStatus",
                      help="update info status; The params: (period platform inputTabs)",
                      metavar="updateStatus",
                      action="store_true")

    return parser


if __name__ == '__main__':
    g_tm_ymd = '20210602'
    g_tm_ymdh = os.getenv('g_tm_ymdh')

    parser = get_option_parser()
    options, args = parser.parse_args(sys.argv[1:])
    if args[0].lower() == 'day':
        exec_db = ExecDB(g_tm_ymd)
    elif args[1].lower() == 'hour':
        exec_db = ExecDB(g_tm_ymdh)
    else:
        raise Exception("日期格式暂不支持~~")

    # -p 执行put数据至数据库,将数据结果输出到数据库;
    if options.putInfo2Db and len(args) > 0:
        print("传递参数:{0}".format(args[1:]))

        exec_db.write_db(args[1], args[2:])

    # -g 执行get数据,查询数据结果,进行判断上游依赖表数据是否到位
    if options.getInfo2Db and len(args) > 0:
        print("------ {0} -------------------------------预执行处理---------------------------------------------".format(getCurrTime()))
        # print("get~~")
        nums = len(args[1:])
        print("传递参数:{0}".format(args[1:]))

        row_nums = exec_db.read_db(args[1], args[2:])
        _max_poll_count = MAX_POLL_COUNT
        _poll_retries_count = 0
        _wait_timeout = WAIT_TIMEOUT

        while True and _poll_retries_count < _max_poll_count:
            if nums - 1 == row_nums:
                print("校验成功~~")
                # break
                sys.exit(0)
            else:
                print("校验不成功,休眠10min~~")
                time.sleep(_wait_timeout)
                if args[0].lower() == 'day':
                    exec_db = ExecDB(g_tm_ymd)
                elif args[1].lower() == 'hour':
                    exec_db = ExecDB(g_tm_ymdh)
                else:
                    raise Exception("日期格式暂不支持~~")

                print("轮询重试中:{}".format(_poll_retries_count))
                row_nums = exec_db.read_db(args[1], args[2:])
                _poll_retries_count += 1

        # 超时异常退出
        print("轮询超时异常退出")
        sys.exit(1)

    # -u 执行完成任务后,更新交互记录数状态,将1更新为2,标识已读
    if options.updateStatus and len(args) > 0:
        print("------ {0} -------------------------------末执行处理---------------------------------------------".format(getCurrTime()))
        nums = len(args[1:])
        if nums <= 0:
            raise Exception("参数说明~~")

        print("传递参数:{0}".format(args[1:]))

        exec_db.update_status(args[1], args[2:])
  • setting.py 为配置信息
#coding=utf-8

DB_PATH='xxxxxxxx\db.ini'

# 数据库的重试配置
MAX_RETRIES_COUNT = 5
CONN_TIMEOUT = 10

############# 交互表信息
ENV_NAME = "rcvedw_dlk"
INTER_TAB = "rcvedw.rcv_leap_platform_interact"

# 扫描轮询次数和时长
MAX_POLL_COUNT = 5
WAIT_TIMEOUT = 600

  • utils 目录中主要程序中用到公共函数、链接数据库、数据库CURD操作
    getDbInfo.py 链接数据库
# coding=utf-8

import pymysql, configparser, time
from leapPlatformRely.setting import *
from urllib.parse import urlparse


class GetDbInfo:

    def __init__(self):
        # 读取数据库配置文件
        self.conf = configparser.ConfigParser()
        self.conf.read(DB_PATH)

    # 链接数据库
    def link_db(self, env_name):
        dbtype = self.conf.get(env_name, 'dbtype')
        jdbc = self.conf.get(env_name, 'jdbcurl')
        result = urlparse(urlparse(jdbc).path)
        host = result.netloc.split(':')[0]
        port = result.netloc.split(':')[1]

        if dbtype == 'mysql':
            # 数据库连接重试和链接超时功能实现
            _conn_status = True
            _max_retries_count = MAX_RETRIES_COUNT  # 最大重试次数
            _conn_retries_count = 0  # 初始重试次数
            _conn_timeout = CONN_TIMEOUT
            while _conn_status and _conn_retries_count < _max_retries_count:
                try:
                    # 链接数据库
                    db = pymysql.connect(
                        host=host,
                        user=self.conf.get(env_name, 'username'),
                        password=self.conf.get(env_name, 'password'),
                        port=int(port),
                        # database=db_name,
                        charset='utf8'
                    )
                    _conn_status = False
                    # print(db.get_host_info())
                    return db
                except:
                    _conn_retries_count += 1
                    print("链接重试中:{}".format(_conn_retries_count))
                # 休眠重试
                time.sleep(_conn_timeout)
        else:
            print("database not support~~")


if __name__ == '__main__':
    get_db_info = GetDbInfo()
    info = get_db_info.link_db('xxxxx')
    print(info)

execDb.py 数据库CURD操作

# coding=utf-8

from leapPlatformRely.setting import ENV_NAME
from leapPlatformRely.utils.getDbInfo import GetDbInfo
from leapPlatformRely.utils.sentence import select_sql, get_cnt_sql, write_sql,delete_sql

from leapPlatformRely.utils.sentence import update_sql


class ExecDB:
    def __init__(self,check_date):
        get_db_info = GetDbInfo()
        self.db = get_db_info.link_db(ENV_NAME)
        self.check_date = check_date

    # 写执行
    def write_db(self, plate_form, tab_names: list):
        cursor = self.db.cursor()
        for tab_name in tab_names:
            cnt_sql = get_cnt_sql(plate_form, tab_name,self.check_date)
            del_sql = delete_sql(plate_form, tab_name,self.check_date)
            print("查询记录数sql: {0}".format(cnt_sql))
            #  执行写入数据库(新获取记录数、预删除数据再进行写入信息)
            try:
                # 查询数据量大小
                cursor.execute(cnt_sql)
                cnt = cursor.fetchall()
                # 预删除
                cursor.execute(del_sql)
                print("预删除操作~~")
                print("删除sql: {0}".format(del_sql))
                # 将数据写入交互表
                wt_sql = write_sql(plate_form, tab_name, cnt[0][0],self.check_date)
                print("插入sql: {0}".format(wt_sql))
                cursor.execute(wt_sql)

            except Exception as e:
                print(f"Exception:{e}")
            else:
                print("Execute is ok~~")
        # 提交事务
        self.finall_exec()

    # 读执行
    def read_db(self, plate_form, tab_name: list):
        sql = select_sql(plate_form, tab_name,self.check_date)
        print("读取sql: {0}".format(sql))
        cursor = self.db.cursor()
        #  执行读取数据库
        try:
            cursor.execute(sql)
            row_nums = cursor.rowcount
        except Exception as e:
            print(f"Exception:{e}")
        else:
            print("Execute is ok~~")
            return row_nums
        finally:
            self.db.commit()
            self.db.close()

    # 更新执行
    def update_status(self,plate_form,tab_name):
        sql = update_sql(plate_form, tab_name, self.check_date)
        print("更新sql: {0}".format(sql))
        cursor = self.db.cursor()
        #  执行读取数据库
        try:
            cursor.execute(sql)
        except Exception as e:
            print(f"Exception:{e}")
        else:
            print("Execute is ok~~")
        finally:
            self.db.commit()
            self.db.close()

    def finall_exec(self):
        self.db.commit()
        self.db.close()



详细code可通过gitee获取 木讷DATA

相关文章

网友评论

      本文标题:01.数据交互(跨平台交互)—数据库表为介质

      本文链接:https://www.haomeiwen.com/subject/opnkaltx.html