Python的编程中有四种队列模式,但是其应用的场景不同,在此介绍线程之间通信,利用的主要是Queue.queue
Queue主要就是为多线程生产值、消费者之间线程通信提供服务,具有先进先出的数据结构。
首先我们组要明白为什么要使用队列,队列的性质,
多线程并发编程的重点,是线程之间共享数据的访问问题和线程之间的通信问题
为了解决线程之间数据共享问题, PYTHON 提供了一个数据类型【队列】可以用于在多线程并发模式下,安全的访问数据而不会造成数据共享冲突。
正常请求的多线程,如果是消费之和生产者,通过列表实现,多线程会对列表中的数据取值,会出现同时访问列表数据的情况,这时候就需要对线程进行加锁或者是线程等待,手动进行解决,过于麻烦,但是队列会通过先进先出或者先进后出的模式,保证了单个数据不会进行同时被多个线程进行访问。
FIFO
Queue.Queue(maxsize=0)
FIFO即First in First Out,先进先出。Queue提供了一个基本的FIFO容器,使用方法很简单,maxsize是个整数,指明了队列中能存放的数据个数的上限。一旦达到上限,插入会导致阻塞,直到队列中的数据被消费掉。如果maxsize小于或者等于0,队列大小没有限制。
LIFO
Queue.LifoQueue(maxsize=0)
LIFO即Last in First Out,后进先出。与栈的类似,使用也很简单,maxsize用法同上
priority
class Queue.PriorityQueue(maxsize=0)
构造一个优先队列。maxsize用法同上。
基本方法:
Queue.Queue(maxsize=0)
#FIFO, 用来定义队列的长度,如果maxsize小于1就表示队列长度无限,
Queue.LifoQueue(maxsize=0)
#LIFO, 如果maxsize小于1就表示队列长度无限
Queue.qsize()
#返回队列的大小
Queue.empty()
#如果队列为空,返回True,反之False ,在线程间通信的过程中,可以通过此来给消费者等待信息
Queue.full()
# 如果队列满了,返回True,反之False,给生产者提醒
Queue.get([block[, timeout]])
读队列,timeout等待时间
Queue.put(item, [block[, timeout]])
写队列,timeout等待时间
Queue.queue.clear()
清空队列
task_done()
#意味着之前入队的一个任务已经完成。由队列的消费者线程调用。每一个get()调用得到一个任务,接下来的task_done()调用告诉队列该任务已经处理完毕如果当前一个join()正在阻塞,它将在队列中的所有任务都处理完时恢复执行(即每一个由put()调用入队的任务都有一个对应的task_done()调用)。
join()
#阻塞调用线程,直到队列中的所有任务被处理掉。只要有数据被加入队列,未完成的任务数就会增加。当消费者线程调用task_done((意味着有消费者取得任务并完成任务),未完成的任务数就会减少。当未完成的任务数降到0,join()解除阻塞。
队列需求一(爬虫的请求地址)
Python多线程主要是为了提高程序在IO方面的优势,在爬虫的过程中显得尤为重要。正常的爬虫请求直接封装多线程就ok,但是爬虫请求的过程中,对于url的请求需要通过队列来实现,这是队列的需求之一。
对于爬虫的请求地址来说,一般是有规律性可循的,如果是翻页数据,可以将请求到的url放到队列中,通过多线程对队列进行取数据,如果队列为空,线程判断自动等待,循环加入队列url,线程自动请求,以下伪代码,作为参考:
import threading
from queue import Queue
class ThreadCrawl(threading.Thread):
def __init__(self, threadName, idQueue):
# 继承父类的方法
super(ThreadCrawl, self).__init__()
self.threadName = threadName # 线程名字
def run(self):
print('启动' + self.threadName)
while not self.idQueue.empty():
try:
id = self.idQueue.get(False) # False 如果队列为空,抛出异常
time.sleep(1)
print("~"*300)
self.get_con(id)
except Exception as e:
print('队列为空。。。。。', e)
pass
print('#'*300)
def get_con(self): #自己封装的请求自定义
pass
def get_id(m, n):
conn = psycopg2.connect(database='postgres', user='postgres', password='123456', host='127.0.0.1', port='5432')
cur = conn.cursor()
sql1 = 'SELECT doc_id from id LIMIT {} offset {};'.format(m, n)
cur.execute(sql1)
data = cur.fetchall()
conn.commit()
return data
def main():
n = 60
while True:
m = 20
# m是固定值,一次去20条, n是第几条开始
print('开始采集n的值为', n)
if n == 200000:
break
# id的队列
idQueue = Queue(20)
if idQueue.empty():
data = get_id(m, n)
for i in data:
idQueue.put(i[0])
# 采集线程的数量
crawlList = []
for id in range(1, 2):
name = '采集线程{}'.format(id)
crawlList.append(name)
# 存储采集线程的列表集合
threadcrawl = []
for threadName in crawlList:
thread = ThreadCrawl(threadName, idQueue)
thread.start()
threadcrawl.append(thread)
for thread in threadcrawl:
thread.join()
n = n + m
print("主线程退出..............")
if __name__ == '__main__':
main()
以上代码是作者从数据库中取数据,间隔性取,让后拼装到url,进行请求
队列需求二(生产者、消费者模型)
import threading
import time
from queue import Queue
def put_id():
i = 0
while True:
i = i + 1
print("添加数据", i, id_queue.qsize())
time.sleep(0.1)
id_queue.put(i)
def get_id(m):
while True:
i = id_queue.get()
print("线程", m, '取值', i)
if __name__ == "__main__":
id_queue = Queue(20)
Th1 = threading.Thread(target=put_id, )
Th2 = threading.Thread(target=get_id, args=(2, ))
Th3 = threading.Thread(target=get_id, args=(3, ))
Th5 = threading.Thread(target=get_id, args=(4, ))
Th4 = threading.Thread(target=get_id, args=(5, ))
Th2.start()
Th1.start()
Th3.start()
Th4.start()
Th5.start()
网友评论