美文网首页Pythonpython
python3--文件filecmp比对+多进程multipro

python3--文件filecmp比对+多进程multipro

作者: w_dll | 来源:发表于2021-06-12 19:47 被阅读0次

    最近无更新,是因为在写fastapi + vue, 准备写完了再放上来;
    这次更新所做的是领导安排的任务,filecmp对比库没用过,索性放上来了;
    改进之前文件比对脚本;
    我改进的主要有两点
    1 将sql提交合并到一次
    2 将单线程检查的脚本改为多进程
    通过这个方法提高巡检速度;

    以下是脚本

    脚本

    需要注意的点:
    之前用的是threading,它定义的变量在多个线程之间是可以共享的;
    这次改用multiprocessing后,发现子进程里对普通变量的修改在主进程中读取不到(进程之间不共享内存);
    写了测试用例才发现这个问题,以后要注意;
    解决办法是用 multiprocessing.Manager().list() 创建可以跨进程共享的列表,脚本如下:

    import os,sys
    import filecmp
    import pymysql
    from datetime import datetime
    
    
    def compareme(dir1, dir2, list):
        """Recursively collect entries of ``dir1`` that are not in sync with ``dir2``.

        For every directory level the absolute paths of entries found only in
        ``dir1`` and of files whose content differs are appended to ``list``
        (left-only entries first, then differing files); sub-directories that
        exist on both sides are then walked the same way. Entries named
        ``_gsdata_`` are skipped by the comparison itself.

        Args:
            dir1: Source directory; every reported path lives under it.
            dir2: Directory compared against ``dir1``.
            list: Accumulator that receives the paths (mutated in place).
        """
        report = filecmp.dircmp(dir1, dir2, ignore=['_gsdata_'])
        out_of_sync = report.left_only + report.diff_files
        shared_dirs = report.common_dirs

        for name in out_of_sync:
            list.append(os.path.abspath(os.path.join(dir1, name)))

        for sub in shared_dirs:
            left_sub = os.path.abspath(os.path.join(dir1, sub))
            right_sub = os.path.abspath(os.path.join(dir2, sub))
            compareme(left_sub, right_sub, list)
    
    
    def get_sql(**kwargs):
        """Check one source/target pair and queue the matching sync-log INSERT.

        Keyword Args:
            sour: Source directory to compare from.
            dect: Target directory; its first character is the drive letter
                used to look up the node name and IP.
            tablename: Table the INSERT statement targets.
            cur: Timestamp written into the row and the console messages.
            sql_list: List-like object (e.g. a ``multiprocessing`` managed
                list) that receives exactly one INSERT statement.

        The row is recorded as ``"success"`` only when the target drive is
        reachable and no out-of-sync path is found. A drive that is not
        mounted is recorded as ``"fail"``: nothing could be compared, so
        reporting success would hide the problem.

        Raises:
            KeyError: If the drive letter of ``dect`` is not a known node.
        """
        sour = kwargs.get('sour')
        dect = kwargs.get('dect')
        tablename = kwargs.get('tablename')
        cur = kwargs.get('cur')
        sql_list = kwargs.get('sql_list')
        # Drive letter -> "<sync name>_<node ip>".
        nodes = {
            'D': 'efb-master_192.168.110.93',
            'H': "efb-slave_192.168.110.94",
            'I': "pudongefb-bak_10.128.129.113",
            'G': 'telcom_192.168.190.89',
            'J': 'unicom_192.168.176.98',
            'K': 'pudongefb_10.128.129.112',
        }
        target = dect[:1]
        sync_name, node_ip = nodes[target].split('_')[:2]
        insert_sql = "insert into " + tablename + " values('%s', '%s', '%s', '%s')"

        mounted = os.path.exists(target + ":")
        unsynced = []
        if mounted:
            compareme(sour, dect, unsynced)
            # Paths below a _gsdata_ folder are sync-tool bookkeeping, not payload.
            unsynced = [path for path in unsynced if '_gsdata_' not in path]

        result = "success" if mounted and not unsynced else "fail"
        # Queue the row before printing so a console error cannot drop it.
        sql_list.append(insert_sql % (sync_name, node_ip, result, cur))

        if not mounted:
            print("Pls check %s mount" % target)
            return
        for number, path in enumerate(unsynced, start=1):
            print("Time %s number %s the ip %s file %s has not sync!" % (cur, number, node_ip, path))
        if unsynced:
            print("Time %s the %s ip %s sync-log has fail create!" % (cur, sync_name, node_ip))
        else:
            print("Time %s the %s ip %s sync-log has success create!" % (cur, sync_name, node_ip))
    
    
    def get_sql_list(dirs_dict, tablename, cur, task=6):
        """Run the directory checks in parallel and collect their SQL statements.

        Each entry of ``dirs_dict`` is checked by ``get_sql`` in its own
        process, at most ``task`` processes at a time. The workers append
        their INSERT statements to a managed list shared across processes.

        Args:
            dirs_dict: Sequence of ``{'source': ..., 'dist': ...}`` mappings.
                It is not modified.
            tablename: Table used by the CREATE and INSERT statements.
            cur: Timestamp forwarded to every worker.
            task: Maximum number of worker processes running at once.

        Returns:
            A plain list whose first item is the ``create table if not exists``
            statement, followed by one INSERT per worker that completed.

        Raises:
            ValueError: If ``task`` is smaller than 1.
        """
        import multiprocessing

        if task < 1:
            raise ValueError("task must be at least 1")

        create_sql = "create table if not exists %s" % tablename + "(syncname varchar(25),node varchar(25),result varchar(25),cr_date varchar(25))"
        # Work on a copy so the caller's list is left intact.
        pending = list(dirs_dict)
        if not pending:
            # Nothing to compare: no need to start a manager process.
            return [create_sql]

        # The context manager shuts the manager's server process down on exit.
        with multiprocessing.Manager() as manager:
            sql_list = manager.list([create_sql])
            for start in range(0, len(pending), task):
                workers = [
                    multiprocessing.Process(target=get_sql, kwargs={
                        'sour': pair['source'],
                        'dect': pair['dist'],
                        'tablename': tablename,
                        'cur': cur,
                        'sql_list': sql_list,
                    })
                    for pair in pending[start:start + task]
                ]
                for worker in workers:
                    worker.start()
                # Wait for the whole batch before launching the next one.
                for worker in workers:
                    worker.join()
            # Copy out of the proxy while the manager is still alive.
            return list(sql_list)
    
    
    def mysql_conn(**mysql_cfg):
        """Open a MySQL connection from keyword settings.

        Recognised keys are ``host``, ``user``, ``passwd``, ``db``, ``port``
        (defaults to 3306) and ``charset`` (defaults to ``'utf8'``); missing
        keys are passed on as ``None``.

        Returns:
            ``(None, connection)`` on success, or ``(error_message, None)``
            when connecting raised an exception.
        """
        settings = {
            'host': mysql_cfg.get('host', None),
            'user': mysql_cfg.get('user', None),
            'passwd': mysql_cfg.get('passwd', None),
            'db': mysql_cfg.get('db', None),
            'port': mysql_cfg.get('port', 3306),
            'charset': mysql_cfg.get('charset', 'utf8'),
        }
        try:
            connection = pymysql.connect(**settings)
        except Exception as exc:
            # Report the failure as text instead of raising to the caller.
            return str(exc), None
        return None, connection
    
    
    def exec_sql(sql_list, conn):
        """Execute every statement on one cursor and commit once at the end.

        Args:
            sql_list: Iterable of SQL statements to run in order.
            conn: Open DB-API connection (``cursor``/``commit``/``rollback``).

        Returns:
            ``None`` when all statements ran and the commit succeeded;
            otherwise the error message, after the transaction was rolled back.
        """
        cursor = None
        try:
            cursor = conn.cursor()
            for sql in sql_list:
                cursor.execute(sql)
            conn.commit()
        except Exception as e:
            conn.rollback()
            return str(e)
        else:
            return None
        finally:
            # Release the cursor on the failure path too, not only on success.
            if cursor is not None:
                cursor.close()
    
    
    def main():
        """Compare the source tree with every replica and store the results.

        Steps: build the SQL statements (one CREATE plus one INSERT per
        replica), print them, connect to MySQL, and run them in a single
        transaction. Each failure is printed and ends the run early. The
        database connection is always closed once it has been opened.
        """
        mysql_cfg = {
            'host': '192.168.1.111',
            'user': 'test',
            'passwd': '123456',
            'db': 'test',
        }
        dirs_dict = [
            {'source': r'D:\efbtomcat\webapps\ROOT\PackageData', 'dist': r'H:\efbtomcat\webapps\ROOT\PackageData'},
            {'source': r'D:\efbtomcat\webapps\ROOT\PackageData', 'dist': r'I:\efbtomcat\webapps\ROOT\PackageData'},
            {'source': r'D:\efbtomcat\webapps\ROOT\PackageData', 'dist': r'G:\efbtomcat\webapps\ROOT\PackageData'},
            {'source': r'D:\efbtomcat\webapps\ROOT\PackageData', 'dist': r'J:\efbtomcat\webapps\ROOT\PackageData'},
            {'source': r'D:\efbtomcat\webapps\ROOT\PackageData', 'dist': r'K:\efbtomcat\webapps\ROOT\PackageData'},
        ]
        table_name = "efb_synclog_test_by_021786"
        # Second precision, so the value renders as 'YYYY-MM-DD HH:MM:SS'.
        cur = datetime.now().replace(microsecond=0)
        # One CREATE statement plus one INSERT per directory pair; computed
        # before the call because get_sql_list may consume the list.
        total_num = len(dirs_dict) + 1

        # Build the SQL produced by the comparison.
        sql_list = get_sql_list(dirs_dict, table_name, cur)
        if len(sql_list) != total_num:
            # A worker died before queueing its statement.
            error_info = 'get sql list error!\n'
            print(error_info)
            return
        for sql in sql_list:
            print(sql)
            print('=========================')
        print('已获取全部sql')

        # Connect to the database.
        error, conn = mysql_conn(**mysql_cfg)
        if error is not None:
            error_info = 'connect mysql error!\n' + error + '\n'
            print(error_info)
            return
        print('数据库连接成功')

        # Execute the SQL, releasing the connection whatever the outcome.
        try:
            error = exec_sql(sql_list, conn)
        finally:
            conn.close()
        if error is not None:
            error_info = 'exec sql list error!\n' + error + '\n'
            print(error_info)
            return
        print(datetime.now().strftime('%Y-%m-%d %H:%M:%S'),'执行结束, 检查数据库')
    
    
    # Run main() only when this file is executed as a script. The guard also
    # matters for multiprocessing: where child processes are started with the
    # spawn method (e.g. on Windows) they re-import this module, and without
    # the guard each child would run main() again.
    if __name__ == '__main__':
        main()
    

    执行结果

    命令行结果

    数据库结果

    相关文章

      网友评论

        本文标题:python3--文件filecmp比对+多进程multipro

        本文链接:https://www.haomeiwen.com/subject/rvbbjltx.html