大家好,我是剑南。
本篇文章,为大家带来的是queue模块的详解!
初识queue模块
queue模块实现了多生产者、多消费者队列。这特别适用于消息必须安全地在多线程交换的线程编程。模块中的Queue类实现了所需要的锁定语义。
该模块实现了三种类型的队列,它们的区别是任务取回的顺序。在FIFO队列中,先添加任务的先取回。在LIFO队列中,最后添加的任务先取回(该操作类似于堆栈)。在优先级队列中,条目将保持排序(使用heapq模块)并且最小值的任务第一个返回。
创建“队列”对象
import queue
q = queue.Queue(maxsize=5)
maxsize是一个整数,用于设置可以放入队列中的任务数的上限,当达到这个大小的时候,插入操作将阻塞至队列中的任务被消除掉。如果maxsize小于等于0,任务数量为无限大。
队列添加数据
q.put(1)
q.put(2)
q.put(3)
q.put(4)
q.put(5)
print(q.full())
运行结果为:True。
Queue.full():表示当队列任务已满时,返回的结果为True。如果full()返回True不保证后续调用get()不被阻塞,同样的道理,如果full()返回False也不保证后续调用put()不被阻塞。
Queue.put(item, block=True, timeout=None):将Item放入队列,如果可选参数block是True并且timeout是None,则在必要时阻塞至有空闲插槽可用,如果timeout是正数,将最多阻塞timeout秒,如果这段时间没有可用的空闲插槽,则引发full异常。反之block为False,如果插槽空闲,则立即使用,把item放入队列,否则引发Full异常。
判断队列是否为空
Queue.empty():如果队列为空,则返回True,否则返回False。如果empty()返回True,不保证后续调用put()会被阻塞。类似的,如果empty()返回False,也不保证后续调用get()会被阻塞。
获取队列的大小
Queue.qsize():返回队列的大小。注意qsize>0不保证后续的get()有可能被阻塞,qsize<maxsize也不保证put()有可能被阻塞。
获取队列中数据
Queue.get(block=True, timeout=None):从对列中移除并返回一个数据。当队列为空值,将一直等待。
其他的Queue对象
Queue.task_done():表示前面的排队任务已经完成,被队列的消费者线程使用。每个get()被用于获取一个任务,后续调用task_done()告诉队列,该任务的处理已经完成。如果join()当前正在阻塞,在所有条目都被处理后,将解除阻塞(意味着每个put()进队列的条目task_done()都被收到)。
Queue.join():阻塞至对列的所有数据都被接收和处理完毕。当数据被添加到队列时,未完成的任务的计数就会增加。每当消费者线程调用task_done()表示这个条目已经被收回,未完成的计数就会减少,当完成计数降到0的时候,阻塞就会解除。
简单示例
下面的例子要展示的是,我们应该如何使用代码将等待的任务完成。
具体代码,如下所示:
import threading, queue
q = queue.Queue()
def worker():
while True:
item = q.get()
print(f'Working on {item}')
print(f'Finished {item}')
q.task_done()
# turn-on the worker thread
threading.Thread(target=worker, daemon=True).start()
# send thirty task requests to the worker
for item in range(30):
q.put(item)
print('All task requests sent\n', end='')
# block until all tasks are done
q.join()
print('All work completed')
实战-豆瓣短评
对于本次实战,我采用的网站是豆瓣电影。小伙伴们可以自己去找一部电影,爬取里面的短评。
这次,我爬取的电影是《我不是药神》的短评,采用的便是队列的技术。
爬取思路
image在上图中,框出来的数据,就是我要获取的数据,并下得到的数据保存到csv文件中。
之所以选择上面的图片,其实是有原因的,不知道你发现没有,在上面的图片中第一条评论是没有给评价的,因此,当我们按照相同的规则去获取数据时,便容易出现异常。
其次,短评的数据量一共有52万条,每页20条,并且只能获取到前25页的数据。再加上,如果没有给出评价的用户,我直接过滤,因此,最后获取下来的数据应该是不足500条的。
再这里,要做的事情就是要完成翻页的操作。
在本次编码中,我的思路是采用两个线程与两个队列来完成。一个线程用于获取数据,一个线程用于保存数据;其中的一个队列用于保存25页的URL地址,另一个队列用于保存获取的数据。
获取数据
主线程
首先在主线程中,创建两个队列,并将URL添加进保存URL的队列中。
具体代码,如下所示:
def main():
p_queue = Queue() # 保存URL
d_queue = Queue() # 保存数据
for page in range(25):
url = f'https://movie.douban.com/subject/26752088/comments?start={page*20}&limit=20&status=P&sort=new_score'
p_queue.put(url)
获取数据
先说一下前提,这里我才用的解析库是lxml,因此,小伙伴们需要自行熟悉xpath语法。
这里创建一个获取数据的类,这个类继承thread,方便接下来开启线程。
具体代码,如下所示:
class GetData(threading.Thread):
def __init__(self, page_queue, data_queue):
super(GetData, self).__init__()
self.page_queue = page_queue
self.data_queue = data_queue
self.headers = {
'User-Agent': 你的user-agent,
'Cookie': '你的cookie'
}
def run(self):
while True:
if self.data_queue.empty() and self.page_queue.empty():
break
url = self.page_queue.get()
self.parse_page(url)
def parse_page(self, url):
html = etree.HTML(requests.get(url, headers=self.headers).content.decode('utf-8'))
comment_items = html.xpath('//div[@class="comment-item "]')
for comment_item in comment_items:
try:
user = comment_item.xpath('.//span[2]/a/text()')[0]
comment_time = comment_item.xpath('.//span[2]//span[3]/@title')[0]
star = comment_item.xpath('.//span[2]//span[2]/@title')[0]
content = comment_item.xpath('.//span[@class="short"]/text()')[0]
self.data_queue.put((user, comment_time, star, content))
except:
continue
保存数据
同样的,这里创建一个保存数据的类,这个类也是继承thread,也是方便开启线程。
具体代码,如下所示:
class SaveData(threading.Thread):
def __init__(self, page_queue, data_queue):
super(SaveData, self).__init__()
self.data_queue = data_queue
self.page_queue =page_queue
def run(self):
with open('data.csv', 'w', newline='', encoding='utf-8-sig') as csvfile:
writer = csv.writer(csvfile)
writer.writerow(['user', 'comment_time', 'star', 'content'])
while True:
if self.data_queue.empty() and self.page_queue.empty():
break
user, comment_time, star, content = self.data_queue.get()
print(self.data_queue.get())
with open('data.csv', 'a', newline='', encoding='utf-8-sig') as csvfile:
writer = csv.writer(csvfile)
writer.writerow([user, comment_time, star, content])
开启多线程
多线程实在主线程中开启,具体代码,如下所示:
def main():
p_queue = Queue()
d_queue = Queue()
for page in range(25):
url = f'https://movie.douban.com/subject/26752088/comments?start={page*20}&limit=20&status=P&sort=new_score'
p_queue.put(url)
for x in range(5):
t1 = GetData(p_queue, d_queue)
# t1.daemon = True
t1.start()
t2 = SaveData(p_queue, d_queue)
# t2.daemon = True
t2.start()
数据展示
image不到4秒钟,便将短评数据都抓取下来了,多线程的效率是不是要比单线程要高很多呀!
最后
在本次的分享中,大家要熟悉与了解queue的使用方法,在后期分享中经常要用到,希望小伙伴们能够掌握。
我是剑南,如果文章给到了你帮助,请你点个【赞】与【再看】。
文章的每一个字都是我用心敲出来的,点个【再看】,让我知道,你也是陪着我一起努力的人。
网友评论