美文网首页
消息队列之RabbitMQ《二》

消息队列之RabbitMQ《二》

作者: 倔犟的贝壳 | 来源:发表于2022-04-09 16:34 被阅读0次

上一篇,我们实现了一个简单的消息队列:
1、生产者发送消息到rabbitMQ Server,rabbitMQ Server将我们把消息队列保存,然后消费者从队列依次取出。
2、我们通过消息确认机制保证消息已被消费者处理。

但是异常也有可能发生在rabbitMQ Server端。如果server异常挂掉,队列里面还有消息未处理完,那我们再次重启server的时候,消息队列里的消息就全没了,显然,这是不能接受的。另外,上一篇,我们是直接指定将消息发送到某个队列,这样我们首先就要创建队列,并给队列命名。producer和consumer要统一队列名。如果每一个新的连接都维持一个新的队列,指定队列名就不太好使了。

简单的消息队列,一个消息只能被一个消费者获取并完成处理。如果一个消息要发送给多个消费者呢。接下来,我们看一种新的模式,即Publish/Subscribe 发布订阅模式。

pub/sub .png
  • 发布订阅者模式下,生产者将消息发送到exchange,exchange,根据路由routing_key发送到相应的队列(fanout类型除外,它会广播到可接收的所有队列)
  • 消费者可以自己声明一个队列,并将队列与exchange进行绑定,绑定时可指定routing_key。
  • 这样,通过exchange和routing_key我们就实现了自由地获取自己想要的信息。不再受queue的约束。一个消息可以发送到多个queue,消费者也可以自定义queue来获取消息
  • Exchange的类别分为4种:direct、topic、header、fanout

exchange_type:fanout
在调用exchange_declare声明exchange时,将exchange_type设置为fanout,我们就得到了一个fanout的exchange。它会将消息发到所有绑定到该exchange上的队列,无需routing_key.

生产者emit_log.py

import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))

channel = connection.channel()

#声明一个exchange,并指定类型为fanout
#fanout为广播类型,即会发到所有的队列
channel.exchange_declare(exchange='logs',
                         exchange_type='fanout')

message = ''.join(sys.argv[1:]) or 'info:Hello World'

#指定exchange,fanout无需指定routing_key
channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)

print('[X] Sent %r' % message)

connection.close()

receive_log.py

import pika

connection = pika.BlockingConnection( pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

#声明一个exchange
channel.exchange_declare(exchange='logs',exchange_type='fanout')

#我们可以用queue=''声明一个临时的队列,名称由server生成或者server从已有队列中选一个给我们使用
#将exclusive设置为True,只允许当前连接可访问。因为是一个临时队列,在当前连接断开后,该队列就会被删除掉
result = channel.queue_declare(queue='',exclusive=True)
#获取队列的名称
queue_name = result.method.queue
#将交换机与队列相绑定
channel.queue_bind(exchange='logs',queue=queue_name)

#回调
def callback(ch,method,properties,body):
    print("[x] %r" % body)

print('[*] Waiting for logs.To exit press CTRL+C')
channel.basic_consume(
    queue=queue_name,
    on_message_callback=callback,
    auto_ack=True)
#启动消费线程
channel.start_consuming()

exchange_type:direct
上面的fanout的类型是一种广播形式。但是在一些情况下,我们希望exchange的消息是分类发到不同的队列中的,比如有一个名为device的exchange,我们希望灯light、开关switches等不同的设备的消息会发到不同的的队列中。此时我们就需要用到routing_key。

type=direct.png

把上面的代码稍微改一下:
发送部分emit_log_direct.py:

#声明一个名为device的exchange,类型是direct
channel.exchange_declare(exchange='device',
                         exchange_type='direct')

#获取命令行的第2个参数为routing_key
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'

message = ''.join(sys.argv[2:]) or "Hello World!"
#发送的时候指定exchange,routing_key.
channel.basic_publish(
    exchange='direct_logs',routing_key=severity,body=message
)

接收部分receive_logs_direct.py:

severities = sys.argv[1:]

if not severities:
    sys.stderr.write('Usage:%s[info] [warning] [error]\n' %sys.argv[0])
    sys.exit(1)
    
for severity in severities:
    channel.queue_bind(
        exchange='direct_logs',queue=queue_name,routing_key=severity
    )

终端执行,我们启动接收的worker,只接收light的消息

(mq) huanghuan@huanghuandeMacBook-Pro rabbitMQ % python receive_logs_direct.py light
[*] Waiting for logs.To exit press CTRL+C
[X] 'light':b'Hello World!'

分别设置routing_key为light和switch

(mq) huanghuan@huanghuandeMacBook-Pro rabbitMQ % python emit_log_direct.py light
[X] Sent 'light':'Hello World!'
(mq) huanghuan@huanghuandeMacBook-Pro rabbitMQ % python emit_log_direct.py switch
[X] Sent 'switch':'Hello World!'

上面的结果可以看到,消费者之收到了light的消息,并没有收到switch的消息。switch的消息,exchange根据路由无法找到队列,就被丢弃了。
exchange_type:topic
direct的路由都是完全匹配的,有些情况下,一些消息可能有一个大的主题,然后只要是这个主题下,都要。比如有一个news的exchange,不管wechat.news,还是weibo.news,我这边都需要。
把direct的代码稍改一下:

channel.exchange_declare(exchange='news',exchange_type='topic')

然后运行:
python receive_logs_topic.py "*.news"
*匹配任意字符串

(mq) huanghuan@huanghuandeMacBook-Pro rabbitMQ % python emit_log_topic.py wechat.news wechat
[x] sent 'wechat.news':'wechat'
(mq) huanghuan@huanghuandeMacBook-Pro rabbitMQ % python emit_log_topic.py weibo.news weibo  
[x] sent 'weibo.news':'weibo'
(mq) huanghuan@huanghuandeMacBook-Pro rabbitMQ % 

receive:

(mq) huanghuan@huanghuandeMacBook-Pro rabbitMQ % python receive_logs_topic.py "*.news"
[*] Waiting for logs.To exit press CTRL+C
[x] 'wechat.news':b'wechat'
[x] 'weibo.news':b'weibo'

这样,不论是weixin的news还是weibo的news,全都获取。

durable
上面我们只是介绍了另一种队列的使用方式,但是重启server,消息丢失的问题还未解决。
怎样保证rabbitMQ Server重启之后,消息不被丢失呢?很简单,只需两步设置:

  • 声明队列的时候,设置durable参数为True,表示在server重启中要“活下来”,不要被删除掉。
channel.queue_declare(queue='task_queue',durable=True)
  • 调用basic_publish发送消息时,设置属性delivery_mode为pika.spec.PERSISTENT_DELIVERY_MODE
    channel.basic_publish(exchange='',
                          routing_key='task_queue',
                          body=message,
                          properties=pika.BasicProperties(
                            delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE,#make message persistent)
)

按照上面两步设置,即使rabbitMQ Server因意外重启,未处理的消息也不会丢失,在Server重启后又可以继续处理。

相关文章

  • 消息队列之 RabbitMQ

    消息队列之 RabbitMQ 转载自:消息队列值RabbitMQ 关于消息队列,从前年开始断断续续看了些资料,想写...

  • 消息队列之RabbitMQ《二》

    上一篇,我们实现了一个简单的消息队列:1、生产者发送消息到rabbitMQ Server,rabbitMQ Ser...

  • Redis作为消息队列与RabbitMQ的比较

    Redis作为消息队列与RabbitMQ的比较 RabbitMQ RabbitMQ是实现AMQP(高级消息队列协议...

  • 消息队列——了解一下

    本文将从以下几点展开 为什么使用消息队列? 消息队列的缺点 消息队列如何选型 RabbitMQ RabbitMQ ...

  • 异步消息队列

    1. 消息队列 rabbitmq - 提供消息队列服务 rabbitmq 常用指令 docker run -d -...

  • SpringBoot使用RabbitMQ及RabbitMQ介绍

    SpringBoot使用RabbitMQ RabbitMQ 是 消息队列 实现AMQP(高级消息队列协议Advan...

  • 【rabbitMQ】消息队列之 rabbitMQ

    消息队列之 RabbitMQ(推荐) https://www.jianshu.com/p/79ca08116d57...

  • 消息队列之 RabbitMQ

    消息的介绍 关于消息队列,从前年开始断断续续看了些资料,想写很久了,但一直没腾出空,近来分别碰到几个朋友聊这块的技...

  • 消息队列之 RabbitMQ

    关于消息队列,从前年开始断断续续看了些资料,想写很久了,但一直没腾出空,近来分别碰到几个朋友聊这块的技术选型,是时...

  • 消息队列之 RabbitMQ

    关于消息队列,从前年开始断断续续看了些资料,想写很久了,但一直没腾出空,近来分别碰到几个朋友聊这块的技术选型,是时...

网友评论

      本文标题:消息队列之RabbitMQ《二》

      本文链接:https://www.haomeiwen.com/subject/bfgzjrtx.html