美文网首页
RabbitMQ实现任务队列

RabbitMQ实现任务队列

作者: 转身丶即天涯 | 来源:发表于2019-08-08 01:06 被阅读0次

    回顾

    上一章学习了如何使用pika第三方库连接RabbitMQ,并通过RabbitMQ传递消息,实现了简易的生产者消费者模式。
    上一章传送门:RabbitMQ入门

    文章是我学习记录用的,可以看rabbitmq中文文档
    ,他们解释的更专业。

    任务队列

    首先我们知道了什么是队列,那么什么是任务队列呢?
    任务队列,也有叫工作队列的。一般情况下,我们会将耗时或者大量占用资源的操作封装成任务,然后把任务发送给任务队列(其实这里的任务队列就是RabbitMQ),然后再由工作进程(或者叫做worker)将任务取出并执行,执行结束后触发回调函数,来告知RabbitMQ任务执行完毕。

    PS:上面说的概念有点乱,不过没关系,知道是什么就可以。

    准备工作

    现在假设有这样一个需求,我们用一个字符串来表示任务,后面每个‘.’来表示耗时1秒,比如'task1...',代表任务名为task1,耗时3秒。

    准备两个脚本,send.py和receive.py,分别表示生产者和消费者。
    send.py

    import sys
    import pika
    
    
    task = sys.argv[1] or "Hello World."
    
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    
    channel = connection.channel()
    channel.queue_declare(queue='task_queue')
    channel.basic_publish(exchange='', routing_key='task_queue', body=task)
    
    print("sent to [task_queue]: {}".format(task))
    

    这里的task,我是用命令行参数的形式传递进去的。稍后会在使用时说明。

    receive.py

    import time
    import pika
    
    
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    channel.queue_declare('task_queue')
    
    def callback(ch, method, properties, body):
        print("Recevied: {}".format(body))
        time.sleep(body.count('.'))
        print("{} done.".format(body))
    
    channel.basic_consume(on_message_callback=callback, queue='task_queue')
    
    print("waiting for task of task-queue. To exit press CTRL+C.")
    channel.start_consuming()
    

    模拟工作队列

    写好了脚本之后,我们可以开始探索工作队列了。
    现在,打开终端,并开启三个窗口。


    image.png

    PS:这里顺便说个Mac上的小技巧,按住cmd+空格会调出Sportlight搜索框,输入ter弹出的第一项就是终端,直接回车就能打开终端了。
    然后按cmd+t快捷键来新开一个命令窗口,cmd+w来关闭当前命令窗口。

    启动RabbitMQ

    在第一个命令行窗口,你需要先启动RabbitMQ。执行rabbitmq-server命令即可。

    在第二个命令行窗口,执行python send.py task1...


    image.png

    在第三个命令行窗口,执行python receive.py


    image.png

    哎,为什么会报错呢,看到最后一行:time.sleep(body.count('.'))报错了。

    经过断点调试才发现,body是bytes类型,而count方法是str对象的。所以,恍然大悟,得做解码处理。

    time.sleep(body.decode('utf-8').count('.'))
    

    然后,就可以正常运行了,结果如下图:


    image.png

    这就说明,消费者已经成功处理了‘task1...’


    一些常见问题的解决思路

    相关文章

      网友评论

          本文标题:RabbitMQ实现任务队列

          本文链接:https://www.haomeiwen.com/subject/bhczdctx.html