Concurrency with futures


Author: 一块大番薯 | Published 2018-03-20 10:33

    Sequential download

    # flags.py: download country flags one at a time from URLs such as
    # https://flupy.org/data/flags/cn/cn.gif
    import os
    import sys
    import time
    
    import requests
    
    POP20_CC = ('CN IN US ID BR PK NG BD RU JP '
                'MX PH VN ET EG DE IR TR CD FR').split()
    
    BASE_URL = 'https://flupy.org/data/flags'
    
    DEST_DIR = 'downloads/'
    
    
    def save_flag(img, filename):
        os.makedirs(DEST_DIR, exist_ok=True)  # create downloads/ if it does not exist yet
        path = os.path.join(DEST_DIR, filename)
        with open(path, 'wb') as fp:
            fp.write(img)
            
    
    def get_flag(cc):
        url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
        resp = requests.get(url)
        resp.raise_for_status()  # raise on HTTP errors instead of saving an error page
        return resp.content
        
        
    def show_flag(cc):
        print(cc, end=' ')
        sys.stdout.flush()
        
        
    def download_many(cc_list):
        for cc in cc_list:
            image = get_flag(cc)
            save_flag(image, cc.lower() + '.gif')
            show_flag(cc)
            
        return len(cc_list)
        
    def main(download_many):
        t0 = time.time()
        count = download_many(POP20_CC)
        elapsed = time.time() - t0
        msg = '\n{} flags downloaded in {:.2f}s'
        print(msg.format(count, elapsed))
        
        
    if __name__ == '__main__':
        main(download_many)
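A minimal sketch of why the sequential version is slow, runnable without network access. `fake_get_flag` and `download_many_seq` are hypothetical stand-ins, and the fixed `DELAY` is an assumed latency in place of a real HTTP request. Because each call waits for the previous one, the elapsed time should grow roughly linearly with the number of country codes.

```python
import time

DELAY = 0.05  # assumed per-request latency in seconds (stand-in for the network)


def fake_get_flag(cc):
    """Hypothetical stand-in for get_flag: sleeps instead of calling requests.get."""
    time.sleep(DELAY)
    return cc.lower().encode() + b'.gif-bytes'


def download_many_seq(cc_list):
    """Same shape as the sequential download_many: one request at a time."""
    results = []
    for cc in cc_list:
        results.append(fake_get_flag(cc))
    return results


if __name__ == '__main__':
    codes = 'CN IN US ID BR'.split()
    t0 = time.perf_counter()
    images = download_many_seq(codes)
    elapsed = time.perf_counter() - t0
    # Roughly len(codes) * DELAY, i.e. about 0.25 s with these assumed values.
    print('{} flags in {:.2f}s'.format(len(images), elapsed))
```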
    

    Downloading with the concurrent.futures module

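    # flags_threadpool.py: the same downloads, run on a pool of threads;
    # the helpers come from the flags module (the sequential script above)
    from concurrent import futures
    
    from flags import save_flag, get_flag, show_flag, main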
    
    MAX_WORKERS = 20
    
    
    def download_one(cc):
        img = get_flag(cc)
        save_flag(img, cc.lower() + '.gif')
        show_flag(cc)
        return cc  # a result for executor.map / Future.result() to hand back
    
    
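    def download_many(cc_list):
        workers = min(MAX_WORKERS, len(cc_list))  # no more threads than flags
        with futures.ThreadPoolExecutor(workers) as executor:
            # map returns an iterator; consuming it re-raises any exception
            # raised in a worker thread instead of silently dropping it
            res = executor.map(download_one, cc_list)
    
        return len(list(res))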
    
    
    if __name__ == '__main__':
        main(download_many)
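executor.map returns an iterator, and an exception raised inside a worker only surfaces when that iterator reaches the corresponding result; an iterator that is never consumed hides the error. The sketch below illustrates this with a hypothetical `fake_download_one` that sleeps for an assumed `DELAY` instead of downloading, and fails on the made-up code `'XX'`.

```python
import time
from concurrent import futures

DELAY = 0.1  # assumed per-request latency in seconds


def fake_download_one(cc):
    """Hypothetical download_one: sleeps to mimic network I/O, fails on 'XX'."""
    time.sleep(DELAY)
    if cc == 'XX':
        raise ValueError('no flag for ' + cc)
    return cc


def download_many_threaded(cc_list, max_workers=20):
    workers = min(max_workers, len(cc_list))  # no more threads than jobs
    with futures.ThreadPoolExecutor(workers) as executor:
        res = executor.map(fake_download_one, cc_list)
    # The with block waits for every job; consuming the iterator now yields
    # results in input order and re-raises the first worker exception it meets.
    return list(res)


if __name__ == '__main__':
    codes = 'CN IN US ID BR PK NG BD RU JP'.split()
    t0 = time.perf_counter()
    print(download_many_threaded(codes))
    print('elapsed: {:.2f}s'.format(time.perf_counter() - t0))
    try:
        download_many_threaded(['CN', 'XX'])
    except ValueError as exc:
        print('worker error surfaced:', exc)
```

With ten codes and ten threads, the wall-clock time should stay close to a single `DELAY` rather than ten of them, because the waits overlap.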
    

    Where are the futures?

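    # Variant of download_many that makes the futures explicit: submit()
    # schedules each call and as_completed() yields futures as they finish
    def download_many(cc_list):
        cc_list = cc_list[:5]  # just five flags, to keep the output short
        with futures.ThreadPoolExecutor(max_workers=3) as executor:
            to_do = []  # create and schedule the futures
            for cc in cc_list:
                future = executor.submit(download_one, cc)
                msg = 'Scheduled for {}: {}'
                print(msg.format(cc, future))
                to_do.append(future)
    
            results = []  # collect results as the futures complete
            for future in futures.as_completed(to_do):
                res = future.result()
                msg = '{} result: {!r}'
                print(msg.format(future, res))
                results.append(res)
    
        return len(results)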
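executor.submit returns a Future immediately, and futures.as_completed yields futures in the order they finish, not the order they were submitted. A minimal sketch with a hypothetical `sleepy` task whose delays are chosen only to make that order visible:

```python
import time
from concurrent import futures


def sleepy(label, delay):
    """Hypothetical task: waits `delay` seconds, then returns `label`."""
    time.sleep(delay)
    return label


def completion_order(delays):
    """Submit one task per (label, delay) pair; return labels in finishing order."""
    with futures.ThreadPoolExecutor(max_workers=len(delays)) as executor:
        to_do = [executor.submit(sleepy, label, d) for label, d in delays.items()]
        finished = []
        for future in futures.as_completed(to_do):
            # as_completed only yields futures that are already done,
            # so result() returns without blocking here.
            finished.append(future.result())
    return finished


if __name__ == '__main__':
    with futures.ThreadPoolExecutor(max_workers=1) as executor:
        fut = executor.submit(sleepy, 'x', 0.2)
        print(fut)           # a Future object, most likely still running
        print(fut.result())  # blocks until the task finishes, then prints x
    # Finishing order, e.g. ['fast', 'medium', 'slow'] with these delays.
    print(completion_order({'slow': 0.3, 'fast': 0.1, 'medium': 0.2}))
```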
    

    Processes vs. threads, blocking I/O, and the GIL

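    • Waiting for network responses is blocking I/O, so threads are a good fit.
      Computing hashes or implementing the RC4 algorithm is CPU-bound work, so
      processes are the better choice: in CPython the GIL lets only one thread
      execute Python bytecode at a time.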
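    • ThreadPoolExecutor.__init__ required the max_workers argument in Python 3.4;
      since Python 3.5 it is optional (default os.cpu_count() * 5, changed in 3.8
      to min(32, os.cpu_count() + 4)). ProcessPoolExecutor defaults to the number
      of CPUs, os.cpu_count().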
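    • GIL (global interpreter lock).
      In I/O-bound programs, the standard library's blocking I/O functions release
      the GIL while they wait, so another thread can run in the meantime.
      time.sleep() also releases the GIL.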
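A quick way to see that time.sleep() releases the GIL: if it did not, n threads each sleeping for `seconds` would take about n * seconds in total. In this sketch `wait_io` and `timed_threads` are hypothetical helpers, with sleeping standing in for blocking I/O.

```python
import time
from concurrent import futures


def wait_io(seconds):
    """Hypothetical stand-in for blocking I/O; time.sleep releases the GIL."""
    time.sleep(seconds)
    return seconds


def timed_threads(n, seconds):
    """Run n blocking waits on n threads and return the wall-clock time taken."""
    t0 = time.perf_counter()
    with futures.ThreadPoolExecutor(max_workers=n) as executor:
        list(executor.map(wait_io, [seconds] * n))
    return time.perf_counter() - t0


if __name__ == '__main__':
    n, seconds = 5, 0.2
    elapsed = timed_threads(n, seconds)
    # The sleeps overlap, so this lands near 0.2 s rather than n * seconds = 1.0 s.
    print('{} waits of {}s took {:.2f}s'.format(n, seconds, elapsed))
```

For CPU-bound work such as hashing, swapping in futures.ProcessPoolExecutor is the usual way around the GIL; the worker function then has to be defined at module top level so it can be pickled and sent to the child processes.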

    Executor.map

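    • executor.map applies a single callable to every item and yields the results
      in the order of the inputs. Combining executor.submit with
      futures.as_completed can schedule different callables (with different
      arguments) and yields each future as soon as it completes.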
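A small sketch contrasting the two styles. `square`, `shout` and `slow_add` are hypothetical helpers, chosen only to show different callables and argument shapes going through one executor.

```python
import time
from concurrent import futures


def square(x):  # hypothetical helper
    return x * x


def shout(word):  # hypothetical helper
    return word.upper() + '!'


def slow_add(a, b, delay=0.1):  # hypothetical helper with a keyword argument
    time.sleep(delay)
    return a + b


def map_vs_submit():
    with futures.ThreadPoolExecutor(max_workers=4) as executor:
        # map: one callable applied to every item; results come back in input order.
        mapped = list(executor.map(square, [3, 1, 2]))

        # submit: any callable with any arguments; each call returns a Future.
        to_do = {
            executor.submit(square, 4): 'square',
            executor.submit(shout, 'hi'): 'shout',
            executor.submit(slow_add, 1, 2, delay=0.1): 'slow_add',
        }
        submitted = {}
        for future in futures.as_completed(to_do):
            submitted[to_do[future]] = future.result()
    return mapped, submitted


if __name__ == '__main__':
    mapped, submitted = map_vs_submit()
    print(mapped)     # [9, 1, 4]
    print(submitted)  # values: square -> 16, shout -> 'HI!', slow_add -> 3
```

Keeping a dict from each Future to a label is one convenient way to tell which call a completed future belongs to, since as_completed does not preserve submission order.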

        Article URL: https://www.haomeiwen.com/subject/dcazoxtx.html