最近无更新,是因为在写fastapi + vue, 准备写完了再放上来;
这次更新所做的是领导安排的任务,filecmp对比库没用过,索性放上来了;
这次改进的是之前写的文件比对脚本;
我改进的地方主要有两点:
1. 将多次 SQL 提交合并为一次提交;
2. 将单线程检查的脚本改为多进程并行检查。
通过这个方法提高巡检速度;
以下是脚本
脚本
需要注意的点:
之前用的是 threading,它定义的变量可以在多个线程之间共享;
这次改用 multiprocessing 后,发现子进程对普通变量的修改在主进程中读取不到(每个进程拥有独立的内存空间);
写了测试用例才发现这个问题,以后要注意;
因此以下脚本使用 multiprocessing.Manager().list() 在进程间共享结果列表:
import os,sys
import filecmp
import pymysql
from datetime import datetime
def compareme(dir1, dir2, list):
    """Recursively collect entries of ``dir1`` that are missing or different in ``dir2``.

    Each directory level is compared with ``filecmp.dircmp`` while ignoring
    entries named ``_gsdata_``.  For every level the absolute paths (rooted
    at ``dir1``) are appended in this order: entries that exist only on the
    ``dir1`` side, then files that ``dircmp`` reports as different, and
    finally the results of the common sub-directories.

    Args:
        dir1: Source directory, the reference side of the comparison.
        dir2: Destination directory checked against ``dir1``.
        list: Mutable sequence that receives the paths; modified in place.
            The name shadows the builtin but is part of the existing
            signature, so it is kept for backward compatibility.

    Returns:
        None.  Results are delivered through ``list``.
    """
    dircomp = filecmp.dircmp(dir1, dir2, ignore=['_gsdata_'])

    # extend() with a real list replaces the former side-effect-only list
    # comprehensions; a concrete list (not a generator) also keeps this
    # usable with multiprocessing list proxies, which must pickle the argument.
    list.extend([os.path.abspath(os.path.join(dir1, name))
                 for name in dircomp.left_only])
    list.extend([os.path.abspath(os.path.join(dir1, name))
                 for name in dircomp.diff_files])

    for sub in dircomp.common_dirs:
        compareme(
            os.path.abspath(os.path.join(dir1, sub)),
            os.path.abspath(os.path.join(dir2, sub)),
            list,
        )
def get_sql(**kwargs):
    """Compare one source/destination pair and queue the outcome as an INSERT.

    The first character of ``dect`` is treated as a drive letter and looked
    up in a fixed table to obtain the sync name and node IP that are written
    into the log row.  Exactly one INSERT statement is appended to
    ``sql_list`` per call.

    Keyword Args:
        sour: Source directory.
        dect: Destination directory; its first character selects the node.
        tablename: Name of the table the INSERT statement targets.
        cur: Timestamp value stored in the row and printed in messages.
        sql_list: List-like object (``append`` is the only method used) that
            receives the generated statement.

    Returns:
        None.

    Raises:
        KeyError: If the drive letter of ``dect`` is not in the node table.
    """
    sour = kwargs.get('sour')
    dect = kwargs.get('dect')
    tablename = kwargs.get('tablename')
    cur = kwargs.get('cur')
    sql_list = kwargs.get('sql_list')

    # Drive letter -> "<sync name>_<node ip>".
    nodes = {
        'D': 'efb-master_192.168.110.93',
        'H': "efb-slave_192.168.110.94",
        'I': "pudongefb-bak_10.128.129.113",
        'G': 'telcom_192.168.190.89',
        'J': 'unicom_192.168.176.98',
        'K': 'pudongefb_10.128.129.112',
    }
    target = dect[:1]
    sync_name, node_ip = nodes[target].split('_')[:2]
    insert_tpl = "insert into " + tablename + " values('%s', '%s', '%s', '%s')"

    if not os.path.exists(target + ":"):
        # Nothing could be compared, so the row must not claim "success"
        # (the previous code did); an unmounted target is logged as a failure.
        sql_list.append(insert_tpl % (sync_name, node_ip, "fail", cur))
        print("Pls check %s mount" % target)
        return

    found = []
    compareme(sour, dect, found)
    # Drop anything located under a _gsdata_ directory.
    unsynced = [path for path in found if '_gsdata_' not in path]

    result = "fail" if unsynced else "success"
    sql_list.append(insert_tpl % (sync_name, node_ip, result, cur))
    for number, path in enumerate(unsynced, 1):
        print("Time %s number %s the ip %s file %s has not sync!" % (cur, number, node_ip, path))
    print("Time %s the %s ip %s sync-log has %s create!" % (cur, sync_name, node_ip, result))
def get_sql_list(dirs_dict, tablename, cur, task=6):
    """Run the directory comparisons in parallel and gather their SQL statements.

    The first statement in the result creates the log table if it does not
    exist.  After that, the directory pairs are processed in batches of
    ``task`` worker processes; every worker runs ``get_sql`` and appends its
    INSERT statement to a list shared through a ``multiprocessing`` manager.
    Each batch is joined before the next one starts.

    Args:
        dirs_dict: Sequence of dicts with the keys ``'source'`` and
            ``'dist'``.  It is no longer consumed: the function works on a
            copy instead of popping items off the caller's list.
        tablename: Name of the log table.
        cur: Timestamp value handed to every worker.
        task: Maximum number of worker processes running at the same time.

    Returns:
        A plain ``list`` of SQL strings: the CREATE TABLE statement followed
        by whatever the workers appended (order of INSERTs within a batch
        depends on worker completion).

    Raises:
        ValueError: If ``task`` is smaller than 1.
    """
    import multiprocessing

    if task < 1:
        raise ValueError("task must be >= 1")

    pending = list(dirs_dict or [])
    create_sql = "create table if not exists %s" % tablename + "(syncname varchar(25),node varchar(25),result varchar(25),cr_date varchar(25))"

    # The context manager shuts the manager's server process down on exit,
    # so the result is copied into an ordinary list before leaving the block.
    with multiprocessing.Manager() as manager:
        sql_list = manager.list()
        sql_list.append(create_sql)

        for start in range(0, len(pending), task):
            workers = [
                multiprocessing.Process(target=get_sql, kwargs={
                    'sour': pair['source'],
                    'dect': pair['dist'],
                    'tablename': tablename,
                    'cur': cur,
                    'sql_list': sql_list,
                })
                for pair in pending[start:start + task]
            ]
            for worker in workers:
                worker.start()
            for worker in workers:
                worker.join()

        return list(sql_list)
def mysql_conn(**mysql_cfg):
    """Open a MySQL connection through ``pymysql`` from keyword settings.

    Keyword Args:
        host, user, passwd, db: Passed to ``pymysql.connect``; ``None`` when
            not supplied.
        port: Defaults to 3306.
        charset: Defaults to ``'utf8'``.

    Returns:
        A ``(error, connection)`` tuple.  On success ``error`` is ``None``;
        on any exception ``error`` is the exception text and ``connection``
        is ``None``.
    """
    settings = {
        'host': mysql_cfg.get('host', None),
        'user': mysql_cfg.get('user', None),
        'passwd': mysql_cfg.get('passwd', None),
        'db': mysql_cfg.get('db', None),
        'port': mysql_cfg.get('port', 3306),
        'charset': mysql_cfg.get('charset', 'utf8'),
    }
    try:
        connection = pymysql.connect(**settings)
    except Exception as exc:
        # Failures are reported to the caller as text instead of being raised.
        return str(exc), None
    return None, connection
def exec_sql(sql_list, conn):
    """Execute all statements on ``conn`` and commit them once at the end.

    Args:
        sql_list: Iterable of SQL strings, executed in order.
        conn: Connection object providing ``cursor()``, ``commit()`` and
            ``rollback()``.

    Returns:
        ``None`` when every statement ran and the commit succeeded, otherwise
        the text of the exception that stopped execution (after a rollback).
    """
    cursor = None
    try:
        cursor = conn.cursor()
        for sql in sql_list:
            cursor.execute(sql)
        conn.commit()
    except Exception as e:
        conn.rollback()
        return str(e)
    finally:
        # Close the cursor on the error path as well; previously it was only
        # closed after a successful commit.
        if cursor is not None:
            cursor.close()
    return None
def main():
    """Compare the configured directory pairs and store the results in MySQL.

    Steps: build the SQL statements with ``get_sql_list``, verify that one
    statement per directory pair plus the CREATE TABLE statement came back,
    connect to the database, execute everything, and print progress along
    the way.  Any failure is printed and ends the run early.
    """
    # NOTE(review): connection settings and credentials are hard-coded here;
    # consider loading them from configuration or the environment.
    mysql_cfg = {
        'host': '192.168.1.111',
        'user': 'test',
        'passwd': '123456',
        'db': 'test'
    }
    dirs_dict = [
        {'source': r'D:\efbtomcat\webapps\ROOT\PackageData', 'dist': r'H:\efbtomcat\webapps\ROOT\PackageData'},
        {'source': r'D:\efbtomcat\webapps\ROOT\PackageData', 'dist': r'I:\efbtomcat\webapps\ROOT\PackageData'},
        {'source': r'D:\efbtomcat\webapps\ROOT\PackageData', 'dist': r'G:\efbtomcat\webapps\ROOT\PackageData'},
        {'source': r'D:\efbtomcat\webapps\ROOT\PackageData', 'dist': r'J:\efbtomcat\webapps\ROOT\PackageData'},
        {'source': r'D:\efbtomcat\webapps\ROOT\PackageData', 'dist': r'K:\efbtomcat\webapps\ROOT\PackageData'}
    ]
    table_name = "efb_synclog_test_by_021786"
    # Current time truncated to whole seconds (same value the former
    # strftime/strptime round-trip produced).
    cur = datetime.now().replace(microsecond=0)
    # One INSERT per directory pair plus the CREATE TABLE statement.  Must be
    # computed before get_sql_list runs in case it consumes dirs_dict.
    total_num = len(dirs_dict) + 1

    # Collect the SQL produced by the comparisons.
    sql_list = get_sql_list(dirs_dict, table_name, cur)
    if len(sql_list) != total_num:
        error_info = 'get sql list error!\n'
        print(error_info)
        return
    for sql in sql_list:
        print(sql)
    print('=========================')
    print('已获取全部sql')

    # Connect to the database.
    error, conn = mysql_conn(**mysql_cfg)
    if error is not None:
        error_info = 'connect mysql error!\n' + error + '\n'
        print(error_info)
        return
    print('数据库连接成功')

    # Execute the SQL; the connection is closed whether or not this succeeds
    # (it was previously left open).
    try:
        error = exec_sql(sql_list, conn)
    finally:
        conn.close()
    if error is not None:
        error_info = 'exec sql list error!\n' + error + '\n'
        print(error_info)
        return
    print(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), '执行结束, 检查数据库')
# Run the check only when this file is executed as a script.  The guard also
# matters for multiprocessing: with the "spawn" start method (the default on
# Windows) child processes import this module, and must not re-run main().
if __name__ == '__main__':
    main()
网友评论