美文网首页Python
Python多线程——队列(Queue)

Python多线程——队列(Queue)

作者: 周周周__ | 来源:发表于2019-05-25 23:08 被阅读277次

    Python的编程中有四种队列模式,但是其应用的场景不同,在此介绍线程之间通信,利用的主要是Queue.queue

    Queue主要就是为多线程生产值、消费者之间线程通信提供服务,具有先进先出的数据结构。

    首先我们组要明白为什么要使用队列,队列的性质,

    多线程并发编程的重点,是线程之间共享数据的访问问题和线程之间的通信问题
    为了解决线程之间数据共享问题, PYTHON 提供了一个数据类型【队列】可以用于在多线程并发模式下,安全的访问数据而不会造成数据共享冲突。
    正常请求的多线程,如果是消费之和生产者,通过列表实现,多线程会对列表中的数据取值,会出现同时访问列表数据的情况,这时候就需要对线程进行加锁或者是线程等待,手动进行解决,过于麻烦,但是队列会通过先进先出或者先进后出的模式,保证了单个数据不会进行同时被多个线程进行访问。

    FIFO

    Queue.Queue(maxsize=0)
    FIFO即First in First Out,先进先出。Queue提供了一个基本的FIFO容器,使用方法很简单,maxsize是个整数,指明了队列中能存放的数据个数的上限。一旦达到上限,插入会导致阻塞,直到队列中的数据被消费掉。如果maxsize小于或者等于0,队列大小没有限制。

    LIFO

    Queue.LifoQueue(maxsize=0)
    LIFO即Last in First Out,后进先出。与栈的类似,使用也很简单,maxsize用法同上

    priority

    class Queue.PriorityQueue(maxsize=0)
    构造一个优先队列。maxsize用法同上。

    基本方法:

    Queue.Queue(maxsize=0) #FIFO, 用来定义队列的长度,如果maxsize小于1就表示队列长度无限,
    Queue.LifoQueue(maxsize=0) #LIFO, 如果maxsize小于1就表示队列长度无限
    Queue.qsize() #返回队列的大小
    Queue.empty() #如果队列为空,返回True,反之False ,在线程间通信的过程中,可以通过此来给消费者等待信息
    Queue.full() # 如果队列满了,返回True,反之False,给生产者提醒
    Queue.get([block[, timeout]]) 读队列,timeout等待时间
    Queue.put(item, [block[, timeout]]) 写队列,timeout等待时间
    Queue.queue.clear() 清空队列
    task_done()#意味着之前入队的一个任务已经完成。由队列的消费者线程调用。每一个get()调用得到一个任务,接下来的task_done()调用告诉队列该任务已经处理完毕如果当前一个join()正在阻塞,它将在队列中的所有任务都处理完时恢复执行(即每一个由put()调用入队的任务都有一个对应的task_done()调用)。
    join()#阻塞调用线程,直到队列中的所有任务被处理掉。只要有数据被加入队列,未完成的任务数就会增加。当消费者线程调用task_done((意味着有消费者取得任务并完成任务),未完成的任务数就会减少。当未完成的任务数降到0,join()解除阻塞。

    队列需求一(爬虫的请求地址)

    Python多线程主要是为了提高程序在IO方面的优势,在爬虫的过程中显得尤为重要。正常的爬虫请求直接封装多线程就ok,但是爬虫请求的过程中,对于url的请求需要通过队列来实现,这是队列的需求之一。
    对于爬虫的请求地址来说,一般是有规律性可循的,如果是翻页数据,可以将请求到的url放到队列中,通过多线程对队列进行取数据,如果队列为空,线程判断自动等待,循环加入队列url,线程自动请求,以下伪代码,作为参考:

    import threading
    from queue import Queue
    
    class ThreadCrawl(threading.Thread):
        def __init__(self, threadName, idQueue):
            # 继承父类的方法
            super(ThreadCrawl, self).__init__()
            self.threadName = threadName          # 线程名字
        def run(self):
            print('启动' + self.threadName)
            while not self.idQueue.empty():
                try:
                    id = self.idQueue.get(False)  # False 如果队列为空,抛出异常
                    time.sleep(1)
                    print("~"*300)
                    self.get_con(id)
    
                except Exception as e:
                    print('队列为空。。。。。', e)
                    pass
                print('#'*300)
    
        def get_con(self):  #自己封装的请求自定义
            pass
    def get_id(m, n):
        conn = psycopg2.connect(database='postgres', user='postgres', password='123456', host='127.0.0.1', port='5432')
        cur = conn.cursor()
        sql1 = 'SELECT doc_id from id LIMIT {} offset {};'.format(m, n)
        cur.execute(sql1)
        data = cur.fetchall()
        conn.commit()
        return data
    def main():
        n = 60
        while True:
            m = 20
            # m是固定值,一次去20条, n是第几条开始
            print('开始采集n的值为', n)
            if n == 200000:
                break
    
            # id的队列
            idQueue = Queue(20)
            if idQueue.empty():
                data = get_id(m, n)
                for i in data:
                    idQueue.put(i[0])
    
            # 采集线程的数量
            crawlList = []
            for id in range(1, 2):
                name = '采集线程{}'.format(id)
                crawlList.append(name)
    
            # 存储采集线程的列表集合
            threadcrawl = []
            for threadName in crawlList:
                thread = ThreadCrawl(threadName, idQueue)
                thread.start()
                threadcrawl.append(thread)
    
            for thread in threadcrawl:
                thread.join()
            n = n + m
        print("主线程退出..............")
    if __name__ == '__main__':
        main()
    
    以上代码是作者从数据库中取数据,间隔性取,让后拼装到url,进行请求

    队列需求二(生产者、消费者模型)

    import threading
    import time
    from queue import Queue
    
    def put_id():
        i = 0
        while True:
            i = i + 1
            print("添加数据", i, id_queue.qsize())
            time.sleep(0.1)
            id_queue.put(i)
    
    def get_id(m):
        while True:
            i = id_queue.get()
            print("线程", m, '取值', i)
    
    
    if __name__ == "__main__":
    
    
        id_queue = Queue(20)
    
        Th1 = threading.Thread(target=put_id, )
    
        Th2 = threading.Thread(target=get_id, args=(2, ))
        Th3 = threading.Thread(target=get_id, args=(3, ))
        Th5 = threading.Thread(target=get_id, args=(4, ))
        Th4 = threading.Thread(target=get_id, args=(5, ))
    
        Th2.start()
        Th1.start()
    
        Th3.start()
        Th4.start()
        Th5.start()
    

    相关文章

      网友评论

        本文标题:Python多线程——队列(Queue)

        本文链接:https://www.haomeiwen.com/subject/rgrttctx.html