美文网首页Python笔记
Python笔记之pool.join

Python笔记之pool.join

作者: Sugeei | 来源:发表于2019-03-07 16:15 被阅读0次

    https://docs.python.org/2/library/multiprocessing.html

    pool.join

    Block until all items in the queue have been gotten and processed.

    The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer thread calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.

    有一批日期相关的计算任务。需要按天计算,在每天的计算中,又需要计算n(n>10000)次。
    看起来是这样:

    datelist = ['2018-01-01' to '2019-01-01'] # 待计算的日期列表
    
    # tickers中是所有需要计算的对象的ID, for loop 针对每个对象做处理
    for ticker in tickers['ticker_symbol'].values: 
            # 下面步骤用于加载必要的基础数据,一次加载该对象的所有相关数据
        predict = loader.get_org_predict(ticker)
        score = loader.get_tprice_and_score(ticker)
            
            # 下面步骤遍历datelist
        for i in range(len(datelist)):
              do_calculation # 针对每个对象在某一天的计算逻辑, 比较耗时,需要1秒算出结果。
    

    使用pool并行

    datelist = ['2018-01-01' to '2019-01-01'] # 待计算的日期列表
    
    # 声明pool
    pool = Pool(processes=(int(multiprocessing.cpu_count() * 0.7) + 1))
    
    # tickers中是所有需要计算的对象的ID, for loop 针对每个对象做处理
    for ticker in tickers['ticker_symbol'].values: 
            # 下面步骤用于加载必要的基础数据,一次加载该对象的所有相关数据
        predict = loader.get_org_predict(ticker)
        score = loader.get_tprice_and_score(ticker)
            
            # 下面步骤遍历datelist
        for i in range(len(datelist)):
            result = pool.apply_async(do_calculation)
            #do_calculation # 针对每个对象在某一天的计算逻辑, 比较耗时,需要1秒算出结果。
    

    pool会将所有待处理的任务都丢进自己的等待队列中。
    如果需要在每个ticker的数据都算完后,获取返回的结果,假设do_calculation返回一个值,将这一批值(对某个ticker, datelist中每个日期都对应一个结果)收集起来, 加入其它的操作, 比如写入数据库或者存到本地文件。那么需要在每个外层for循环体的结尾处添加join用于等待当前pool队列中的task执行完成。

    datelist = ['2018-01-01' to '2019-01-01'] # 待计算的日期列表
    
    # 声明pool
    pool = Pool(processes=(int(multiprocessing.cpu_count() * 0.7) + 1))
    
    # tickers中是所有需要计算的对象的ID, for loop 针对每个对象做处理
    for ticker in tickers['ticker_symbol'].values: 
            # 下面步骤用于加载必要的基础数据,一次加载该对象的所有相关数据
        predict = loader.get_org_predict(ticker)
        score = loader.get_tprice_and_score(ticker)
        result = []
            # 下面步骤遍历datelist
        for i in range(len(datelist)):
            result = pool.apply_async(do_calculation)
    
        pool.join # Block until all items in the queue have been gotten and processed.
    
    pool.close
    

    相关文章

      网友评论

        本文标题:Python笔记之pool.join

        本文链接:https://www.haomeiwen.com/subject/rgncpqtx.html