美文网首页python爬虫与人工智能
python多线程编程之异步委托

python多线程编程之异步委托

作者: 桂圆汤 | 来源:发表于2017-10-28 04:53 被阅读0次

      多线程编程一向是难点,也容易出问题。之前c#中异步委托用的很爽,python中如何实现类似效果呢?


    异步流程图.png

      上面的流程图中,在接收数据之后,启动一个清洗数据的线程,然后不必等待清洗结果继续接收数据。同样,在清洗数据时,启动一个保存数据的线程,然后不必等待保存结果继续清洗数据。
      由于python中父线程结束时,子线程也会跟着结束。因此我这里把接收数据,清洗数据,保存数据分别放入三个不同的线程池中,这三个线程池都是主线程创建。这样可以避免接收数据的线程结束时,清洗数据线程和保存数据线程也跟着结束。
      示例代码如下

    import threading 
    lock = threading.Lock() 
    from concurrent.futures import ThreadPoolExecutor 
    import pymysql
    conn = pymysql.connect(**sqlDict)
    cursor = conn.cursor()
    
    sock_max_workers=64 
    sock_pool =ThreadPoolExecutor(max_workers=sock_max_workers) 
    sock_future = []
    chgdata_max_workers=64
    chgdata_pool = ThreadPoolExecutor(max_workers=chgdata_max_workers) 
    chgdata_future = [] 
    savedata_max_workers=128
    savedata_pool = ThreadPoolExecutor(max_workers=savedata_max_workers) 
    savedata_future = [] 
    
    def sock(参数): 
        接收数据的代码 
        chgdata_future.append(chgdata_pool.submit(chgdata,参数) )   ##异步委托chgdata 
        其它代码 
    
    def chgdata(参数): 
        清洗数据的代码 
        savedata_future.append(savedata_pool.submit(savedata,参数)) ##异步委托savedata 
        其它代码 
    
    def savedata(参数): 
        准备保存数据 
        lock.acquire()   #加个互斥锁 
        cursor.execute(command,data)  #保存到数据库 
        lock.release() #释放锁 
        其它代码 
    
    if __name__ == '__main__': 
        准备工作
        sock_future = [sock_pool.submit(sock,p) for p in iplist] 
        for f in sock_future: 
            f.result() 
        for f in chgdata_future: 
            f.result() 
        for f in saveda'ta_future: 
            f.result() 
        conn.commit()   #视实际需求放在 savadata 中
        conn.close()
    

      这里解释一下。由于所有线程共用一个conn,而cursor.execute()是独占式的,因此需加锁互斥使用此连接,否则会出现错误提示:pymysql.err.InterfaceError: (0, '') ,关于这个错误提示,StackOverflow 的说法是因为conn已被关闭,这个说法不完整,准确的说法应该是:conn已被关闭或已被独占。
      多线程保存数据到数据库时要小心,以保存到mysql为例,一般有如下几个方法:
      1. 每个线程拥有独立的连接
      2. 所有线程共用一个连接,则需加锁互斥使用此连接 (上述代码使用这种方式)
      3. 所有线程共用一个连接池,需要考虑线程总数和连接池连接数上限的问题
      彩蛋,多线程网络请求时容易出现错误提示Max retries exceeded with url:

        requests.adapters.DEFAULT_RETRIES = 5 
        time.sleep(1)    #减低网络请求的频率。如果实在不愿意,请使用代理。
    

    相关文章

      网友评论

        本文标题:python多线程编程之异步委托

        本文链接:https://www.haomeiwen.com/subject/ovjjpxtx.html