曾经有这么一个说法,程序中存在3种类型的bug:你的bug,我的bug和多线程。这虽然是句调侃,但从某种程度上道出了一个事实:多线程编程不是件容易的事情。线程间的同步和互斥,线程间数据的共享等这些都是涉及线程安全方面要考虑的问题。纵然python中提供了众多的同步和互斥机制,如mutex,condition,event等,但同步和互斥本身就不是一个容易的话题,稍有不慎就会陷入死锁状态或者威胁线程安全。我们来看一个经典的多线程同步问题:生产者消费者模型。如果用python来实现,你会怎么写?大概思路是这样:分别创建消费者和生产者线程,生产者往队列里面放产品,消费者从队列里面取出产品,创建一个线程锁以保证线程间操作的互斥性。当队列满足的时候消费者进入等待状态,当队列为空的时候生产者进入等待状态。我们来看一个具体的python实现:
# -*- coding: utf-8 -*-
import Queue
import threading
import random
writelock = threading.Lock()
class Producer(threading.Thread):
def __init__(self, q, con, name):
super(Producer, self).__init__()
self.q = q
self.name = name
self.con = con
print "Producer " + self.name + " Started"
def run(self):
while 1:
global writelock
self.con.acquire() # 获取锁对象
if self.q.full():
with writelock:
print 'Queue is full, producer wait!'
self.con.wait() # 等待资源
else:
value = random.randint(0, 10)
with writelock:
print self.name + " put value" + self.name + ":" + str(value)+ "into queue"
self.q.put((self.name + ":" + str(value)))
self.con.notify()
self.con.release()
class Consumer(threading.Thread):
def __init__(self, q, con, name):
super(Consumer, self).__init__()
self.q = q
self.con = con
self.name = name
print "Consumer "+self.name+" started\n"
def run(self):
while 1:
global writelock
self.con.acquire()
if self.q.empty():
with writelock:
print 'queue is empty, consumer wait!'
self.con.wait()
else:
value = self.q.get()
with writelock:
print self.name + "get value" + value + " from queue"
self.con.notify()
self.con.release()
if __name__ == "__main__":
q = Queue.Queue(10)
con = threading.Condition()
p = Producer(q, con, "P1")
p.start()
p1 = Producer(q, con, "P2")
p1.start()
c1 = Consumer(q, con, "C1")
c1.start()
上面的程序实现有什么问题吗?回答这个问题之前,我们先来了解一下Queue模块的基本知识。Python中的Queue模块提供了3种队列:
- Queue.Queue(maxsize): 先进先出,maxsize为队列大小,其值为非正数非时候为无限循环队列
- Queue.LifoQueue(maxsize): 后进先出,相当于栈
- Queue.PriorityQueue(maxsize): 优先级队列。
这三种队列支持以下方法: - Queue.qsize(): 返回近似的队列大小。注意,这里之所以加“近似”二字,是因为该值>0的时候并不保证并发执行的时候get()方法不被阻塞,同样,对于put()方法有效
- Queue.empty(): 队列为空的时候返回True, 否则返回False
- Queue.full(): 当设定了队列大小的时候,如果队列满则返回True,否则返回False
- Queue.put(item[,block[, timeout]]): 往队列里添加元素item,block设置为False的时候,如果对列满则抛出Full异常。如果block设置为True,timeout为None的时候则会一直等待到有空位置,否则会根据timeout的设定超时后抛出full异常
- Queue.put_nowait(item): 等价于put(item, False).block设置为False的时候,如果对列空咋抛出Empty异常。如果block设置为True,timeout为None的时候则会一直等待到有元素可用,否则会根据timeout的设定超时后抛出empty异常
- Queue.get([block[,timeout]]): 从队列中删除元素并返回该元素的值
- Queue.get_nowait(): 等价于get(False)
- Queue.task_done(): 发送信号表明入队任务已经完成,经常在消费者线程中用到
- Queue.join(): 阻塞直至对列中所有元素处理完毕
Queue模块实现了多个生产者消费者的对列,当线程之间需要信息安全的交换的时候特别的有用,因此这个模块实现了所需要的锁原语,为Python多线程编程提供了有力的支持,踏实线程安全的。需要注意的是Queue模块中的对列和collections.dequeue所表示对额对列并不一样,前者主要用于不同线程之间的通信,它内部实现了现成的锁机制;而后者主要是数据结构上的概念,因此支持in方法
再回过头来看看前面的例子,程序的实现有什么问题呢?答案很明显,作用于queue操作的条件变量完全是不需要的,因为Queue本身能够保证线程安全,因此不需要额外的同步机制。下面是一个多线程下载的例子
# -*- coding: utf-8 -*-
import os
import Queue
import threading
import urllib2
class DownloadThread(threading.Thread):
def __init__(self, queue):
threading.Thread.__init__(self)
self.queue = queue
def run(self):
while True:
url = self.queue.get()
print self.name + "begin download"+url +"..."
self.download_file(url)
self.queue.task_done()
print self.name + " download completed!!!"
def download_file(self, url):
urlhandler = urllib2.urlopen(url)
fname = os.path.basename(url) + ".html"
with open(fname, 'wb') as f:
while True:
chunk = urlhandler.read(1024)
if not chunk:
break
f.write(chunk)
if __name__ == "__main__":
urls = [ "http://www.baidu.com", "http://www.cnblogs.com/chaosimple/p/4153083.html",
"http://shop.dspread.com/weixin/ksher/check_shop?page_no=1&page_count=10"]
queue = Queue.Queue()
for i in range(5):
t = DownloadThread(queue)
t.setDaemon(True)
t.start()
for url in urls:
queue.put(url)
queue.join()
网友评论