美文网首页python基础
队列 -- 线程安全的FIFO实现

队列 -- 线程安全的FIFO实现

作者: 英武 | 来源:发表于2017-05-30 22:46 被阅读111次

    queue — Thread-Safe FIFO Implementation

    队列 -- 线程安全的FIFO实现

    Purpose: Provides a thread-safe FIFO implementation

    目的:提供一个线程安全的FIFO实现

    The queue module provides a first-in, first-out (FIFO) data structure suitable for multi-threaded programming. It can be used to pass messages or other data between producer and consumer threads safely. Locking is handled for the caller, so many threads can work with the same Queue instance safely and easily. The size of a Queue (the number of elements it contains) may be restricted to throttle memory usage or processing.
    queue模块提供了一个适用于多线程编程的先进先出(FIFO)的数据结构。该模块可以用于在生产者和消费者之间线程安全的传递消息或者其他数据。调用者会自动创建锁, 多个线程能够与同一个queue实例安全并且容易的共同协作。Queue的大小(所包含的元素个数)可能收到所使用或者处理的内存限制。

    Note:
    注意:
    This discussion assumes you already understand the general nature of a queue. If you do not, you may want to read some of the references before continuing.
    这个讨论假定读者已经理解queue的一般本质。如果还没有理解,就在继续本文后续内容之前需要阅读一些相关的参考文档。

    Basic FIFO Queue

    基本的 FIFO 队列

    The Queue class implements a basic first-in, first-out container. Elements are added to one “end” of the sequence using put(), and removed from the other end using get().
    Queue类实现一个基本的先进先出容器。通过put()函数将元素加入序列的一端,然后通过get()函数从序列的另一端移出。

    queue_fifo.py

    import queue
    
    q = queue.Queue()
    
    for i in range(5):
        q.put(i)
    
    while not q.empty():
        print(q.get(), end=' ')
    print()
    

    This example uses a single thread to illustrate that elements are removed from the queue in the same order in which they are inserted.
    本示例使用了一个单线程来展示元素从queue中移除,以和它们插入相同的顺序。

    $ python3 queue_fifo.py

    0 1 2 3 4
    LIFO Queue

    In contrast to the standard FIFO implementation of Queue, the LifoQueue uses last-in, first-out ordering (normally associated with a stack data structure).
    与Queue的标准FIFO实现相反,LifoQueue使用后进显出的规则(通常与栈这个数据结构关联)。

    queue_lifo.py

    import queue
    
    q = queue.LifoQueue()
    
    for i in range(5):
        q.put(i)
    
    while not q.empty():
        print(q.get(), end=' ')
    print()
    

    The item most recently put into the queue is removed by get.
    放入queue中的元素将通过get函数移除。

    $ python3 queue_lifo.py

    4 3 2 1 0

    Priority Queue

    优先队列

    Sometimes the processing order of the items in a queue needs to be based on characteristics of those items, rather than just the order they are created or added to the queue. For example, print jobs from the payroll department may take precedence over a code listing that a developer wants to print. PriorityQueue uses the sort order of the contents of the queue to decide which item to retrieve.
    有时候,队列中元素的处理顺序需要基于元素的特性,而不是元素创建或者加入队列的顺序。例如,人事部门打印工资单的任务可能按照优先级,而不是程序员想要打印的顺序。优先队列将对于队列的内容进行排序,然后再决定检索那个元素。

    queue_priority.py

    import functools
    import queue
    import threading
    
    @functools.total_ordering
    class Job:
    
        def __init__(self, priority, description):
            self.priority = priority
            self.description = description
            print('New job:', description)
            return
    
        def __eq__(self, other):
            try:
                return self.priority == other.priority
            except AttributeError:
                return NotImplemented
    
        def __lt__(self, other):
            try:
                return self.priority < other.priority
            except AttributeError:
                return NotImplemented
    
    
    q = queue.PriorityQueue()
    
    q.put(Job(3, 'Mid-level job'))
    q.put(Job(10, 'Low-level job'))
    q.put(Job(1, 'Important job'))
    
    
    def process_job(q):
        while True:
            next_job = q.get()
            print('Processing job:', next_job.description)
            q.task_done()
    
    workers = [
        threading.Thread(target=process_job, args=(q,)),
        threading.Thread(target=process_job, args=(q,)),
    ]
    for w in workers:
        w.setDaemon(True)
        w.start()
    
    q.join()
    

    This example has multiple threads consuming the jobs, which are processed based on the priority of items in the queue at the time get() was called. The order of processing for items added to the queue while the consumer threads are running depends on thread context switching.
    本示例使用了多线程来执行任务,在调用get()函数时,会基于队列中元素的优先级处理相应的元素。队列中元素的处理顺序,消费线程的运行基于线程上下文切换。

    $ python3 queue_priority.py

    New job: Mid-level job
    New job: Low-level job
    New job: Important job
    Processing job: Important job
    Processing job: Mid-level job
    Processing job: Low-level job
    Building a Threaded Podcast Client

    The source code for the podcasting client in this section demonstrates how to use the Queue class with multiple threads. The program reads one or more RSS feeds, queues up the enclosures for the five most recent episodes from each feed to be downloaded, and processes several downloads in parallel using threads. It does not have enough error handling for production use, but the skeleton implementation illustrates the use of the queue module.
    本节中podcasting客户端的源代码展示了在多线程环境下使用队列类。程序读取了一个或者多个RSS feed,将每个feed中需要下载的五个最常见的剧集放入队列,然后使用线程并行处理需要下载的多个队列。在生产环境下使用时不会有太多的错误处理,但是所执行的架构很清晰的表现了queue模块的使用。

    First, some operating parameters are established. Usually, these would come from user inputs (e.g., preferences or a database). The example uses hard-coded values for the number of threads and list of URLs to fetch.
    首先,需要确认一些操作参数。通常情况下,这些参数都是来源于用户输入(例如:优先级或者数据库)。以下示例对于线程的数量和需要抓取的URL的列表使用了硬编码值。

    fetch_podcasts.py
    from queue import Queue
    import threading
    import time
    import urllib
    from urllib.parse import urlparse
    
    import feedparser
    
    # Set up some global variables
    num_fetch_threads = 2
    enclosure_queue = Queue()
    
    # A real app wouldn't use hard-coded data...
    feed_urls = [
        'http://talkpython.fm/episodes/rss',
    ]
    
    
    def message(s):
        print('{}: {}'.format(threading.current_thread().name, s))
    The function download_enclosures() runs in the worker thread and processes the downloads using urllib.
    
    def download_enclosures(q):
        """This is the worker thread function.
        It processes items in the queue one after
        another.  These daemon threads go into an
        infinite loop, and exit only when
        the main thread ends.
        """
        while True:
            message('looking for the next enclosure')
            url = q.get()
            filename = url.rpartition('/')[-1]
            message('downloading {}'.format(filename))
            response = urllib.request.urlopen(url)
            data = response.read()
            # Save the downloaded file to the current directory
            message('writing to {}'.format(filename))
            with open(filename, 'wb') as outfile:
                outfile.write(data)
            q.task_done()
    

    Once the target function for the threads is defined, the worker threads can be started. When download_enclosures() processes the statement url = q.get(), it blocks and waits until the queue has something to return. That means it is safe to start the threads before there is anything in the queue.
    一旦线程的目标函数被定义,worker线程就可以开始工作。当download_enclosures()处理语句url = q.get()时,他就阻塞并且一直等待,直到queue需要返回,这就意味着在队列中有东西时,启动线程总是安全的。

    # Set up some threads to fetch the enclosures
    for i in range(num_fetch_threads):
        worker = threading.Thread(
            target=download_enclosures,
            args=(enclosure_queue,),
            name='worker-{}'.format(i),
        )
        worker.setDaemon(True)
        worker.start()
    

    The next step is to retrieve the feed contents using the feedparser module and enqueue the URLs of the enclosures. As soon as the first URL is added to the queue, one of the worker threads picks it up and starts downloading it. The loop continues to add items until the feed is exhausted, and the worker threads take turns dequeuing URLs to download them.
    下一步是使用feedparser模块,以及检索feed的内容,以及入队URL的集合。一旦第一个URL添加到队列,就有一个worker线程选中它,然后开始下载它。循环将继续添加元素,直到耗尽全部的feed,worker线程将轮流的将URL出队,然后下载它们。

    # Download the feed(s) and put the enclosure URLs into
    # the queue.
    for url in feed_urls:
        response = feedparser.parse(url, agent='fetch_podcasts.py')
        for entry in response['entries'][:5]:
            for enclosure in entry.get('enclosures', []):
                parsed_url = urlparse(enclosure['url'])
                message('queuing {}'.format(
                    parsed_url.path.rpartition('/')[-1]))
                enclosure_queue.put(enclosure['url'])
    

    The only thing left to do is wait for the queue to empty out again, using join().
    现在唯一要做的事情就是等待队列再次变为空,使用join()。

    # Now wait for the queue to be empty, indicating that we have
    # processed all of the downloads.
    message('*** main thread waiting')
    enclosure_queue.join()
    message('*** done')
    Running the sample script produces output similar to the following.
    

    $ python3 fetch_podcasts.py

    worker-0: looking for the next enclosure
    worker-1: looking for the next enclosure
    MainThread: queuing turbogears-and-the-future-of-python-web-frameworks.mp3
    MainThread: queuing continuum-scientific-python-and-the-business-of-open-source.mp3
    MainThread: queuing openstack-cloud-computing-built-on-python.mp3
    MainThread: queuing pypy.js-pypy-python-in-your-browser.mp3
    MainThread: queuing machine-learning-with-python-and-scikit-learn.mp3
    MainThread: *** main thread waiting
    worker-0: downloading turbogears-and-the-future-of-python-web-frameworks.mp3
    worker-1: downloading continuum-scientific-python-and-the-business-of-open-source.mp3
    worker-0: looking for the next enclosure
    worker-0: downloading openstack-cloud-computing-built-on-python.mp3
    worker-1: looking for the next enclosure
    worker-1: downloading pypy.js-pypy-python-in-your-browser.mp3
    worker-0: looking for the next enclosure
    worker-0: downloading machine-learning-with-python-and-scikit-learn.mp3
    worker-1: looking for the next enclosure
    worker-0: looking for the next enclosure
    MainThread: *** done
    The actual output will depend on the contents of the RSS feed used

    相关文章

      网友评论

        本文标题: 队列 -- 线程安全的FIFO实现

        本文链接:https://www.haomeiwen.com/subject/aaqdfxtx.html