美文网首页大数据 爬虫Python AI SqlPython小哥哥
Python 源码分析:queue 队列模块 !

Python 源码分析:queue 队列模块 !

作者: 14e61d025165 | 来源:发表于2019-05-18 15:16 被阅读0次

    起步

    queue 模块提供适用于多线程编程的先进先出(FIFO)数据结构。因为它是线程安全的,所以多个线程很轻松地使用同一个实例。

    源码分析Python学习交流群:1004391443,这里有资源共享,技术解答,还有小编从最基础的Python资料到项目实战的学习资料都有整理,希望能帮助你更了解python,学习python。

    先从初始化的函数来看:

    class Queue:
    def init(self, maxsize=0):
    # 设置队列的最大容量
    self.maxsize = maxsize
    self._init(maxsize)
    # 线程锁,互斥变量
    self.mutex = threading.Lock()
    # 由锁衍生出三个条件变量
    self.not_empty = threading.Condition(self.mutex)
    self.not_full = threading.Condition(self.mutex)
    self.all_tasks_done = threading.Condition(self.mutex)
    self.unfinished_tasks = 0
    def _init(self, maxsize):
    # 初始化底层数据结构
    self.queue = deque()
    从这初始化函数能得到哪些信息呢?首先,队列是可以设置其容量大小的,并且具体的底层存放元素的它使用了 collections.deque() 双端列表的数据结构,这使得能很方便的做先进先出操作。这里还特地抽象为_init 函数是为了方便其子类进行覆盖,允许子类使用其他结构来存放元素(比如优先队列使用了 list)。

    然后就是线程锁 self.mutex ,对于底层数据结构 self.queue 的操作都要先获得这把锁;再往下是三个条件变量,这三个 Condition 都以 self.mutex 作为参数,也就是说它们共用一把锁;从这可以知道诸如with self.mutex与 with self.not_empty 等都是互斥的。

    基于这些锁而做的一些简单的操作:

    class Queue:
    ...
    def qsize(self):
    # 返回队列中的元素数
    with self.mutex:
    return self._qsize()
    def empty(self):
    # 队列是否为空
    with self.mutex:
    return not self._qsize()
    def full(self):
    # 队列是否已满
    with self.mutex:
    return 0 < self.maxsize <= self._qsize()
    def _qsize(self):
    return len(self.queue)
    这个代码片段挺好理解的,无需分析。

    作为队列,主要得完成入队与出队的操作,首先是入队:

    class Queue:
    ...
    def put(self, item, block=True, timeout=None):
    with self.not_full: # 获取条件变量not_full
    if self.maxsize > 0:
    if not block:
    if self._qsize() >= self.maxsize:
    raise Full # 如果 block 是 False,并且队列已满,那么抛出 Full 异常
    elif timeout is None:
    while self._qsize() >= self.maxsize:
    self.not_full.wait() # 阻塞直到由剩余空间
    elif timeout < 0: # 不合格的参数值,抛出ValueError
    raise ValueError("'timeout' must be a non-negative number")
    else:
    endtime = time() + timeout # 计算等待的结束时间
    while self._qsize() >= self.maxsize:
    remaining = endtime - time()
    if remaining <= 0.0:
    raise Full # 等待期间一直没空间,抛出 Full 异常
    self.not_full.wait(remaining)
    self._put(item) # 往底层数据结构中加入一个元素
    self.unfinished_tasks += 1
    self.not_empty.notify()
    def _put(self, item):
    self.queue.append(item)
    尽管只有二十几行的代码,但这里的逻辑还是比较复杂的。它要处理超时与队列剩余空间不足的情况,具体几种情况如下:

    1、如果 block 是 False,忽略timeout参数

    若此时队列已满,则抛出 Full 异常;
    若此时队列未满,则立即把元素保存到底层数据结构中;
    2、如果 block 是 True

    若 timeout 是 None 时,那么put操作可能会阻塞,直到队列中有空闲的空间(默认);
    若 timeout 是非负数,则会阻塞相应时间直到队列中有剩余空间,在这个期间,如果队列中一直没有空间,抛出 Full 异常;
    处理好参数逻辑后,,将元素保存到底层数据结构中,并递增unfinished_tasks,同时通知 not_empty ,唤醒在其中等待数据的线程。

    出队操作:

    class Queue:
    ...
    def get(self, block=True, timeout=None):
    with self.not_empty:
    if not block:
    if not self._qsize():
    raise Empty
    elif timeout is None:
    while not self._qsize():
    self.not_empty.wait()
    elif timeout < 0:
    raise ValueError("'timeout' must be a non-negative number")
    else:
    endtime = time() + timeout
    while not self._qsize():
    remaining = endtime - time()
    if remaining <= 0.0:
    raise Empty
    self.not_empty.wait(remaining)
    item = self._get()
    self.not_full.notify()
    return item
    def _get(self):
    return self.queue.popleft()
    get() 操作是 put() 相反的操作,代码块也及其相似,get() 是从队列中移除最先插入的元素并将其返回。

    1、如果 block 是 False,忽略timeout参数

    若此时队列没有元素,则抛出 Empty 异常;
    若此时队列由元素,则立即把元素保存到底层数据结构中;
    2、如果 block 是 True

    若 timeout 是 None 时,那么get操作可能会阻塞,直到队列中有元素(默认);
    若 timeout 是非负数,则会阻塞相应时间直到队列中有元素,在这个期间,如果队列中一直没有元素,则抛出 Empty 异常;
    最后,通过 self.queue.popleft() 将最早放入队列的元素移除,并通知 not_full ,唤醒在其中等待数据的线程。

    这里有个值得注意的地方,在 put() 操作中递增了 self.unfinished_tasks ,而 get() 中却没有递减,这是为什么?

    这其实是为了留给用户一个消费元素的时间,get() 仅仅是获取元素,并不代表消费者线程处理的该元素,用户需要调用 task_done() 来通知队列该任务处理完成了:

    class Queue:
    ...
    def task_done(self):
    with self.all_tasks_done:
    unfinished = self.unfinished_tasks - 1
    if unfinished <= 0:
    if unfinished < 0: # 也就是成功调用put()的次数小于调用task_done()的次数时,会抛出异常
    raise ValueError('task_done() called too many times')
    self.all_tasks_done.notify_all() # 当unfinished为0时,会通知all_tasks_done
    self.unfinished_tasks = unfinished
    def join(self):
    with self.all_tasks_done:
    while self.unfinished_tasks: # 如果有未完成的任务,将调用wait()方法等待
    self.all_tasks_done.wait()
    由于 task_done()使用方调用的,当 task_done() 次数大于 put() 次数时会抛出异常。

    task_done() 操作的作用是唤醒正在阻塞的 join() 操作。join() 方法会一直阻塞,直到队列中所有的元素都被取出,并被处理了(和线程的join方法类似)。也就是说 join()方法必须配合task_done() 来使用才行。

    LIFO 后进先出队列

    LifoQueue使用后进先出顺序,与栈结构相似:

    class LifoQueue(Queue):
    '''Variant of Queue that retrieves most recently added entries first.'''
    def _init(self, maxsize):
    self.queue = []
    def _qsize(self):
    return len(self.queue)
    def _put(self, item):
    self.queue.append(item)
    def _get(self):
    return self.queue.pop()
    这就是 LifoQueue 全部代码了,这正是 Queue 设计很棒的一个原因,它将底层的数据操作抽象成四个操作函数,本身来处理线程安全的问题,使得其子类只需关注底层的操作。

    LifoQueue 底层数据结构改用 list 来存放,通过 self.queue.pop() 就能将 list 中最后一个元素移除,无需重置索引。

    PriorityQueue 优先队列

    from heapq import heappush, heappop
    class PriorityQueue(Queue):
    '''Variant of Queue that retrieves open entries in priority order (lowest first).
    Entries are typically tuples of the form: (priority number, data).
    '''
    def _init(self, maxsize):
    self.queue = []
    def _qsize(self):
    return len(self.queue)
    def _put(self, item):
    heappush(self.queue, item)
    def _get(self):
    return heappop(self.queue)
    优先队列使用了 heapq 模块的结构,也就是最小堆的结构。优先队列更为常用,队列中项目的处理顺序需要基于这些项目的特征,一个简单的例子:

    import queue
    class A:
    def init(self, priority, value):
    self.priority = priority
    self.value = value
    def lt(self, other):
    return self.priority < other.priority
    q = queue.PriorityQueue()
    q.put(A(1, 'a'))
    q.put(A(0, 'b'))
    q.put(A(1, 'c'))
    print(q.get().value) # 'b'
    使用优先队列的时候,需要定义 lt 魔术方法,来定义它们之间如何比较大小。若元素的 priority 相同,依然使用先进先出的顺序。

    参考

    https://pymotw.com/3/queue/index.html

    相关文章

      网友评论

        本文标题:Python 源码分析:queue 队列模块 !

        本文链接:https://www.haomeiwen.com/subject/gnjtzqtx.html