# -*- coding: utf-8 -*-
import multiprocessing
from collections import Counter
from concurrent.futures import ProcessPoolExecutor

from tools.logger_config import logger


def do_func(func, *paras):
    """Call func(*paras); log any exception and return "fail"."""
    try:
        return func(*paras)
    except Exception:
        logger.exception(f"do_func error {paras}")
        return "fail"


def do_work_list_slice_one(func, work_size, lists, *others):
    """Submit one task per parameter tuple in lists to a process pool
    and log a Counter of the workers' return values."""
    with ProcessPoolExecutor(max_workers=work_size) as pool:
        futures = [pool.submit(do_func, func, *para) for para in lists]
        result_count = Counter([future.result() for future in futures])
        result_dict = dict(result_count)
        logger.info(result_dict)
        logger.info("all finished!")
def do_work_ntile(func, work_size, ntile):
    """
    Args:
        func: worker callable taking (min_val, max_val).
        work_size: number of worker processes.
        ntile: iterable of (min_val, max_val) range tuples.
    Returns:
        None. Logs a Counter of the workers' return values.
    """
    tasks = []
    pool = multiprocessing.Pool(work_size)
    for tile in ntile:
        min_val = tile[0]
        max_val = tile[1]
        tasks.append(pool.apply_async(do_func, (func, min_val, max_val)))
    pool.close()
    pool.join()
    result_count = Counter([task.get() for task in tasks])
    result_dict = dict(result_count)
    logger.info(result_dict)
    logger.info("all finished!")
def do_work_list_slice(func, work_size, step, lists, *others):
    """
    :param func: worker callable taking (lists, start, end, *others).
    :param work_size: number of worker processes.
    :param step: slice size handed to each task.
    :param lists: the full list to process in slices.
    :param others: extra positional arguments forwarded to func.
    :return: None. Logs a Counter of the workers' return values.
    """
    pool = multiprocessing.Pool(work_size)
    tasks = []
    start = 0
    len_list = len(lists)
    # Submit one task per [start, start + step) slice until the list is covered.
    while start < len_list:
        end = start + step
        tasks.append(pool.apply_async(do_func, (func, lists, start, end, *others)))
        start = end
    pool.close()
    pool.join()
    result_count = Counter([task.get() for task in tasks])
    result_dict = dict(result_count)
    logger.info(result_dict)
    logger.info("all finished!")
def do_work_offset(func, work_size, step, min_num, max_num):
    """
    :param func: worker callable taking (step, offset).
    :param work_size: number of worker processes.
    :param step: window size handed to each task.
    :param min_num: first offset (inclusive).
    :param max_num: last offset (inclusive).
    :return: None. Logs a Counter of the workers' return values.
    """
    tasks = []
    pool = multiprocessing.Pool(work_size)
    offset = min_num
    while offset <= max_num:
        tasks.append(pool.apply_async(do_func, (func, step, offset)))
        offset += step
    pool.close()
    pool.join()
    result_count = Counter([task.get() for task in tasks])
    result_dict = dict(result_count)
    logger.info(result_dict)
    logger.info("all finished!")
# if __name__ == '__main__':
# do_work_concurrent(do, 1, 1, 0, 10)
# do_work_dedump(do, 1, 1, [1, 2, 3, 4, 5])
import time
from concurrent.futures import ProcessPoolExecutor, as_completed
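# The three snippets below all call a gcd worker on a numbers list of
# pairs, neither of which is defined in the original post; a minimal
# sketch is assumed here (the sample pairs are illustrative).
def gcd(pair):
    a, b = pair
    low = min(a, b)
    for i in range(low, 0, -1):
        if a % i == 0 and b % i == 0:
            return i

numbers = [(1963309, 2265973), (2030677, 3814172), (1551645, 2229620)]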
start = time.time()
with ProcessPoolExecutor(max_workers=2) as pool:
    futures = [pool.submit(gcd, pair) for pair in numbers]
    for future in futures:
        print('running: %s, done: %s' % (future.running(), future.done()))
    print('#### divider ####')
    for future in as_completed(futures, timeout=2):
        print('running: %s, done: %s' % (future.running(), future.done()))
end = time.time()
print('Took %.3f seconds.' % (end - start))
===========================================================================
import time
from concurrent.futures import ProcessPoolExecutor

# gcd and numbers as defined in the first snippet.
start = time.time()
futures = list()
with ProcessPoolExecutor(max_workers=2) as pool:
    for pair in numbers:
        future = pool.submit(gcd, pair)
        futures.append(future)
print('results: %s' % [future.result() for future in futures])
end = time.time()
print('Took %.3f seconds.' % (end - start))
===========================================================================
import time
from concurrent.futures import ProcessPoolExecutor, wait, ALL_COMPLETED

# gcd and numbers as defined in the first snippet.
start = time.time()
with ProcessPoolExecutor(max_workers=2) as pool:
    futures = [pool.submit(gcd, pair) for pair in numbers]
    for future in futures:
        print('running: %s, done: %s' % (future.running(), future.done()))
    print('#### divider ####')
    done, unfinished = wait(futures, timeout=2, return_when=ALL_COMPLETED)
    for d in done:
        print('running: %s, done: %s' % (d.running(), d.done()))
        print(d.result())
end = time.time()
print('Took %.3f seconds.' % (end - start))