美文网首页Python全栈工程师
26.2-Queue使用和分发器实现

26.2-Queue使用和分发器实现

作者: BeautifulSoulpy | 来源:发表于2019-10-22 22:17 被阅读0次

    当你把希望,放在别人身上时,你会选择等待,当你把希望,放在自己身上时,你会选择奔跑!

    总结

    1.函数的栈跟线程相关;函数的每一次调用都是不同的;
    2.时间应该跟timedelta相加减;

    Queue 模块

    它可用于在生产者(producer)和消费者(consumer)之间线程安全(thread-safe)地传递消息或其它数据,因此多个线程可以共用同一个Queue实例。Queue的大小(元素的个数)可用来限制内存的使用。

    Queue 模块实现了三种类型的队列,它们的区别仅仅是队列中元素被取回的顺序。

    在 FIFO 队列(先入先出)中,先添加的任务先取回;
    在 LIFO 队列(后入先出)中,最近被添加的元素先取回(操作类似一个堆栈);
    在优先级队列PriorityQueue中,元素将保持排序( 使用 heapq 模块 ) 并且最小值的条目第一个返回;

    阻塞:卡住了,
    超时:None 不超时; 5 等5秒 ;

    Queue方法

    常用方法 -
    Queue.qsize() 返回队列的大小
    Queue.empty() 如果队列为空,返回True,反之False
    Queue.full() 如果队列满了,返回True,反之False
    Queue.full 与 maxsize 大小对应
    Queue.get([block[, timeout]]) 获取队列,timeout等待时间
    Queue.get_nowait() 相当Queue.get(False)
    Queue.put(item) 写入队列,timeout等待时间, item非可迭代对象
    Queue.put_nowait(item) 相当Queue.put(item, False)
    Queue.task_done() 在完成一项工作之后,Queue.task_done()函数向任务已经完成的队列发送一个信号
    Queue.join() 实际上意味着等到队列为空,再执行别的操作

    从队列中移除元素并返回这个元素block 为 阻塞,timeout为超时。
    如果block为True,是阻塞,timeout为None就是一直阻塞
    如果block为True但是timeout有值,就阻塞到一定秒数抛出Empty异常。

    如果block为False,是非阻塞,timeout将被忽略,要么成功返回一个元素,要么抛出empty异常。

    1 queue 模块定义的类和异常

    queue 模块定义了以下四种不同类型的队列,它们之间的区别在于数据入队列之后出队列的顺序不同。

    1.1 queue.Queue(maxsize=0)

    入参 maxsize 是一个整数,用于设置队列的最大长度。一旦队列达到上限,插入数据将会被阻塞,直到有数据出队列之后才可以继续插入。
    如果 maxsize 设置为小于或等于零,则队列的长度没有限制。
    block 阻塞

    qsize()
    获取队列的元素个数。
    
    put(item [,block[, timeout]]): 
    往queue中放一个item
    
    get(item [,block[, timeout]]): 
    从queue中取出一个item,并在队列中删除的这个item
    
    import queue
    q = queue.Queue()  # 创建 Queue 队列
    for i in range(3):
        q.put(i)  # 在队列中依次插入0、1、2元素
    for i in range(3):
        print(q.get())  # 依次从队列中取出插入的元素,数据元素输出顺序为0、1、2
    
    q.get(True,None)# 一直等,直到有数据为止;
    q.get(True,5)   # 等一会 Empty;
    q.get(False,None) # 要么拿,要么Empty;
    q.get(False,5)  #要么拿,要么直接Empty;
    

    如果 block 为 True , timeout 为 None(也是默认的选项),那么get()/put()可能会阻塞,直到队列中出现可用的数据/位置。如果 timeout 是正整数,那么函数会阻塞直到超时N秒,然后抛出一个异常。

    如果 block 为 False ,如果队列无数据,调用get()或者有无空余位置时调用put(),就立即抛出异常
    有数据则拿,无数据则抛异常;timeout 将会被忽略)。

    get 和 put _ 阻塞block 和 超时timeout
    import queue
    
    # 设置上限maxsize=10
    q = queue.Queue(maxsize=10)
    
    # 往队列加10个数据
    for i in range(100):
        if q.qsize() >= 10:
            # 存放的数据达到上限maxsize,插入会导致阻塞
            break
        else:
            q.put(i)
    
    # 从队列取值
    while not q.empty():
        n = q.get()
        print("本次取出数据:%s" % n)
    #----------------------------------------------------------------
    本次取出数据:0
    本次取出数据:1
    本次取出数据:2
    本次取出数据:3
    本次取出数据:4
    本次取出数据:5
    本次取出数据:6
    本次取出数据:7
    本次取出数据:8
    本次取出数据:9
    
    
    # 一直提交数据;
    from queue import Queue
    
    q = Queue(5)
    for i in range(5):
        q.put(i + 1)
    print('-' * 30)
    
    # 从对列中取值;
    while True:
        a = input('<<<<')
        print('~~~~~~~~~')
        print(q.full())
        if not q.full():
            q.put_nowait(a)
        else:
            while not q.empty():
                print(q.get())
        print('~~~~~~~~~~~~~~~')
    #--------------------------------------------
    ~~~~~~~~~
    True
    1
    2
    3
    4
    5
    ~~~~~~~~~~~~~~~
    ~~~~~~~~~
    

    2 Queue、LifoQueue、PriorityQueue 和 SimpleQueue 对象的基本使用方法

    Queue、LifoQueue、PriorityQueue 和 SimpleQueue 四种队列定义的对象均提供了以下函数使用方法,下面以 Queue 队列为例进行介绍。

    2.1 Queue.qsize()

    返回队列中数据元素的个数。注意并发情况;

    import queue
    q = queue.Queue()
    q.put('python-100')  # 在队列中插入元素 'python-100'
    print(q.qsize())  # 输出队列中元素个数为1
    
    2.2 Queue.put(item, block=True, timeout=None)

    item,放入队列中的数据元素。
    timeout,设置超时时间。

    block,当队列中元素个数达到上限继续往里放数据时:
    如果 block=False,直接引发 queue.Full 异常;
    如果 block=True,且 timeout=None,则一直等待直到有数据出队列后可以放入数据;
    如果 block=True,且 timeout=N,N 为某一正整数时,则等待 N 秒,如果队列中还没有位置放入数据就引发 queue.Full 异常。

    import queue
    try:
        q = queue.Queue(2)  # 设置队列上限为2
        q.put('python')  # 在队列中插入字符串 'python'
        q.put('-') # 在队列中插入字符串 '-'
        q.put('100', block = True, timeout = 5) # 队列已满,继续在队列中插入字符串 '100',等待5秒后会引发 queue.Full 异常
    except queue.Full:
        print('queue.Full')
    
    2.3 Queue.get(block=True, timeout=None)

    从队列中取出数据并返回该数据内容。

    block,当队列中没有数据元素继续取数据时:
    如果 block=False,直接引发 queue.Empty 异常;
    如果 block=True,且 timeout=None,则一直等待直到有数据入队列后可以取出数据;
    如果 block=True,且 timeout=N,N 为某一正整数时,则等待 N 秒,如果队列中还没有数据放入的话就引发 queue.Empty 异常。

    import queue
    try:
        q = queue.Queue()
        q.get(block = True, timeout = 5) # 队列为空,往队列中取数据时,等待5秒后会引发 queue.Empty 异常
    except queue.Empty:
        print('queue.Empty')
    
    
    2.4 Queue.get_nowait()

    相当于 Queue.get(block=False)block,当队列中没有数据元素继续取数据时直接引发 queue.Empty 异常。

    import queue
    try:
        q = queue.Queue()
        q.get_nowait() # 队列为空,往队列中取数据时直接引发 queue.Empty 异常
    except queue.Empty:
        print('queue.Empty')
    
    2.5 Queue.put_nowait(item)

    相当于 Queue.put(item, block=False),当队列中元素个数达到上限继续往里放数据时直接引发 queue.Full 异常。

    import queue
    try:
        q = queue.Queue(2)  # 设置队列上限为2
        q.put_nowait('python')  # 在队列中插入字符串 'python'
        q.put_nowait('-') # 在队列中插入字符串 '-'
        q.put_nowait('100') # 队列已满,继续在队列中插入字符串 '100',直接引发 queue.Full 异常
    except queue.Full:
        print('queue.Full')
    
    2.6 Queue.full()

    如果队列中元素个数达到上限,返回 True,否则返回 False。

    import queue
    q = queue.Queue(3)  # 定义一个长度为3的队列
    print(q.full())  # 元素个数未达到上限,返回 False
    q.put('python')  # 在队列中插入字符串 'python'
    q.put('-') # 在队列中插入字符串 '-'
    q.put('100') # 在队列中插入字符串 '100'
    print(q.full())  # 元素个数达到上限,返回 True
    
    2.7 Queue.empty()

    如果队列为空,返回 True,否则返回 False。

    import queue
    q = queue.Queue()
    print(q.empty())  # 对列为空,返回 True
    q.put('python-100')  # 在队列中插入元素 'python-100'
    print(q.empty())  # 对列不为空,返回 False
    

    命令分发器实现

    生产者(数据源)生产数据,缓冲到消息队列中
    数据处理流程: 数据加载 -》 提取 -》 分析(滑动窗口函数)

    处理大量数据的时候,对于一个数据源来说,需要多个消费者处理。但是如何分配数据就是个问题了。
    需要一个分发器(调度器),把数据分发给不同的消费者处理。
    每一个消费者拿到数据后,有自己的处理函数。所以要有一种注册机制
    数据加载 --》 提取 --》 分发 ---》 分析函数1|----》 分析函数2

    分析1和分析2是不同的handler,不同的窗口宽度、间隔时间

    如何分发?

    这里就简单一点,轮询策略。
    一对多的副本发送,一个数据通过分发器,发送到n个消费者。

    消息队列

    在生产者和消费者之间使用消息队列,那么所有消费者共用一个消息队列,还是各自拥有一个消息队列呢?
    共用一个消息队列也可以,但是需要解决争抢的问题。相对来说每一个消费者自己拥有一个队列,较为容易。

    如何注册?

    在调度器内部记录有哪些消费者,每一个消费者拥有自己的队列。

    线程
    由于一条数据会被多个不同的注册过的handler处理,所以最好的方式是多线程。

    线程

    线程是操作系统能够进行运算调度的最小单位,它被包含在进程之中,是进程中的实际运作单位;
    守护线程是一个在后台运行,且不用费心去关闭它的线程,因为它会随程序自动关闭。

    如果程序运行的线程是非守护线程,那么程序将等待所有线程结束后再终止。
    如果运行的是守护线程,当程序退出时,守护线程会被自动杀死。

    # 线程
    import threading, time
    def add(x, y):
        print('enter this thread!')
        time.sleep(3)   # 等3秒的时候运行~~~~~~~~~,在再继续运行函数;
        print(x + y)
        print('===========end')
    
    # 创建线程:target是函数名,args是一个元组类型
    t = threading.Thread(target=add, args=(4, 5))   #Thread类很好地封装了有关线程的子类
    t.start()    #调用.start()方法:
    print('\n~~~~~~~~~~~~~~~~~~~~~')
    #----------------------------------------------------------
    enter this thread!
    ~~~~~~~~~~~~~~~~~~~~~   
    
    9
    ===========end
    
    # 线程
    import threading,time
    from queue import Queue
    q = Queue()
    def add(x,y):
        while True:
            print('enter this thread!')
            cmd = q.get() # 卡住了;
            print("~~~~~~~~~~~~~~~~")
        print(x+y)
        
    t = threading.Thread(target=add,args=(4,5))
    t.start()
    print('-----------------------')
    while True:
        cmd = input('<<<')
        if cmd == '':
            break
        else:
            q.put(cmd)
    #-----------------------------------
    enter this thread!-----------------------
    
    <<<1
    ~~~~~~~~~~~~~~~~
    enter this thread!
    <<<
    
    
    
    # 生成器;
    import random #产生随机数;
    import time  # 休息一会
    import datetime # 时间
    
    def source(seconds=1):
        while True:
            yield {'datetime':datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=8))),'value':random.randint(1,100)}
            time.sleep(seconds)
                   
    s = source()
    # collecting date
    items = [next(s) for _ in range(3)]
    print(items)
    
    def avg_handler(iterable):
        return sum(map(lambda item:item['value'],iterable)) / len(iterable)
    
    ret = avg_handler(items)
    print('{:.2f}'.format(ret))
    
    import re,datetime,time,threading
    from queue import Queue
    
    def window(q:Queue,handler,width:int,interval:int):
        buf = []
        start =datetime.datetime.strptime('19700101 00:00:01 +0800','%Y%m%d %H:%M:%S %z')
        current =datetime.datetime.strptime('19700101 00:00:01 +0800','%Y%m%d %H:%M:%S %z')
        delta = datetime.timedelta(seconds=width - interval)  
        
        while True:
            data = q.get() # 阻塞的 next(iterator)
            if data:
                buf.append(data)
                current =data['datetime']
            print(current,start)
            
            if (current - start).total_seconds() > interval:
                print('~~~~~~~~~~~~~')
                ret = handler(buf)
                print('{:.2f}'.format(ret))
                print(threading.current_thread())
                start = current
                
                # clean old_date
                buf = [x for x in buf if x['datetime'] > current - delta ]
                
                
    def dispatcher(src):
        handlers = []
        queues = []
        
        def reg(handler,width,interval):    # 数据谁,handler、width、interval  ;
            q = Queue()
            t = threading.Thread(target=window,args=(q,handler,width,interval))
            
            queues.append(q)
            handlers.append(t)
            
        def run():
            for t in handlers:
                t.start()
                
            while True:
                data = next(src)
                for q in queues:
                    q.put(data)
                
        return reg,run
    
    reg,run = dispatcher(s)
    
    reg(avg_handler,10,5)
    # reg(avg_handler,10,5)            
    
    
    # window(s,avg_handler,10,5)           
    # run()
    print(threading.current_thread())
    run()
    #---------------------------------------------------------
    <_MainThread(MainThread, started 2488)>
    2019-10-22 22:04:31.513446+08:00 1970-01-01 00:00:01+08:00
    ~~~~~~~~~~~~~
    36.00
    <Thread(Thread-12, started 1960)>
    2019-10-22 22:04:32.513764+08:00 2019-10-22 22:04:31.513446+08:00
    2019-10-22 22:04:33.514088+08:00 2019-10-22 22:04:31.513446+08:00
    2019-10-22 22:04:34.514438+08:00 2019-10-22 22:04:31.513446+08:00
    2019-10-22 22:04:35.514738+08:00 2019-10-22 22:04:31.513446+08:00
    2019-10-22 22:04:36.515061+08:00 2019-10-22 22:04:31.513446+08:00
    ~~~~~~~~~~~~~
    54.83
    <Thread(Thread-12, started 1960)>
    2019-10-22 22:04:37.515387+08:00 2019-10-22 22:04:36.515061+08:00
    2019-10-22 22:04:38.515513+08:00 2019-10-22 22:04:36.515061+08:00
    2019-10-22 22:04:39.515842+08:00 2019-10-22 22:04:36.515061+08:00
    2019-10-22 22:04:40.516410+08:00 2019-10-22 22:04:36.515061+08:00
    2019-10-22 22:04:41.517063+08:00 2019-10-22 22:04:36.515061+08:00
    
    

    相关文章

      网友评论

        本文标题:26.2-Queue使用和分发器实现

        本文链接:https://www.haomeiwen.com/subject/etwqvctx.html