美文网首页Python
python线程池ThreadPoolExecutor.subm

python线程池ThreadPoolExecutor.subm

作者: 超神雷鸣 | 来源:发表于2021-06-07 08:57 被阅读0次
异世界蕾姆_1.png

ThreadPoolExecutorExecutor 的子类,它使用线程池来异步执行调用。

class ThreadPoolExecutor(_base.Executor):

    # Used to assign unique thread names when thread_name_prefix is not supplied.
    _counter = itertools.count().__next__

    def submit(self, fn, *args, **kwargs):
        with self._shutdown_lock:
            if self._shutdown:
                raise RuntimeError('cannot schedule new futures after shutdown')

            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)

            self._work_queue.put(w)
            self._adjust_thread_count()
            return f
    submit.__doc__ = _base.Executor.submit.__doc__

关于concurrent.futures模块下的ThreadPoolExecutor类
在使用submit的时候,如果参数传进去的是生成器对象,在某些情况下,生成器对象会被消耗掉一部分或者是全部的数据

具体如下demo展示:

#!/usr/bin/env Python
# -- coding: utf-8 --

"""
@version: v1.0
@author: huangyc
@file: test.py
@Description: 
@time: 2021/6/3 11:19
"""
from concurrent import futures
from itertools import groupby, count, tee
from typing import Iterable


def iter_slice_tool(iterator: Iterable, batch_size: int = 5):
    """生成器 访问工具方法 支持每次取 n个元素"""
    yield from groupby(iterator, key=lambda _, c=count(): next(c) // batch_size)


def gen_datas():
    yield from range(30)


def single_task(samples):
    print(list(samples))


if __name__ == '__main__':
    num = 4
    tasks = []
    executor = futures.ThreadPoolExecutor(max_workers=num)
    batch_size = 6

    # 尝试一
    """
        数据会无缘无故就少了,可以调数据量和batch_size   会看到不一样的缺失结果
        [0, 1, 2, 3, 4, 5]
        [6, 7, 8, 9, 10, 11]
        [12, 13, 14, 15, 16, 17]
        [18, 19, 20, 21, 22, 23]
        [29]
    """
    for key, sample in iter_slice_tool(gen_datas(), batch_size=batch_size):
        # sample_copy, sample = tee(sample, 2)  这个加不加 效果一样
        sample_copy, sample = tee(sample, 2)
        task = executor.submit(single_task, samples=sample)
        tasks.append(task)
    [future.result() for future in futures.as_completed(tasks)]
    print()

    # 尝试二
    """
        [0, 1, 2, 3, 4, 5]
        [6, 7, 8, 9, 10, 11]
        [12, 13, 14, 15, 16, 17]
        [18, 19, 20, 21, 22, 23]
        [24, 25, 26, 27, 28, 29]
    """
    for key, sample in iter_slice_tool(gen_datas(), batch_size=batch_size):
        sample = list(sample)
        task = executor.submit(single_task, samples=sample)
        tasks.append(task)
    [future.result() for future in futures.as_completed(tasks)]
    print()

    # 尝试三
    """
        [0, 1, 2, 3, 4, 5]
        [6, 7, 8, 9, 10, 11]
        [12, 13, 14, 15, 16, 17]
        [18, 19, 20, 21, 22, 23]
        [24, 25, 26, 27, 28, 29]
    """
    for key, sample in iter_slice_tool(gen_datas(), batch_size=batch_size):
        sample_copy, sample = tee(sample, 2)
        list(sample_copy)
        task = executor.submit(single_task, samples=sample)
        tasks.append(task)
    [future.result() for future in futures.as_completed(tasks)]

以上示例中,尝试二部分是正常且保证是没有问题
而尝试一则会在submit的时候被消耗掉一部分的数据
尝试三这里先利用tee,复制出两个副本,并且调用了其中一个转list,另一个丢给submit方法,这种情况下,数据不会产生丢失

sample_copy, sample = tee(sample, 2)
list(sample_copy)

两个问题

问题一:生成器对象为什么会在submit的时候,丢失了部分数据?
问题二:尝试三这里复制了副本,对其中一个转list,就不会丢失数据,不转list还是会丢失数据,又是什么原理?

不知道有没知情人士可以帮忙解答下,不胜感激、

相关文章

网友评论

    本文标题:python线程池ThreadPoolExecutor.subm

    本文链接:https://www.haomeiwen.com/subject/jeezsltx.html