美文网首页
编写高质量的python代码(4): 使用Queue使多线程编程

编写高质量的python代码(4): 使用Queue使多线程编程

作者: DraculaWong | 来源:发表于2016-10-12 16:03 被阅读0次

          曾经有这么一个说法,程序中存在3种类型的bug:你的bug,我的bug和多线程。这虽然是句调侃,但从某种程度上道出了一个事实:多线程编程不是件容易的事情。线程间的同步和互斥,线程间数据的共享等这些都是涉及线程安全方面要考虑的问题。纵然python中提供了众多的同步和互斥机制,如mutex,condition,event等,但同步和互斥本身就不是一个容易的话题,稍有不慎就会陷入死锁状态或者威胁线程安全。我们来看一个经典的多线程同步问题:生产者消费者模型。如果用python来实现,你会怎么写?大概思路是这样:分别创建消费者和生产者线程,生产者往队列里面放产品,消费者从队列里面取出产品,创建一个线程锁以保证线程间操作的互斥性。当队列满足的时候消费者进入等待状态,当队列为空的时候生产者进入等待状态。我们来看一个具体的python实现:

    # -*- coding: utf-8 -*-
    import Queue
    import threading
    import random
    
    
    writelock = threading.Lock()
    
    class Producer(threading.Thread):
        def __init__(self, q, con, name):
            super(Producer, self).__init__()
            self.q = q
            self.name = name
            self.con = con
            print "Producer " + self.name + " Started"
        def run(self):
            while 1:
                global writelock
                self.con.acquire()   # 获取锁对象
                if self.q.full():
                    with writelock:
                        print 'Queue is full, producer wait!'
                    self.con.wait()  # 等待资源
                else:
                    value = random.randint(0, 10)
                    with writelock:
                        print self.name + " put value" + self.name + ":" + str(value)+ "into queue"
                self.q.put((self.name + ":" + str(value)))
                self.con.notify()
            self.con.release()
    
    
    class Consumer(threading.Thread):
        def __init__(self, q, con, name):
            super(Consumer, self).__init__()
            self.q = q
            self.con = con
            self.name = name
            print "Consumer "+self.name+" started\n"
    
        def run(self):
            while 1:
                global writelock
                self.con.acquire()
                if self.q.empty():
                    with writelock:
                        print 'queue is empty, consumer wait!'
                    self.con.wait()
                else:
                    value = self.q.get()
                    with writelock:
                        print self.name + "get value" + value + " from queue"
                    self.con.notify()
                self.con.release()
    
    if __name__ == "__main__":
        q = Queue.Queue(10)
        con = threading.Condition()
        p = Producer(q, con, "P1")
        p.start()
        p1 = Producer(q, con, "P2")
        p1.start()
        c1 = Consumer(q, con, "C1")
        c1.start()
    

          上面的程序实现有什么问题吗?回答这个问题之前,我们先来了解一下Queue模块的基本知识。Python中的Queue模块提供了3种队列:

    • Queue.Queue(maxsize): 先进先出,maxsize为队列大小,其值为非正数非时候为无限循环队列
    • Queue.LifoQueue(maxsize): 后进先出,相当于栈
    • Queue.PriorityQueue(maxsize): 优先级队列。
      这三种队列支持以下方法:
    • Queue.qsize(): 返回近似的队列大小。注意,这里之所以加“近似”二字,是因为该值>0的时候并不保证并发执行的时候get()方法不被阻塞,同样,对于put()方法有效
    • Queue.empty(): 队列为空的时候返回True, 否则返回False
    • Queue.full(): 当设定了队列大小的时候,如果队列满则返回True,否则返回False
    • Queue.put(item[,block[, timeout]]): 往队列里添加元素item,block设置为False的时候,如果对列满则抛出Full异常。如果block设置为True,timeout为None的时候则会一直等待到有空位置,否则会根据timeout的设定超时后抛出full异常
    • Queue.put_nowait(item): 等价于put(item, False).block设置为False的时候,如果对列空咋抛出Empty异常。如果block设置为True,timeout为None的时候则会一直等待到有元素可用,否则会根据timeout的设定超时后抛出empty异常
    • Queue.get([block[,timeout]]): 从队列中删除元素并返回该元素的值
    • Queue.get_nowait(): 等价于get(False)
    • Queue.task_done(): 发送信号表明入队任务已经完成,经常在消费者线程中用到
    • Queue.join(): 阻塞直至对列中所有元素处理完毕
            Queue模块实现了多个生产者消费者的对列,当线程之间需要信息安全的交换的时候特别的有用,因此这个模块实现了所需要的锁原语,为Python多线程编程提供了有力的支持,踏实线程安全的。需要注意的是Queue模块中的对列和collections.dequeue所表示对额对列并不一样,前者主要用于不同线程之间的通信,它内部实现了现成的锁机制;而后者主要是数据结构上的概念,因此支持in方法
            再回过头来看看前面的例子,程序的实现有什么问题呢?答案很明显,作用于queue操作的条件变量完全是不需要的,因为Queue本身能够保证线程安全,因此不需要额外的同步机制。下面是一个多线程下载的例子
    # -*- coding: utf-8 -*-
    import os
    import Queue
    import threading
    import urllib2
    
    class DownloadThread(threading.Thread):
        def __init__(self, queue):
            threading.Thread.__init__(self)
            self.queue = queue
    
        def run(self):
            while True:
                url = self.queue.get()
                print self.name + "begin download"+url +"..."
                self.download_file(url)
                self.queue.task_done()
                print self.name + " download completed!!!"
        def download_file(self, url):
            urlhandler = urllib2.urlopen(url)
            fname = os.path.basename(url) + ".html"
            with open(fname, 'wb') as f:
                while True:
                    chunk = urlhandler.read(1024)
                    if not chunk:
                        break
                    f.write(chunk)
    
    if __name__ == "__main__":
        urls = [ "http://www.baidu.com", "http://www.cnblogs.com/chaosimple/p/4153083.html",
    "http://shop.dspread.com/weixin/ksher/check_shop?page_no=1&page_count=10"]
        queue = Queue.Queue()
        for i in range(5):
            t = DownloadThread(queue)
            t.setDaemon(True)
            t.start()
        for url in urls:
            queue.put(url)
        queue.join()
    

    相关文章

      网友评论

          本文标题:编写高质量的python代码(4): 使用Queue使多线程编程

          本文链接:https://www.haomeiwen.com/subject/dthiyttx.html