美文网首页
64.1-RabbitMQ安装、管理和名词解释

64.1-RabbitMQ安装、管理和名词解释

作者: BeautifulSoulpy | 来源:发表于2020-08-30 12:05 被阅读0次
不要把幸福的标准定得太高,生命中的任何一件小事只要你细心品味过,可以说都与幸福有关。因为无论怎样,幸福都只是一种感觉而已!

总结:

  1. 不要在缺省主机上加东西,
  2. pycharm同时运行一个py多次:选择:Edit Configurations勾线多线程跑程序;
  3. Python程序要解释器来解释和加载,解释后将源代码现场给你编译,编译之后读取字节码最后给你转换;你要起进程,最后给你起一个解释器进程;
    C/C++ 编译完成之后是个可执行文件/exe, 一旦被操作系统加载之后也就是个进程;
  4. 虚拟主机 test ; 交换机logs(记录日志) ;
  5. 本章关键: 虚拟主机 test ; 交换机logs(记录日志) ; 路由routing_key(orange\black\green); queue是name1\name2 ;
  6. 几种工作模式: routing_key: 全匹配 ; Topic: 模式匹配;
  7. 微服务 一般都是效率最高的;高并发的解决方案 本质都是一样的;
    参考:
    Python实现RabbitMQ中6种消息模型

RabbitMQ
RabbitMQ 是由 LShift 提供的一个 Advanced Message Queuing Protocol (AMQP) 的开源实现,由以高性能、健
壮以及可伸缩性出名的 Erlang 写成,因此也是继承了这些优点。很成熟,久经考验,应用广泛。
文档详细,客户端丰富,几乎常用语言都有RabbitMQ的开发库;

MQ全称为Message Queue, 消息队列(MQ)是应用程序“对”应用程序的通信方法。
MQ:生产者往消息队列中投放消息,消费者可以读取队列中的消息。

RabbitMQ 是一个消息代理。这主要的原理十分简单,就是通过接受和转发消息。你可以把它想象成邮局:当你将一个包裹送到邮局,你会相信邮递员先生最终会将邮件送到接件人手上。RabbitMQ就好比一个邮箱,邮局或邮递员。

邮局和RabbitMQ两种主要的不同之处在于,RabbitMQ不处理文件,而是接受,并存储和以二进制形式将消息转发。

1. RabbitMQ安装

http://www.rabbitmq.com/install-rpm.html
选择RPM包下载,选择对应平台,本次安装在CentOS 6.5,其它平台类似。

文件传输 https://www.rabbitmq.com/install-rpm.html#downloads

由于使用了erlang语言开发,所以需要erlang的包,该下载页给出了链接

yum install socat-1.7.2.3-1.el6.x86_64.rpm erlang-20.1.7-1.el6.x86_64.rpm rabbitmq-server-3.7.0-1.el6.noarch.rpm

安装成功,查看安装的文件;

[dell@Centos7 ~]$ rpm -ql rabbitmq-server
/etc/logrotate.d/rabbitmq-server
/etc/profile.d/rabbitmqctl-autocomplete.sh
/etc/rabbitmq
/etc/rc.d/init.d/rabbitmq-server
/usr/lib/ocf/resource.d/rabbitmq/rabbitmq-server
/usr/lib/ocf/resource.d/rabbitmq/rabbitmq-server-ha
/usr/lib/rabbitmq/autocomplete/bash_autocomplete.sh
/usr/lib/rabbitmq/autocomplete/zsh_autocomplete.sh
/usr/lib/rabbitmq/bin/cuttlefish
/usr/lib/rabbitmq/bin/rabbitmq-defaults
/usr/lib/rabbitmq/bin/rabbitmq-diagnostics
/usr/lib/rabbitmq/bin/rabbitmq-env
/usr/lib/rabbitmq/bin/rabbitmq-plugins
/usr/lib/rabbitmq/bin/rabbitmq-server
/usr/lib/rabbitmq/bin/rabbitmqctl
................

2. 配置

http://www.rabbitmq.com/configure.html#config-location

2.1 环境变量

使用系统环境变量,如果没有使用rabbitmq-env.conf 中定义环境变量,否则使用缺省值
RABBITMQ_NODE_IP_ADDRESS the empty string, meaning that it should bind to all network interfaces.
RABBITMQ_NODE_PORT 5672
RABBITMQ_DIST_PORT RABBITMQ_NODE_PORT + 20000 内部节点和客户端工具通信用
RABBITMQ_CONFIG_FILE 配置文件路径默认为/etc/rabbitmq/rabbitmq
环境变量文件,可以不配置

2.2 工作特性配置文件

rabbitmq.config配置文件 (这个文件也可以不配置)
3.7支持新旧两种配置文件格式
1、erlang配置文件格式,为了兼容继续采用

2、sysctl格式,如果不需要兼容,RabbitMQ鼓励使用

3. 插件管理

# 列出所有可用插件
[root@Centos7 network-scripts]# rabbitmq-plugins list

# 启动WEB管理插件
[root@Centos7 network-scripts]# rabbitmq-plugins enable rabbitmq_management
The following plugins have been configured:
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_web_dispatch
Applying plugin configuration to rabbit@Centos7...
The following plugins have been enabled:
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_web_dispatch

enabled 3 plugins.
Offline change; changes will take effect at broker restart.

 systemctl enable rabbitmq-server  # 设置开机启动;

# 启动服务
[root@Centos7 rabbitmq]# systemctl start rabbitmq-server
[root@Centos7 rabbitmq]# systemctl status rabbitmq-server         # 启动成功
● rabbitmq-server.service - LSB: Enable AMQP service provided by RabbitMQ broker
   Loaded: loaded (/etc/rc.d/init.d/rabbitmq-server; bad; vendor preset: disabled)
   Active: active (running) since Tue 2020-08-11 23:40:11 CST; 11s ago
     Docs: man:systemd-sysv-generator(8)
  Process: 112551 ExecStart=/etc/rc.d/init.d/rabbitmq-server start (code=exited, status=0/SUCCESS)
    Tasks: 93
   CGroup: /system.slice/rabbitmq-server.service
           ├─110399 /bin/sh /etc/rc.d/init.d/rabbitmq-server start
           ├─110404 /bin/bash -c ulimit -S -c 0 >/dev/null 2>&1 ; /usr/sbin/rabbitmq-server
           ├─110406 /sbin/runuser -u rabbitmq -- /usr/lib/rabbitmq/bin/rabbitmq-server
           ├─110426 /bin/sh /usr/lib/rabbitmq/bin/rabbitmq-server
           ├─110653 /usr/lib64/erlang/erts-9.1.5/bin/beam.smp -W w -A 64 -P 1048576 -t 5000000 -stbt db -zdbbl 128000 -K true -B i -- -root /usr/lib64/erlang -progname erl -- -home /var/lib/rabbitmq -- -pa /usr/lib/rabbitmq/lib/rabbitmq_server-3.7.0/ebin -noshell -noinput -s rabbit boot -sname rabbit@Centos7 -boot...

           ├─110766 erl_child_setup 1024
           ├─110809 inet_gethost 4
           └─110821 inet_gethost 4

Aug 11 23:39:46 Centos7 systemd[1]: Starting LSB: Enable AMQP service provided by RabbitMQ broker...
Aug 11 23:39:46 Centos7 runuser[112636]: pam_unix(runuser:session): session opened for user rabbitmq by (uid=0)
Aug 11 23:40:11 Centos7 rabbitmq-server[112551]: Starting rabbitmq-server: RabbitMQ is currently running
Aug 11 23:40:11 Centos7 rabbitmq-server[112551]: rabbitmq-server.
Aug 11 23:40:11 Centos7 systemd[1]: Started LSB: Enable AMQP service provided by RabbitMQ broker.


[root@Centos7 dell]# ss -tanl | grep 5672
LISTEN     0      128          *:15672                    *:*
LISTEN     0      128          *:25672                    *:*
LISTEN     0      128       [::]:5672                  [::]:*

报错

查看rabbitmq启动日志
cat /var/log/rabbitmq/startup_log
启动中,可能出现下面的错误 Error when reading /var/lib/rabbitmq/.erlang.cookie: eacces

就是这个文件的权限问题,修改属主、属组即可


开始登录WEB界面 [管理界面]http://192.168.0.100:15672/
来宾 用户: guest/guest
使用guest/guest只能本地地登录,远程登录会报错

4. 用户管理

[root@Centos7 dell]# rabbitmqctl
Usage:
rabbitmqctl [-n <node>] [-t <timeout>] [-l] [-q] <command> [<command options>]

General options:
    -n node
    -q quiet
    -t timeout
    -l longnames
Commands:
    add_user <username> <password>
    add_vhost <vhost>
    authenticate_user <username> <password>
    cancel_sync_queue [-p <vhost>] queue
    change_cluster_node_type <disc|ram>
    change_password <username> <password>
    clear_global_parameter <key>
    clear_operator_policy [-p <vhost>] <key>
    clear_parameter [-p <vhost>] <component_name> <key>
    clear_password <username>
    clear_permissions [-p vhost] <username>
    clear_policy [-p <vhost>] <key>
    clear_topic_permissions [-p vhost] <username> [<exchange>]
    clear_vhost_limits [-p <vhost>]
    close_all_connections [-p <vhost> --limit <limit>] [-n <node> --global] [--per-connection-delay <delay>] <explanation>
    close_connection <connectionpid> <explanation>
    set_user_tags



添加用户:
rabbitmqctl add_user username password
删除用户:
rabbitmqctl delete_user username
更改密码:
rabbitmqctl change_password username newpassword
设置权限Tags,其实就是分配组
rabbitmqctl set_user_tags username tag


[root@Centos7 dell]# rabbitmqctl add_user wayne wayne         
Adding user "wayne" ...
[root@Centos7 dell]# rabbitmqctl set_user_tags wayne administrator
Setting tags for user "wayne" to [administrator] ...

tag的意义如下
administrator 可以管理用户、权限、虚拟主机。

5. 基本信息

6. 虚拟主机

7. Python库

Pika是纯Python实现的支持AM QP协议的库

pip install pika

8. RabbitMQ工作原理及应用

名词解释
名词 说明
Server 服务器。接收客户端连接,实现消息队列及路由功能的进程(实现AMQP实体服务),也称为 消息代理。注意,客户端包括生产者和消费者
Connection 网络物理连接
Channel 一个连接允许多个客户端连接
Exchange 交换器。接收生产者发来的消息,决定如何路由给服务器中的队列。常用的类型有:direct(point-to-point)、topic (publish-subscribe)、fanout (multicast)
Message 消息
Message Queue 消息队列。数据的存储载体
Bind 绑定。建立消息队列和交换器之间的关系,也就是说交换器拿到数据,把什么样的数据送给哪个队列
Virtual Host 虚拟主机。一批交换机、消息队列和相关对象的集合。为了多用户互不干扰,使用虚拟主机分组交换机、消息队列
Topic 主题、话题
Broker 可等价为Server
1. 队列*

这种模式就是最简单的生产者消费者模型, 消息队列就是一个FIFO的队列

1.消息产生者将消息放入队列
2.消息的消费者(consumer) 监听(while) 消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删除(隐患 消息可能没有被消费者正确处理,已经从队列中消失了,造成消息的丢失)应用场景:聊天(中间有一个过度的服务器;p端,c端)

simple简单模式

生产者send.py,消费者receive.py
官方例子 https://www.rabbitmq.com/tutorials/tutorial-one-python.html
参照官方例子,写一个程序

# send.py 生产者代码

import pika

# 配置连接参数
params = pika.ConnectionParameters('192.168.0.100')
# 建立连接
connection = pika.BlockingConnection(params)

with connection:
    # 建立通道
    channel = connection.channel()
    # 创建一个队列,queue命名为test。如果queue不存在,消息将被dropped
    channel.queue_declare('test')

    channel.basic_publish(
        exchange='',    # 使用缺省exchange
        routing_key='test',   # routing_key必须指定,这里要求和目标queue一致
        body='test 1'     # 消息
    )
print('send meessage ok')
#----------------------------------------------------------
pika.exceptions.ProbableAuthenticationError: ConnectionClosedByBroker: (403) 'ACCESS_REFUSED - Login was refused using authentication mechanism PLAIN. For details see the broker logfile.'
访问被拒绝,还是权限问题,原因还是guest用户只能访问localhost上的 / 缺省虚拟主机

queue_declare声明一个queue,有必要的话,创建它。
basic_publish exchange为空就使用缺省exchange,如果找不到指定的exchange,抛异常
使用缺省exchange,就必须指定routing_key,使用它找到queue

解决办法

缺省虚拟主机,默认只能在本机访问,不要修改为远程访问,是安全的考虑。
因此,在Admin中Virtual hosts中,新建一个虚拟主机test
注意,新建的test虚拟主机的Users是谁,本次是wayne用户


在ConnectionParameters中并没有用户名、密码填写的参数,它使用参数credentials传入,这需要构建一个pika.credentials.Credentials对象。

修改代码如下:

# send.py 生产者代码
import pika

# 配置连接参数
cred = pika.PlainCredentials('wayne','wayne')
params = pika.ConnectionParameters('192.168.0.100',5672,'test',credentials=cred)
# 建立连接
connection = pika.BlockingConnection(params)

with connection:
    # 建立通道
    channel = connection.channel()
    # 创建一个队列,queue命名为hello。如果queue不存在,消息将被dropped
    channel.queue_declare('hello')

    channel.basic_publish(
        exchange='',    # 使用缺省exchange
        routing_key='hello',   # routing_key必须指定,这里要求和目标queue一致
        body='test 1'     # 消息
    )
print('send meessage ok')

send 参数写法2: URLParameters , 也可以使用URL创建参数: %2F指代/ (/test),就是缺省

# send.py 生产者代码
import pika

# 配置连接参数
# cred = pika.PlainCredentials('wayne','wayne')
# params = pika.ConnectionParameters('192.168.0.100',5672,'test',credentials=cred)
params = pika.URLParameters('amqp://wayne:wayne@192.168.0.100:5672/test')

# 建立连接
connection = pika.BlockingConnection(params)

with connection:
    # 建立通道
    channel = connection.channel()
    # 创建一个队列,queue命名为hello。如果queue不存在,消息将被dropped
    channel.queue_declare('hello')

    channel.basic_publish(
        exchange='',    # 使用缺省exchange
        routing_key='hello',  # routing_key必须指定,这里要求和目标queue一致
        body='test 1'     # 消息
    )
print('send meessage ok')

send.py 生产者多个消息 代码

import pika

# 配置连接参数
# cred = pika.PlainCredentials('wayne','wayne')
# params = pika.ConnectionParameters('192.168.0.100',5672,'test',credentials=cred)
params = pika.URLParameters('amqp://wayne:wayne@192.168.0.100:5672/test')

# 建立连接
connection = pika.BlockingConnection(params)

with connection:
    # 建立通道
    channel = connection.channel()
    # 创建一个队列,queue命名为hello。如果queue不存在,消息将被dropped
    channel.queue_declare('hello')
    for i in range(40):
        channel.basic_publish(
            exchange='',    # 使用缺省exchange
            routing_key='hello',  # routing_key必须指定,这里要求和目标queue一致
            body='test {}'.format(i)     # 消息
        )
print('send meessage ok')

receive.py 消费者代码
单个消费消息

# receive.py 消费者代码
import pika

# 配置连接参数
# cred = pika.PlainCredentials('wayne','wayne')
# params = pika.ConnectionParameters('192.168.0.100',5672,'test',credentials=cred)
params = pika.URLParameters('amqp://wayne:wayne@192.168.0.100:5672/test')

# 建立连接
connection = pika.BlockingConnection(params)

with connection:
    # 建立通道
    channel = connection.channel()
    # 创建一个队列,queue命名为hello。如果queue不存在,消息将被dropped
    data = channel.basic_get('hello',True)
    print(data)

print('receive meessage ok')
#-----------------------------------------------------
(<Basic.GetOk(['delivery_tag=1', 'exchange=', 'message_count=1', 'redelivered=False', 'routing_key=hello'])>, <BasicProperties>, b'test 1')
返回元组:(方法method, 属性properties, 消息body)

批量消费消息代码

# receive.py 消费者代码
import pika

params = pika.URLParameters('amqp://wayne:wayne@192.168.0.100:5672/test')

connection = pika.BlockingConnection(params)
channel = connection.channel()
# channel.queue_declare('hello')  # 创建一个hello;

def callback(channel,method,properties,body):
    print(body)

with connection:
    tag = channel.basic_consume('hello',callback,True)  # 可以消费数据
    channel.start_consuming()   # forever 不停止

print('receive meessage ok')

2、工作队列

队列中的数据只能给一个消费者;拿走就没了;


继续使用队列模式的生产者消费者代码,启动2个消费者。
观察结果,可以看到,2个消费者是交替拿到不同的消息。
这种工作模式是一种竞争工作方式,对某一个消息来说,只能有一个消费者拿走它。
从结果知道,使用的是轮询方式拿走数据的

注意:虽然上面的图中没有画出exchange,用到缺省exchange。

3、发布、订阅模式 exchange

Publish/Subscribe发布和订阅, 想象一下订阅报纸, 所有订阅者(消费者) 订阅这个报纸(消息) , 都应该拿到
一份同样内容的报纸。
订阅者和消费者之间还有一个exchange, 可以想象成邮局, 消费者去邮局订阅报纸, 报社发报纸到邮局, 邮局决
定如何投递到消费者手中。

上例中工作队列模式的使用,相当于,每个人只能拿到不同的报纸。所以,不适用发布订阅模式。


当前模式的exchange的type是fanout, 就是一对多, 即广播模式
注意, 同一个queue的消息只能被消费一次, 所以, 这里使用了多个queue, 相当于为了保证不同的消费者拿到同
样的数据, 每一个消费者都应该有自己的queue。
# 生成一个交换机
channel.exchange_declare(
    exchange='logs', # 新交换机 logs  记录日志的
    exchange_type='fanout' # 广播,扇出;
)

生产者使用广播模式。在test虚拟主机主机下构建了一个logs交换机
至于queue,可以由生产者创建,也可以由消费者创建。
本次采用使用消费者端创建,生产者把数据发往交换机logs,采用了fanout,然后将数据通过交换机发往已经绑定
到此交换机的所有queue。

# 消费者端
result = channel.queue_declare() # 生成一个随机名称的queue
result = channel.queue_declare(exclusive=True) # 生成一个随机名称的queue,并在断开连接时删除queue
# 生成queue
q1 = channel.queue_declare(exclusive=True)
q2 = channel.queue_declare(exclusive=True)
q1name = q1.method.queue # 可以通过result.method.queue 查看随机名称
q2name = q2.method.queue
print(q1name, q2name)
# 绑定
channel.queue_bind(exchange='logs', queue=q1name)
channel.queue_bind(exchange='logs', queue=q2name)

生产者代码——注意观察 交换机和队列

# send.py 生产者代码
import pika

en = 'log'
# 配置连接参数
params = pika.URLParameters('amqp://wayne:wayne@192.168.0.100:5672/test')
connection = pika.BlockingConnection(params)

channel = connection.channel()
channel.exchange_declare(
    en,
    'fanout'
)

with connection:
    for i in range(50):
        channel.basic_publish(
            exchange=en,
            routing_key='',  
            body='test {:02}'.format(i)     # 消息
        )
print('send meessage ok')

多了一个虚拟主机 logs(有流量通信), hello也不变,根本不存在queue(容器);

多了一个虚拟主机 logs
特别注意:如果先开启生产者,由于没有队列queue,请观察数据会怎样呢?
no binding

消费者代码
构建queue并绑定到test虚拟主机的logs交换机上

# receive.py 消费者代码
import pika

en = 'log'
params = pika.URLParameters('amqp://wayne:wayne@192.168.0.100:5672/test')
connection = pika.BlockingConnection(params)

channel = connection.channel()  # 建立rabbit协议的通道
channel.exchange_declare(
    en,
    'fanout'
)
# fanout: 所有绑定到此exchange的queue都可以接收消息(实时广播)
# direct: 通过routingKey和exchange决定的那一组的queue可以接收消息(有选择接受)
# topic: 所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息(更细致的过滤)


# 随机生成一个新的空的queue,将exclusive置为True,这样在consumer从RabbitMQ断开后会删除该queue
q1 = channel.queue_declare('',exclusive=True)
q2 = channel.queue_declare('',exclusive=True)
name1 = q1.method.queue # 可以通过result.method.queue 查看随机名称
name2 = q2.method.queue

# binding 告诉exchange将message发送该哪些queue
channel.queue_bind(exchange=en,queue=name1)
channel.queue_bind(exchange=en,queue=name2)

def callback(channel,method,properties,body):
    print(body)

with connection:
    tag1 = channel.basic_consume(name1,callback,True)  # 可以消费数据
    tag2 = channel.basic_consume(name2,callback,True)  # 可以消费数据
    channel.start_consuming()   # forever 不停止

print('receive meessage ok')

binding效果

尝试先启动生产者,再启动消费者试试看。
部分数据丢失, 因为, exchange收到了数据, 没有queue接收, 所以, exchange丢弃了这些数据;
所以要先运行 消费者,后运行 生产者 就有数据了;

4、路由Routing 模式***

路由其实就是生成者的数据经过exchange的时候,通过匹配规则,决定数据的去向

生产者代码 : 交换机类型为direct,指定路由的key

# send.py 生产者代码
# send.py 生产者代码
import pika,random

en = 'color'
colors = ('orange','black','green')

# 配置连接参数
params = pika.URLParameters('amqp://wayne:wayne@192.168.0.100:5672/test')
connection = pika.BlockingConnection(params)


channel = connection.channel()
channel.exchange_declare(
    exchange=en,  # 交换机
    exchange_type='direct'  # 路由
)

with connection:
    for i in range(10):
        rk = '{}'.format(colors[random.randint(0,2)])
        channel.basic_publish(
            exchange=en,
            routing_key=rk,  # routing_key必须指定,这里要求和目标queue一致
            body='test {:02}'.format(i)     # 消息
        )
print('send meessage ok')


消费者代码

# receive.py 消费者代码
# receive.py 消费者代码
import pika

en = 'color'
colors = ('orange','black','green')
params = pika.URLParameters('amqp://wayne:wayne@192.168.0.100:5672/test')
connection = pika.BlockingConnection(params)

channel = connection.channel()  # 建立rabbit协议的通道
channel.exchange_declare(
    en,
    'direct'
)
# fanout: 所有绑定到此exchange的queue都可以接收消息(实时广播)
# direct: 通过routingKey和exchange决定的那一组的queue可以接收消息(有选择接受)
# topic: 所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息(更细致的过滤)


# 随机生成一个新的空的queue,将exclusive置为True,这样在consumer从RabbitMQ断开后会删除该queue
q1 = channel.queue_declare('',exclusive=True)
q2 = channel.queue_declare('',exclusive=True)
name1 = q1.method.queue # 可以通过result.method.queue 查看随机名称
name2 = q2.method.queue

# binding 告诉exchange将message发送该哪些queue
channel.queue_bind(exchange=en,queue=name1,routing_key=colors[0])
channel.queue_bind(exchange=en,queue=name2,routing_key=colors[1])
channel.queue_bind(exchange=en,queue=name2,routing_key=colors[2])

def callback(channel,method,properties,body):
    print(body)

with connection:
    tag1 = channel.basic_consume(name1,callback,True)  # 可以消费数据
    tag2 = channel.basic_consume(name2,callback,True)  # 可以消费数据
    channel.start_consuming()   # forever 不停止

print('receive meessage ok')

color交换机
思考 : 如果routing_key设置的都一样,会怎么样?

绑定的时候指定的routing_key='black',如上图,和fanout就类似了,都是1对多,但是不同。
因为fanout时,exchange不做数据过滤的,1个消息,所有绑定的queue都会拿到一个副本。
direct时候,要按照routing_key分配数据,上图的black有2个queue设置了,就会把1个消息分发给这2个queue(1变2)。

5. Topic 话题

Topic就是更加高级的路由,支持模式匹配而已
Topic的routing_key必须使用 . 点号分割的单词组成。最多255个字节。
支持使用通配符:

* 表示严格的一个单词
# 表示0个或者多个单词

如果queue绑定的routing_key只是一个#, 这个queue其实可以接收所有的消息。

如果没有使用任何通配符, 效果类似于direct

生产者代码

# send.py 生产者代码
import pika,random

en = 'products'
products = ('phone','tv','bike')
colors = ('orange','black','green')

topic = ('phone.*','*.black')

# 配置连接参数
params = pika.URLParameters('amqp://wayne:wayne@192.168.0.100:5672/test')
connection = pika.BlockingConnection(params)


channel = connection.channel()
channel.exchange_declare(
    exchange=en,  # 交换机
    exchange_type='topic'  # 路由
)

with connection:
    for i in range(10):
        rk = '{}.{}'.format(products[random.randint(0,2)],colors[random.randint(0,2)])
        channel.basic_publish(
            exchange=en,
            routing_key=rk,  # routing_key必须指定,这里要求和目标queue一致
            body='{} {:02}'.format(rk,i)     # 消息
        )
print('send meessage ok')

消费者代码

# receive.py 消费者代码
import pika

en = 'products'
colors = ('orange','black','green')
products = ('phone','tv','bike')
topic = ('phone.*','*.black')


params = pika.URLParameters('amqp://wayne:wayne@192.168.0.100:5672/test')
connection = pika.BlockingConnection(params)

channel = connection.channel()  # 建立rabbit协议的通道
channel.exchange_declare(
    en,
    'topic'
)
# fanout: 所有绑定到此exchange的queue都可以接收消息(实时广播)
# direct: 通过routingKey和exchange决定的那一组的queue可以接收消息(有选择接受)
# topic: 所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息(更细致的过滤)


# 随机生成一个新的空的queue,将exclusive置为True,这样在consumer从RabbitMQ断开后会删除该queue
q1 = channel.queue_declare('',exclusive=True)
q2 = channel.queue_declare('',exclusive=True)
name1 = q1.method.queue # 可以通过result.method.queue 查看随机名称
name2 = q2.method.queue

# binding 告诉exchange将message发送该哪些queue
channel.queue_bind(exchange=en,queue=name1,routing_key=topic[0])
channel.queue_bind(exchange=en,queue=name2,routing_key=topic[1])
# channel.queue_bind(exchange=en,queue=name2,routing_key=topic[2])

def callback(channel,method,properties,body):
    print(body)

with connection:
    tag1 = channel.basic_consume(name1,callback,True)  # 可以消费数据
    tag2 = channel.basic_consume(name2,callback,True)  # 可以消费数据
    channel.start_consuming()   # forever 不停止

print('receive meessage ok')
---------------------------------------------------------------------
C:\ProgramData\Miniconda3\envs\blog\python.exe C:/Users/dell/PycharmProjects/spiders/receive.py
b'phone.black 02'        # phone.*重复;
b'phone.black 02'
b'bike.black 05'
b'phone.black 07'
b'phone.black 07'
b'bike.black 08'

RPC 远程过程调用

RabbitMQ的RPC的应用场景较少,因为有更好的RPC通信框架。

9. 消息队列的作用

1、系统间解耦
2、解决生产者、消费者速度匹配
由于稍微上规模的项目都会分层、分模块开发,模块间或系统间尽量不要直接耦合,需要开放公共接口提供给别的
模块或系统调用,而调用可能触发并发问题,为了缓冲和解耦,往往采用中间件技术。

RabbitMQ只是消息中间件中的一种应用程序,也是较常用的消息中间件服务

相关文章

  • 64.1-RabbitMQ安装、管理和名词解释

    不要把幸福的标准定得太高,生命中的任何一件小事只要你细心品味过,可以说都与幸福有关。因为无论怎样,幸福都只是一种感...

  • 在centos建立你自己的Passive DNS收集系统

    目录 一 :名词解释 二:安装前的环境搭建 三:安装Bro 四:安装配置mysql 五: 运行Bro 一.名词解释...

  • web页面性能优化

    名词解释 APM:Application Performance Management,应用性能管理 blank:...

  • Linux操作系统学习笔记 0 ——目录

    环境配置和软件安装环境配置和软件安装地址 初识Linux——命令示例命令示例 目录管理目录管理 文件管理文件管理 ...

  • docker基础

    六、名词解释 安装docker https://docs.docker.com/engine/install/ce...

  • JAVA 开发环境搭建

    【JAVA 开发环境搭建】 一、安装JDK 1、JVM、JRE、JDK名词解释 2、安装JDK 官方下载地址:ht...

  • 阿里云日志服务接入

    名词解释 项目(Project) 日志服务中的资源管理单元,用于资源隔离和控制 日志空间(Logstore) 是日...

  • npm简单实用

    NPM(Node Package Manager)包管理和分发工具,下载、安装、上传、管理已安装的包1.安装、更新...

  • React Native入门教程

    环境安装 nvm管理node版本的shell工具,用来便捷安装和管理npm,node版本。安装地址 node 8....

  • 华春造价:总承包管理费—不说就忘的费用

    一、名词解释 总包管理费根据施工方的具体不同,分为“分包管理费”和“总包服务费”两种形式。具体如下: 1、分包管理...

网友评论

      本文标题:64.1-RabbitMQ安装、管理和名词解释

      本文链接:https://www.haomeiwen.com/subject/ruhjsktx.html