美文网首页算法工程Python学习
Python:多进程同步共享全局变量(锁,计数器,原子布尔)

Python:多进程同步共享全局变量(锁,计数器,原子布尔)

作者: xiaogp | 来源:发表于2021-11-20 22:04 被阅读0次

    摘要:Python,多进程

    多进程变量同步的场景和方法

    场景:在使用Python多进程并行时需要在进程间共享变量,这些共享的变量可以更好地控制和把握任务执行的情况,比如查看任务进度,提前停止任务等
    方法:在多线程中变量共享在主线程中定义变量,在每个子线程中使用global关键字拿到变量,再配合threading.RLock()在对变量操作时拿到和释放锁(acquirerelease)即可,但是在多进程中,变量是放在不同子进程的数据区中,每个进程都是独立的地址空间,所以用一般的方法是不能共享变量的,multiprocessing模块提供了ArrayManagerValue类来定义共享变量,能够实现进程间共享数字,字符串,列表,字典,实例对象的变量共享


    共享整数变量

    对于整数的多进程共享是常用的场景,比如使用多进程并行任务,需要记录执行日志记录任务进度,实例代码如下

    import multiprocessing
    from multiprocessing import Pool, Lock, Value
    
    from utils.logger_utils import logging_
    
    LOGGER = logging_("predict_main", os.path.join(ROOT_PATH, "./logs/details.log"))
    lock = Lock()
    Counter = Value('i', 0)
    ENT_LIST = list(set([line.strip().replace("(", "(").replace(")", ")") for line in open(os.path.join(BASIC_PATH, "data/ent_name_predict.txt"), "r", encoding="utf8").readlines()]))
    TOTAL = len(ENT_LIST)
    
    def get_one_res(data):
        global TOTAL, lock, Counter
        res = {}
        try:
            ent_name = data
            res = get_feature(ent_name, PREDICT_DATE)
            res["updatedate"] = PREDICT_DATE
            res["uid"] = get_md5(formatted_ent(ent_name))
        except Exception as e:
            LOGGER.error(data + ":错误:" + e.args[0])
        finally:
            with lock:
                Counter.value += 1
            LOGGER.info("执行完成:(%d / %d) 进程号: %d --------------- %s", Counter.value, TOTAL, os.getpid(), data)
        return res
    
    
    if __name__ == '__main__':
        pool = Pool(int(get_string("process_num")))
        res = pool.map(get_one_res, ENT_LIST)
        LOGGER.info("全部执行完成,关闭进程池")
        pool.close()
        pool.join()
    

    运行查看执行日志

    2021-11-18 15:19:15 [predict_main] INFO [42] 执行完成:(1 / 1400) 进程号: 15 --------------- 深圳顺亚投资有限公司
    2021-11-18 15:19:16 [predict_main] INFO [42] 执行完成:(2 / 1400) 进程号: 25 --------------- 芜湖新扬投资合伙企业(有限合伙)
    2021-11-18 15:19:18 [predict_main] INFO [42] 执行完成:(3 / 1400) 进程号: 24 --------------- 保定隆瑞房地产开发有限公司
    2021-11-18 15:19:19 [predict_main] INFO [42] 执行完成:(4 / 1400) 进程号: 11 --------------- 云南俊发凯丰房地产开发有限公司
    

    在全局定义锁和计数器,Value('i', 0)代表定义的共享变量是int类型初始值是0,如果要定义double变量则使用Value('d', 0),相当于java里面的原子变量,在执行函数中调用with上下文在实行完任务后调用Counter.value += 1实现计数+1,最后在进程池中调用执行方法,每个并行的任务在执行完毕会调用锁进行计数器+1,同一时刻只有一个子进程拿到锁实现进程同步,如果不采用锁的方式,在日志中计数器会乱序,但是最终总的值相等


    共享布尔变量

    这种情况在全局中记录一个布尔变量,每次执行任务前拿到变量判断是否与预期一致,如果执行报错修改变量状态,多用于子进程中任务报错提前结束全部任务全部退出,代码如下

    from multiprocessing import Pool, Lock, Manager
    from ctypes import c_bool
    import os
    
    lock = Lock()
    ERROR = Manager().Value(c_bool, False)
    
    
    def run(fn):
        global tests_count, lock, ERROR
        if not ERROR.value:
            try:
                print('执行任务. PID: %d ' % (os.getpid()))
                1 / 0
            except Exception as e:
                with lock:
                    ERROR.value = True
        else:
            print("子进程报错,任务结束")
    
    
    if __name__ == "__main__":
        pool = Pool(10)
        # 80个任务,会运行run()80次,每次传入xrange数组一个元素
        pool.map(run, list(range(80)))
        pool.close()
        pool.join()
    

    查看执行输出

    执行任务. PID: 27374 
    子进程报错,任务结束
    子进程报错,任务结束
    子进程报错,任务结束
    ...
    Process finished with exit code 0
    

    初始化一个共享变量为布尔类型为False,每个进程在执行前先拿到共享变量判断是否为False,是则执行任务,否则直接跳过执行。初始化布尔变量使用Manager类实例化后调用Value方法,c_bool是Ctypes下的数据类型,相关类型如下

    另一种是在主进程中判断共享变量,调用map_async使得主进程不被子进程阻塞,主进程判断全局变量如果不符合预期直接退出,调用terminate终止线程池

    from multiprocessing import Pool, Lock, Manager, Value
    from ctypes import c_bool
    import os
    import time
    
    lock = Lock()
    ERROR = Manager().Value(c_bool, False)
    COUNTER = Value('i', 0)
    
    
    def run(fn):
        global tests_count, lock, ERROR
        try:
            time.sleep(2)
            1 / 0
        except:
            with lock:
                ERROR.value = True
        finally:
            with lock:
                COUNTER.value += 1
                print('执行任务(%d / %d). PID: %d ' % (COUNTER.value, 80, os.getpid()))
    
    
    if __name__ == "__main__":
        pool = Pool(10)
        pool.map_async(run, list(range(80)))
        pool.close()
        print("主进程判断...")
        while COUNTER.value != len(list(range(80))):
            time.sleep(1)
            if ERROR.value:
                print("子进程报错,主进程提前退出")
                pool.terminate()
                break
        pool.join()
    

    输出如下,每隔1秒中检查全局变量ERROR,如果变为True主进程终止进程池

    主进程判断...
    执行任务(1 / 80). PID: 4168 
    执行任务(2 / 80). PID: 4169 
    执行任务(3 / 80). PID: 4177 
    执行任务(4 / 80). PID: 4171 
    执行任务(5 / 80). PID: 4173 
    执行任务(6 / 80). PID: 4182 
    执行任务(7 / 80). PID: 4183 
    执行任务(8 / 80). PID: 4174 
    执行任务(9 / 80). PID: 4179 
    执行任务(10 / 80). PID: 4181 
    子进程报错,主进程提前退出
    
    Process finished with exit code 0
    

    一个实用的例子是多进程找一个列表中符合要求第一个值,如果找到则退出多进程

    from multiprocessing import Pool, Lock, Manager, Value
    from ctypes import c_bool
    import os
    import time
    
    lock = Lock()
    FOUND = Manager().Value(c_bool, False)
    COUNTER = Value('i', 0)
    
    
    def run(fn):
        global tests_count, lock, ERROR
        try:
            time.sleep(2)
            res = fn + 1
            if res == 10:
                print("结果是:{}".format(fn))
                with lock:
                    FOUND.value = True
                return fn
        except Exception as e:
            print(e)
        finally:
            with lock:
                COUNTER.value += 1
                print('执行任务(%d / %d). PID: %d ' % (COUNTER.value, 80, os.getpid()))
    
    
    if __name__ == "__main__":
        t1 = time.time()
        pool = Pool(10)
        pool.map_async(run, list(range(80)))
        pool.close()
        print("主进程判断...")
        while COUNTER.value != len(list(range(80))):
            time.sleep(1)
            if FOUND.value:
                print("已找到结果")
                pool.terminate()
                break
        pool.join()
        t2 = time.time()
        print(t2 - t1)
    

    共享字典和数组变量

    使用Manager近创建,Manager().dict()Manager().list(),测试代码如下

    from multiprocessing.pool import Pool
    from multiprocessing import Manager, Lock
    import time
    import datetime
    
    LOCK = Lock()
    DICT = Manager().dict()
    LIST = Manager().list()
    
    
    def job(ent):
        with LOCK:
            if len(LIST) < 5:
                time.sleep(1)
                LIST.append(ent)
            else:
                if len(LIST) and ent <= len(LIST) - 1:
                    LIST.pop(ent)
            print("dt:{}".format(datetime.datetime.today().strftime("%Y-%m-%d %H:%M:%S")), "ent:{}".format(ent),
                  "LIST:{}".format(LIST))
    
    
    def job2(ent):
        if len(LIST) < 5:
            time.sleep(1)
            LIST.append(ent)
        else:
            if len(LIST) and ent <= len(LIST) - 1:
                LIST.pop(ent)
        print("dt:{}".format(datetime.datetime.today().strftime("%Y-%m-%d %H:%M:%S")), "ent:{}".format(ent),
              "LIST:{}".format(LIST))
    
    
    if __name__ == '__main__':
        pool = Pool(10)
        pool.map(job2, list(range(10)) * 2)
        pool.close()
        pool.join()
    

    执行job输出如下,输出结果和单进程单线程的结果一致,按照顺序每个进程在锁外排队

    dt:2021-11-20 22:01:28 ent:0 LIST:[0]
    dt:2021-11-20 22:01:29 ent:1 LIST:[0, 1]
    dt:2021-11-20 22:01:30 ent:2 LIST:[0, 1, 2]
    dt:2021-11-20 22:01:31 ent:3 LIST:[0, 1, 2, 3]
    dt:2021-11-20 22:01:32 ent:4 LIST:[0, 1, 2, 3, 4]
    dt:2021-11-20 22:01:32 ent:5 LIST:[0, 1, 2, 3, 4]
    dt:2021-11-20 22:01:32 ent:6 LIST:[0, 1, 2, 3, 4]
    dt:2021-11-20 22:01:32 ent:7 LIST:[0, 1, 2, 3, 4]
    dt:2021-11-20 22:01:32 ent:8 LIST:[0, 1, 2, 3, 4]
    dt:2021-11-20 22:01:32 ent:9 LIST:[0, 1, 2, 3, 4]
    dt:2021-11-20 22:01:32 ent:0 LIST:[1, 2, 3, 4]
    dt:2021-11-20 22:01:33 ent:1 LIST:[1, 2, 3, 4, 1]
    dt:2021-11-20 22:01:33 ent:2 LIST:[1, 2, 4, 1]
    dt:2021-11-20 22:01:34 ent:3 LIST:[1, 2, 4, 1, 3]
    dt:2021-11-20 22:01:34 ent:4 LIST:[1, 2, 4, 1]
    dt:2021-11-20 22:01:35 ent:5 LIST:[1, 2, 4, 1, 5]
    dt:2021-11-20 22:01:35 ent:6 LIST:[1, 2, 4, 1, 5]
    dt:2021-11-20 22:01:35 ent:7 LIST:[1, 2, 4, 1, 5]
    dt:2021-11-20 22:01:35 ent:8 LIST:[1, 2, 4, 1, 5]
    dt:2021-11-20 22:01:35 ent:9 LIST:[1, 2, 4, 1, 5]
    
    Process finished with exit code 0
    

    如果执行job2则不控制共享变量的同步则完全失控,报错pop index out of range,原因是在对变量操作时有其他进程也在操作得到意想不到的结果

    dt:2021-11-20 22:03:02 ent:0 LIST:[0, 1, 2]
    dt:2021-11-20 22:03:02 ent:1 LIST:[0, 1, 2]
    dt:2021-11-20 22:03:02 ent:2 LIST:[0, 1, 2]
    dt:2021-11-20 22:03:02 ent:3 LIST:[0, 1, 2, 3, 5]
    dt:2021-11-20 22:03:02 ent:5 LIST:[0, 1, 2, 3, 5, 4, 6]
    dt:2021-11-20 22:03:02 ent:4 LIST:[0, 1, 2, 3, 5, 4, 6, 9, 8]
    dt:2021-11-20 22:03:02 ent:6 LIST:[0, 1, 2, 3, 5, 4, 6, 9, 8, 7]
    dt:2021-11-20 22:03:02 ent:9 LIST:[0, 1, 2, 3, 5, 4, 6, 9, 8, 7]
    ...
        raise self._value
    IndexError: pop index out of range
    

    共享自定义对象

    相关文章

      网友评论

        本文标题:Python:多进程同步共享全局变量(锁,计数器,原子布尔)

        本文链接:https://www.haomeiwen.com/subject/floetrtx.html