Python thread pool: ThreadPoolExecutor.submit

Author: 超神雷鸣 | Published 2021-06-07 08:57

    ThreadPoolExecutor is a subclass of Executor that uses a pool of threads to execute calls asynchronously. Its submit method (from the CPython source) looks like this:

    class ThreadPoolExecutor(_base.Executor):
    
        # Used to assign unique thread names when thread_name_prefix is not supplied.
        _counter = itertools.count().__next__
    
        def submit(self, fn, *args, **kwargs):
            with self._shutdown_lock:
                if self._shutdown:
                    raise RuntimeError('cannot schedule new futures after shutdown')
    
                f = _base.Future()
                w = _WorkItem(f, fn, args, kwargs)
    
                self._work_queue.put(w)
                self._adjust_thread_count()
                return f
        submit.__doc__ = _base.Executor.submit.__doc__
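
    Reading this implementation, fn, args, and kwargs are only stored inside a _WorkItem and put on the work queue; submit itself never iterates its arguments. A minimal sketch (my own example, with a hypothetical noop function) that appears to confirm a generator passed to submit is not consumed by submit itself:

```python
from concurrent.futures import ThreadPoolExecutor

def gen():
    yield from range(5)

def noop(samples):
    return "done"          # deliberately never iterates samples

g = gen()
with ThreadPoolExecutor(max_workers=1) as ex:
    ex.submit(noop, samples=g).result()

# submit only stored the generator in a work item; nothing was consumed
remaining = list(g)
print(remaining)
```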
    

    About the ThreadPoolExecutor class in the concurrent.futures module:
    when a generator object is passed as an argument to submit, in some situations part or all of the generator's data gets consumed.

    The following demo shows this in detail:

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    
    """
    @version: v1.0
    @author: huangyc
    @file: test.py
    @Description: 
    @time: 2021/6/3 11:19
    """
    from concurrent import futures
    from itertools import groupby, count, tee
    from typing import Iterable
    
    
    def iter_slice_tool(iterator: Iterable, batch_size: int = 5):
        """Helper for consuming an iterable in batches of up to batch_size elements."""
        yield from groupby(iterator, key=lambda _, c=count(): next(c) // batch_size)
    
    
    def gen_datas():
        yield from range(30)
    
    
    def single_task(samples):
        print(list(samples))
    
    
    if __name__ == '__main__':
        num = 4
        tasks = []
        executor = futures.ThreadPoolExecutor(max_workers=num)
        batch_size = 6

        # Attempt 1
        """
            Data silently goes missing; vary the data size and batch_size
            to see different missing results:
            [0, 1, 2, 3, 4, 5]
            [6, 7, 8, 9, 10, 11]
            [12, 13, 14, 15, 16, 17]
            [18, 19, 20, 21, 22, 23]
            [29]
        """
        for key, sample in iter_slice_tool(gen_datas(), batch_size=batch_size):
            # the result is the same whether or not this tee line is present
            sample_copy, sample = tee(sample, 2)
            task = executor.submit(single_task, samples=sample)
            tasks.append(task)
        [future.result() for future in futures.as_completed(tasks)]
        print()

        # Attempt 2
        """
            [0, 1, 2, 3, 4, 5]
            [6, 7, 8, 9, 10, 11]
            [12, 13, 14, 15, 16, 17]
            [18, 19, 20, 21, 22, 23]
            [24, 25, 26, 27, 28, 29]
        """
        for key, sample in iter_slice_tool(gen_datas(), batch_size=batch_size):
            sample = list(sample)
            task = executor.submit(single_task, samples=sample)
            tasks.append(task)
        [future.result() for future in futures.as_completed(tasks)]
        print()

        # Attempt 3
        """
            [0, 1, 2, 3, 4, 5]
            [6, 7, 8, 9, 10, 11]
            [12, 13, 14, 15, 16, 17]
            [18, 19, 20, 21, 22, 23]
            [24, 25, 26, 27, 28, 29]
        """
        for key, sample in iter_slice_tool(gen_datas(), batch_size=batch_size):
            sample_copy, sample = tee(sample, 2)
            list(sample_copy)
            task = executor.submit(single_task, samples=sample)
            tasks.append(task)
        [future.result() for future in futures.as_completed(tasks)]
    
    

    In the examples above, Attempt 2 behaves correctly and is guaranteed safe.
    Attempt 1, by contrast, loses part of the data around the time of the submit call.
    Attempt 3 first uses tee to make two copies, converts one of them to a list, and hands the other to submit; in that case no data is lost:

    sample_copy, sample = tee(sample, 2)
    list(sample_copy)
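
    Why converting one tee branch to a list matters can be seen without any threads. A minimal sketch (my own example, relying on the documented behavior of itertools.tee): draining one branch pulls every item from the shared source into tee's internal buffer, so the other branch can still replay those items even after the source itself is exhausted.

```python
from itertools import tee

src = iter(range(6))
a, b = tee(src, 2)

drained = list(a)        # pulls all items from src into tee's shared buffer
replayed = list(b)       # b replays the buffered items rather than reading src
print(drained, replayed)
```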
    

    Two questions

    Question 1: why does the generator lose part of its data when it is passed to submit?
    Question 2: in Attempt 3, copying the generator with tee and converting one copy to a list prevents the loss, while skipping the list conversion still loses data. What is the mechanism here?

    If anyone familiar with the internals can explain this, I would be very grateful.
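
    One plausible answer to both questions (my reading of the itertools.groupby documentation, worth verifying): submit does not consume anything. All the groups yielded by groupby share a single underlying iterator, and the documentation warns that when the groupby object is advanced, the previous group is no longer visible. In Attempt 1 the main loop advances groupby before the worker threads have read their groups, so the skipped items are lost; in Attempt 3, list(sample_copy) drains each group synchronously, caching its items in tee's buffer before the loop moves on. The group invalidation can be reproduced with no threads at all:

```python
from itertools import groupby, count

def batches(it, batch_size=3):
    yield from groupby(it, key=lambda _, c=count(): next(c) // batch_size)

# Collect the group iterators first and consume them later -- mimicking a
# worker thread that only reads its group after the main loop has moved on.
groups = [grp for _, grp in batches(iter(range(9)))]
late = [list(grp) for grp in groups]
print(late)   # every group comes back empty: advancing groupby invalidated them
```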

    Source: https://www.haomeiwen.com/subject/jeezsltx.html