美文网首页
64.1-RabbitMQ安装、管理和名词解释

64.1-RabbitMQ安装、管理和名词解释

作者: BeautifulSoulpy | 来源:发表于2020-08-30 12:05 被阅读0次
    不要把幸福的标准定得太高,生命中的任何一件小事只要你细心品味过,可以说都与幸福有关。因为无论怎样,幸福都只是一种感觉而已!

    总结:

    1. 不要在缺省主机上加东西,
    2. pycharm同时运行一个py多次:选择:Edit Configurations勾线多线程跑程序;
    3. Python程序要解释器来解释和加载,解释后将源代码现场给你编译,编译之后读取字节码最后给你转换;你要起进程,最后给你起一个解释器进程;
      C/C++ 编译完成之后是个可执行文件/exe, 一旦被操作系统加载之后也就是个进程;
    4. 虚拟主机 test ; 交换机logs(记录日志) ;
    5. 本章关键: 虚拟主机 test ; 交换机logs(记录日志) ; 路由routing_key(orange\black\green); queue是name1\name2 ;
    6. 几种工作模式: routing_key: 全匹配 ; Topic: 模式匹配;
    7. 微服务 一般都是效率最高的;高并发的解决方案 本质都是一样的;
      参考:
      Python实现RabbitMQ中6种消息模型

    RabbitMQ
    RabbitMQ 是由 LShift 提供的一个 Advanced Message Queuing Protocol (AMQP) 的开源实现,由以高性能、健
    壮以及可伸缩性出名的 Erlang 写成,因此也是继承了这些优点。很成熟,久经考验,应用广泛。
    文档详细,客户端丰富,几乎常用语言都有RabbitMQ的开发库;

    MQ全称为Message Queue, 消息队列(MQ)是应用程序“对”应用程序的通信方法。
    MQ:生产者往消息队列中投放消息,消费者可以读取队列中的消息。

    RabbitMQ 是一个消息代理。这主要的原理十分简单,就是通过接受和转发消息。你可以把它想象成邮局:当你将一个包裹送到邮局,你会相信邮递员先生最终会将邮件送到接件人手上。RabbitMQ就好比一个邮箱,邮局或邮递员。

    邮局和RabbitMQ两种主要的不同之处在于,RabbitMQ不处理文件,而是接受,并存储和以二进制形式将消息转发。

    1. RabbitMQ安装

    http://www.rabbitmq.com/install-rpm.html
    选择RPM包下载,选择对应平台,本次安装在CentOS 6.5,其它平台类似。

    文件传输 https://www.rabbitmq.com/install-rpm.html#downloads

    由于使用了erlang语言开发,所以需要erlang的包,该下载页给出了链接

    yum install socat-1.7.2.3-1.el6.x86_64.rpm erlang-20.1.7-1.el6.x86_64.rpm rabbitmq-server-3.7.0-1.el6.noarch.rpm
    

    安装成功,查看安装的文件;

    [dell@Centos7 ~]$ rpm -ql rabbitmq-server
    /etc/logrotate.d/rabbitmq-server
    /etc/profile.d/rabbitmqctl-autocomplete.sh
    /etc/rabbitmq
    /etc/rc.d/init.d/rabbitmq-server
    /usr/lib/ocf/resource.d/rabbitmq/rabbitmq-server
    /usr/lib/ocf/resource.d/rabbitmq/rabbitmq-server-ha
    /usr/lib/rabbitmq/autocomplete/bash_autocomplete.sh
    /usr/lib/rabbitmq/autocomplete/zsh_autocomplete.sh
    /usr/lib/rabbitmq/bin/cuttlefish
    /usr/lib/rabbitmq/bin/rabbitmq-defaults
    /usr/lib/rabbitmq/bin/rabbitmq-diagnostics
    /usr/lib/rabbitmq/bin/rabbitmq-env
    /usr/lib/rabbitmq/bin/rabbitmq-plugins
    /usr/lib/rabbitmq/bin/rabbitmq-server
    /usr/lib/rabbitmq/bin/rabbitmqctl
    ................
    

    2. 配置

    http://www.rabbitmq.com/configure.html#config-location

    2.1 环境变量

    使用系统环境变量,如果没有使用rabbitmq-env.conf 中定义环境变量,否则使用缺省值
    RABBITMQ_NODE_IP_ADDRESS the empty string, meaning that it should bind to all network interfaces.
    RABBITMQ_NODE_PORT 5672
    RABBITMQ_DIST_PORT RABBITMQ_NODE_PORT + 20000 内部节点和客户端工具通信用
    RABBITMQ_CONFIG_FILE 配置文件路径默认为/etc/rabbitmq/rabbitmq
    环境变量文件,可以不配置

    2.2 工作特性配置文件

    rabbitmq.config配置文件 (这个文件也可以不配置)
    3.7支持新旧两种配置文件格式
    1、erlang配置文件格式,为了兼容继续采用

    2、sysctl格式,如果不需要兼容,RabbitMQ鼓励使用

    3. 插件管理

    # 列出所有可用插件
    [root@Centos7 network-scripts]# rabbitmq-plugins list
    
    # 启动WEB管理插件
    [root@Centos7 network-scripts]# rabbitmq-plugins enable rabbitmq_management
    The following plugins have been configured:
      rabbitmq_management
      rabbitmq_management_agent
      rabbitmq_web_dispatch
    Applying plugin configuration to rabbit@Centos7...
    The following plugins have been enabled:
      rabbitmq_management
      rabbitmq_management_agent
      rabbitmq_web_dispatch
    
    enabled 3 plugins.
    Offline change; changes will take effect at broker restart.
    
     systemctl enable rabbitmq-server  # 设置开机启动;
    
    # 启动服务
    [root@Centos7 rabbitmq]# systemctl start rabbitmq-server
    [root@Centos7 rabbitmq]# systemctl status rabbitmq-server         # 启动成功
    ● rabbitmq-server.service - LSB: Enable AMQP service provided by RabbitMQ broker
       Loaded: loaded (/etc/rc.d/init.d/rabbitmq-server; bad; vendor preset: disabled)
       Active: active (running) since Tue 2020-08-11 23:40:11 CST; 11s ago
         Docs: man:systemd-sysv-generator(8)
      Process: 112551 ExecStart=/etc/rc.d/init.d/rabbitmq-server start (code=exited, status=0/SUCCESS)
        Tasks: 93
       CGroup: /system.slice/rabbitmq-server.service
               ├─110399 /bin/sh /etc/rc.d/init.d/rabbitmq-server start
               ├─110404 /bin/bash -c ulimit -S -c 0 >/dev/null 2>&1 ; /usr/sbin/rabbitmq-server
               ├─110406 /sbin/runuser -u rabbitmq -- /usr/lib/rabbitmq/bin/rabbitmq-server
               ├─110426 /bin/sh /usr/lib/rabbitmq/bin/rabbitmq-server
               ├─110653 /usr/lib64/erlang/erts-9.1.5/bin/beam.smp -W w -A 64 -P 1048576 -t 5000000 -stbt db -zdbbl 128000 -K true -B i -- -root /usr/lib64/erlang -progname erl -- -home /var/lib/rabbitmq -- -pa /usr/lib/rabbitmq/lib/rabbitmq_server-3.7.0/ebin -noshell -noinput -s rabbit boot -sname rabbit@Centos7 -boot...
    
               ├─110766 erl_child_setup 1024
               ├─110809 inet_gethost 4
               └─110821 inet_gethost 4
    
    Aug 11 23:39:46 Centos7 systemd[1]: Starting LSB: Enable AMQP service provided by RabbitMQ broker...
    Aug 11 23:39:46 Centos7 runuser[112636]: pam_unix(runuser:session): session opened for user rabbitmq by (uid=0)
    Aug 11 23:40:11 Centos7 rabbitmq-server[112551]: Starting rabbitmq-server: RabbitMQ is currently running
    Aug 11 23:40:11 Centos7 rabbitmq-server[112551]: rabbitmq-server.
    Aug 11 23:40:11 Centos7 systemd[1]: Started LSB: Enable AMQP service provided by RabbitMQ broker.
    
    
    [root@Centos7 dell]# ss -tanl | grep 5672
    LISTEN     0      128          *:15672                    *:*
    LISTEN     0      128          *:25672                    *:*
    LISTEN     0      128       [::]:5672                  [::]:*
    

    报错

    查看rabbitmq启动日志
    cat /var/log/rabbitmq/startup_log
    启动中,可能出现下面的错误 Error when reading /var/lib/rabbitmq/.erlang.cookie: eacces

    就是这个文件的权限问题,修改属主、属组即可


    开始登录WEB界面 [管理界面]http://192.168.0.100:15672/
    来宾 用户: guest/guest
    使用guest/guest只能本地地登录,远程登录会报错

    4. 用户管理

    [root@Centos7 dell]# rabbitmqctl
    Usage:
    rabbitmqctl [-n <node>] [-t <timeout>] [-l] [-q] <command> [<command options>]
    
    General options:
        -n node
        -q quiet
        -t timeout
        -l longnames
    Commands:
        add_user <username> <password>
        add_vhost <vhost>
        authenticate_user <username> <password>
        cancel_sync_queue [-p <vhost>] queue
        change_cluster_node_type <disc|ram>
        change_password <username> <password>
        clear_global_parameter <key>
        clear_operator_policy [-p <vhost>] <key>
        clear_parameter [-p <vhost>] <component_name> <key>
        clear_password <username>
        clear_permissions [-p vhost] <username>
        clear_policy [-p <vhost>] <key>
        clear_topic_permissions [-p vhost] <username> [<exchange>]
        clear_vhost_limits [-p <vhost>]
        close_all_connections [-p <vhost> --limit <limit>] [-n <node> --global] [--per-connection-delay <delay>] <explanation>
        close_connection <connectionpid> <explanation>
        set_user_tags
    
    
    
    添加用户:
    rabbitmqctl add_user username password
    删除用户:
    rabbitmqctl delete_user username
    更改密码:
    rabbitmqctl change_password username newpassword
    设置权限Tags,其实就是分配组
    rabbitmqctl set_user_tags username tag
    
    
    [root@Centos7 dell]# rabbitmqctl add_user wayne wayne         
    Adding user "wayne" ...
    [root@Centos7 dell]# rabbitmqctl set_user_tags wayne administrator
    Setting tags for user "wayne" to [administrator] ...
    
    

    tag的意义如下
    administrator 可以管理用户、权限、虚拟主机。

    5. 基本信息

    6. 虚拟主机

    7. Python库

    Pika是纯Python实现的支持AM QP协议的库

    pip install pika
    

    8. RabbitMQ工作原理及应用

    名词解释
    名词 说明
    Server 服务器。接收客户端连接,实现消息队列及路由功能的进程(实现AMQP实体服务),也称为 消息代理。注意,客户端包括生产者和消费者
    Connection 网络物理连接
    Channel 一个连接允许多个客户端连接
    Exchange 交换器。接收生产者发来的消息,决定如何路由给服务器中的队列。常用的类型有:direct(point-to-point)、topic (publish-subscribe)、fanout (multicast)
    Message 消息
    Message Queue 消息队列。数据的存储载体
    Bind 绑定。建立消息队列和交换器之间的关系,也就是说交换器拿到数据,把什么样的数据送给哪个队列
    Virtual Host 虚拟主机。一批交换机、消息队列和相关对象的集合。为了多用户互不干扰,使用虚拟主机分组交换机、消息队列
    Topic 主题、话题
    Broker 可等价为Server
    1. 队列*

    这种模式就是最简单的生产者消费者模型, 消息队列就是一个FIFO的队列

    1.消息产生者将消息放入队列
    2.消息的消费者(consumer) 监听(while) 消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删除(隐患 消息可能没有被消费者正确处理,已经从队列中消失了,造成消息的丢失)应用场景:聊天(中间有一个过度的服务器;p端,c端)

    simple简单模式

    生产者send.py,消费者receive.py
    官方例子 https://www.rabbitmq.com/tutorials/tutorial-one-python.html
    参照官方例子,写一个程序

    # send.py 生产者代码
    
    import pika
    
    # 配置连接参数
    params = pika.ConnectionParameters('192.168.0.100')
    # 建立连接
    connection = pika.BlockingConnection(params)
    
    with connection:
        # 建立通道
        channel = connection.channel()
        # 创建一个队列,queue命名为test。如果queue不存在,消息将被dropped
        channel.queue_declare('test')
    
        channel.basic_publish(
            exchange='',    # 使用缺省exchange
            routing_key='test',   # routing_key必须指定,这里要求和目标queue一致
            body='test 1'     # 消息
        )
    print('send meessage ok')
    #----------------------------------------------------------
    pika.exceptions.ProbableAuthenticationError: ConnectionClosedByBroker: (403) 'ACCESS_REFUSED - Login was refused using authentication mechanism PLAIN. For details see the broker logfile.'
    访问被拒绝,还是权限问题,原因还是guest用户只能访问localhost上的 / 缺省虚拟主机
    

    queue_declare声明一个queue,有必要的话,创建它。
    basic_publish exchange为空就使用缺省exchange,如果找不到指定的exchange,抛异常
    使用缺省exchange,就必须指定routing_key,使用它找到queue

    解决办法

    缺省虚拟主机,默认只能在本机访问,不要修改为远程访问,是安全的考虑。
    因此,在Admin中Virtual hosts中,新建一个虚拟主机test
    注意,新建的test虚拟主机的Users是谁,本次是wayne用户


    在ConnectionParameters中并没有用户名、密码填写的参数,它使用参数credentials传入,这需要构建一个pika.credentials.Credentials对象。

    修改代码如下:

    # send.py 生产者代码
    import pika
    
    # 配置连接参数
    cred = pika.PlainCredentials('wayne','wayne')
    params = pika.ConnectionParameters('192.168.0.100',5672,'test',credentials=cred)
    # 建立连接
    connection = pika.BlockingConnection(params)
    
    with connection:
        # 建立通道
        channel = connection.channel()
        # 创建一个队列,queue命名为hello。如果queue不存在,消息将被dropped
        channel.queue_declare('hello')
    
        channel.basic_publish(
            exchange='',    # 使用缺省exchange
            routing_key='hello',   # routing_key必须指定,这里要求和目标queue一致
            body='test 1'     # 消息
        )
    print('send meessage ok')
    

    send 参数写法2: URLParameters , 也可以使用URL创建参数: %2F指代/ (/test),就是缺省

    # send.py 生产者代码
    import pika
    
    # 配置连接参数
    # cred = pika.PlainCredentials('wayne','wayne')
    # params = pika.ConnectionParameters('192.168.0.100',5672,'test',credentials=cred)
    params = pika.URLParameters('amqp://wayne:wayne@192.168.0.100:5672/test')
    
    # 建立连接
    connection = pika.BlockingConnection(params)
    
    with connection:
        # 建立通道
        channel = connection.channel()
        # 创建一个队列,queue命名为hello。如果queue不存在,消息将被dropped
        channel.queue_declare('hello')
    
        channel.basic_publish(
            exchange='',    # 使用缺省exchange
            routing_key='hello',  # routing_key必须指定,这里要求和目标queue一致
            body='test 1'     # 消息
        )
    print('send meessage ok')
    

    send.py 生产者多个消息 代码

    import pika
    
    # 配置连接参数
    # cred = pika.PlainCredentials('wayne','wayne')
    # params = pika.ConnectionParameters('192.168.0.100',5672,'test',credentials=cred)
    params = pika.URLParameters('amqp://wayne:wayne@192.168.0.100:5672/test')
    
    # 建立连接
    connection = pika.BlockingConnection(params)
    
    with connection:
        # 建立通道
        channel = connection.channel()
        # 创建一个队列,queue命名为hello。如果queue不存在,消息将被dropped
        channel.queue_declare('hello')
        for i in range(40):
            channel.basic_publish(
                exchange='',    # 使用缺省exchange
                routing_key='hello',  # routing_key必须指定,这里要求和目标queue一致
                body='test {}'.format(i)     # 消息
            )
    print('send meessage ok')
    

    receive.py 消费者代码
    单个消费消息

    # receive.py 消费者代码
    import pika
    
    # 配置连接参数
    # cred = pika.PlainCredentials('wayne','wayne')
    # params = pika.ConnectionParameters('192.168.0.100',5672,'test',credentials=cred)
    params = pika.URLParameters('amqp://wayne:wayne@192.168.0.100:5672/test')
    
    # 建立连接
    connection = pika.BlockingConnection(params)
    
    with connection:
        # 建立通道
        channel = connection.channel()
        # 创建一个队列,queue命名为hello。如果queue不存在,消息将被dropped
        data = channel.basic_get('hello',True)
        print(data)
    
    print('receive meessage ok')
    #-----------------------------------------------------
    (<Basic.GetOk(['delivery_tag=1', 'exchange=', 'message_count=1', 'redelivered=False', 'routing_key=hello'])>, <BasicProperties>, b'test 1')
    返回元组:(方法method, 属性properties, 消息body)
    

    批量消费消息代码

    # receive.py 消费者代码
    import pika
    
    params = pika.URLParameters('amqp://wayne:wayne@192.168.0.100:5672/test')
    
    connection = pika.BlockingConnection(params)
    channel = connection.channel()
    # channel.queue_declare('hello')  # 创建一个hello;
    
    def callback(channel,method,properties,body):
        print(body)
    
    with connection:
        tag = channel.basic_consume('hello',callback,True)  # 可以消费数据
        channel.start_consuming()   # forever 不停止
    
    print('receive meessage ok')
    
    
    2、工作队列

    队列中的数据只能给一个消费者;拿走就没了;


    继续使用队列模式的生产者消费者代码,启动2个消费者。
    观察结果,可以看到,2个消费者是交替拿到不同的消息。
    这种工作模式是一种竞争工作方式,对某一个消息来说,只能有一个消费者拿走它。
    从结果知道,使用的是轮询方式拿走数据的

    注意:虽然上面的图中没有画出exchange,用到缺省exchange。

    3、发布、订阅模式 exchange

    Publish/Subscribe发布和订阅, 想象一下订阅报纸, 所有订阅者(消费者) 订阅这个报纸(消息) , 都应该拿到
    一份同样内容的报纸。
    订阅者和消费者之间还有一个exchange, 可以想象成邮局, 消费者去邮局订阅报纸, 报社发报纸到邮局, 邮局决
    定如何投递到消费者手中。

    上例中工作队列模式的使用,相当于,每个人只能拿到不同的报纸。所以,不适用发布订阅模式。


    当前模式的exchange的type是fanout, 就是一对多, 即广播模式
    注意, 同一个queue的消息只能被消费一次, 所以, 这里使用了多个queue, 相当于为了保证不同的消费者拿到同
    样的数据, 每一个消费者都应该有自己的queue。
    # 生成一个交换机
    channel.exchange_declare(
        exchange='logs', # 新交换机 logs  记录日志的
        exchange_type='fanout' # 广播,扇出;
    )
    

    生产者使用广播模式。在test虚拟主机主机下构建了一个logs交换机
    至于queue,可以由生产者创建,也可以由消费者创建。
    本次采用使用消费者端创建,生产者把数据发往交换机logs,采用了fanout,然后将数据通过交换机发往已经绑定
    到此交换机的所有queue。

    # 消费者端
    result = channel.queue_declare() # 生成一个随机名称的queue
    result = channel.queue_declare(exclusive=True) # 生成一个随机名称的queue,并在断开连接时删除queue
    # 生成queue
    q1 = channel.queue_declare(exclusive=True)
    q2 = channel.queue_declare(exclusive=True)
    q1name = q1.method.queue # 可以通过result.method.queue 查看随机名称
    q2name = q2.method.queue
    print(q1name, q2name)
    # 绑定
    channel.queue_bind(exchange='logs', queue=q1name)
    channel.queue_bind(exchange='logs', queue=q2name)
    

    生产者代码——注意观察 交换机和队列

    # send.py 生产者代码
    import pika
    
    en = 'log'
    # 配置连接参数
    params = pika.URLParameters('amqp://wayne:wayne@192.168.0.100:5672/test')
    connection = pika.BlockingConnection(params)
    
    channel = connection.channel()
    channel.exchange_declare(
        en,
        'fanout'
    )
    
    with connection:
        for i in range(50):
            channel.basic_publish(
                exchange=en,
                routing_key='',  
                body='test {:02}'.format(i)     # 消息
            )
    print('send meessage ok')
    

    多了一个虚拟主机 logs(有流量通信), hello也不变,根本不存在queue(容器);

    多了一个虚拟主机 logs
    特别注意:如果先开启生产者,由于没有队列queue,请观察数据会怎样呢?
    no binding

    消费者代码
    构建queue并绑定到test虚拟主机的logs交换机上

    # receive.py 消费者代码
    import pika
    
    en = 'log'
    params = pika.URLParameters('amqp://wayne:wayne@192.168.0.100:5672/test')
    connection = pika.BlockingConnection(params)
    
    channel = connection.channel()  # 建立rabbit协议的通道
    channel.exchange_declare(
        en,
        'fanout'
    )
    # fanout: 所有绑定到此exchange的queue都可以接收消息(实时广播)
    # direct: 通过routingKey和exchange决定的那一组的queue可以接收消息(有选择接受)
    # topic: 所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息(更细致的过滤)
    
    
    # 随机生成一个新的空的queue,将exclusive置为True,这样在consumer从RabbitMQ断开后会删除该queue
    q1 = channel.queue_declare('',exclusive=True)
    q2 = channel.queue_declare('',exclusive=True)
    name1 = q1.method.queue # 可以通过result.method.queue 查看随机名称
    name2 = q2.method.queue
    
    # binding 告诉exchange将message发送该哪些queue
    channel.queue_bind(exchange=en,queue=name1)
    channel.queue_bind(exchange=en,queue=name2)
    
    def callback(channel,method,properties,body):
        print(body)
    
    with connection:
        tag1 = channel.basic_consume(name1,callback,True)  # 可以消费数据
        tag2 = channel.basic_consume(name2,callback,True)  # 可以消费数据
        channel.start_consuming()   # forever 不停止
    
    print('receive meessage ok')
    
    
    binding效果

    尝试先启动生产者,再启动消费者试试看。
    部分数据丢失, 因为, exchange收到了数据, 没有queue接收, 所以, exchange丢弃了这些数据;
    所以要先运行 消费者,后运行 生产者 就有数据了;

    4、路由Routing 模式***

    路由其实就是生成者的数据经过exchange的时候,通过匹配规则,决定数据的去向

    生产者代码 : 交换机类型为direct,指定路由的key

    # send.py 生产者代码
    # send.py 生产者代码
    import pika,random
    
    en = 'color'
    colors = ('orange','black','green')
    
    # 配置连接参数
    params = pika.URLParameters('amqp://wayne:wayne@192.168.0.100:5672/test')
    connection = pika.BlockingConnection(params)
    
    
    channel = connection.channel()
    channel.exchange_declare(
        exchange=en,  # 交换机
        exchange_type='direct'  # 路由
    )
    
    with connection:
        for i in range(10):
            rk = '{}'.format(colors[random.randint(0,2)])
            channel.basic_publish(
                exchange=en,
                routing_key=rk,  # routing_key必须指定,这里要求和目标queue一致
                body='test {:02}'.format(i)     # 消息
            )
    print('send meessage ok')
    
    
    

    消费者代码

    # receive.py 消费者代码
    # receive.py 消费者代码
    import pika
    
    en = 'color'
    colors = ('orange','black','green')
    params = pika.URLParameters('amqp://wayne:wayne@192.168.0.100:5672/test')
    connection = pika.BlockingConnection(params)
    
    channel = connection.channel()  # 建立rabbit协议的通道
    channel.exchange_declare(
        en,
        'direct'
    )
    # fanout: 所有绑定到此exchange的queue都可以接收消息(实时广播)
    # direct: 通过routingKey和exchange决定的那一组的queue可以接收消息(有选择接受)
    # topic: 所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息(更细致的过滤)
    
    
    # 随机生成一个新的空的queue,将exclusive置为True,这样在consumer从RabbitMQ断开后会删除该queue
    q1 = channel.queue_declare('',exclusive=True)
    q2 = channel.queue_declare('',exclusive=True)
    name1 = q1.method.queue # 可以通过result.method.queue 查看随机名称
    name2 = q2.method.queue
    
    # binding 告诉exchange将message发送该哪些queue
    channel.queue_bind(exchange=en,queue=name1,routing_key=colors[0])
    channel.queue_bind(exchange=en,queue=name2,routing_key=colors[1])
    channel.queue_bind(exchange=en,queue=name2,routing_key=colors[2])
    
    def callback(channel,method,properties,body):
        print(body)
    
    with connection:
        tag1 = channel.basic_consume(name1,callback,True)  # 可以消费数据
        tag2 = channel.basic_consume(name2,callback,True)  # 可以消费数据
        channel.start_consuming()   # forever 不停止
    
    print('receive meessage ok')
    
    
    color交换机
    思考 : 如果routing_key设置的都一样,会怎么样?

    绑定的时候指定的routing_key='black',如上图,和fanout就类似了,都是1对多,但是不同。
    因为fanout时,exchange不做数据过滤的,1个消息,所有绑定的queue都会拿到一个副本。
    direct时候,要按照routing_key分配数据,上图的black有2个queue设置了,就会把1个消息分发给这2个queue(1变2)。

    5. Topic 话题

    Topic就是更加高级的路由,支持模式匹配而已
    Topic的routing_key必须使用 . 点号分割的单词组成。最多255个字节。
    支持使用通配符:

    * 表示严格的一个单词
    # 表示0个或者多个单词

    如果queue绑定的routing_key只是一个#, 这个queue其实可以接收所有的消息。

    如果没有使用任何通配符, 效果类似于direct

    生产者代码

    # send.py 生产者代码
    import pika,random
    
    en = 'products'
    products = ('phone','tv','bike')
    colors = ('orange','black','green')
    
    topic = ('phone.*','*.black')
    
    # 配置连接参数
    params = pika.URLParameters('amqp://wayne:wayne@192.168.0.100:5672/test')
    connection = pika.BlockingConnection(params)
    
    
    channel = connection.channel()
    channel.exchange_declare(
        exchange=en,  # 交换机
        exchange_type='topic'  # 路由
    )
    
    with connection:
        for i in range(10):
            rk = '{}.{}'.format(products[random.randint(0,2)],colors[random.randint(0,2)])
            channel.basic_publish(
                exchange=en,
                routing_key=rk,  # routing_key必须指定,这里要求和目标queue一致
                body='{} {:02}'.format(rk,i)     # 消息
            )
    print('send meessage ok')
    

    消费者代码

    # receive.py 消费者代码
    import pika
    
    en = 'products'
    colors = ('orange','black','green')
    products = ('phone','tv','bike')
    topic = ('phone.*','*.black')
    
    
    params = pika.URLParameters('amqp://wayne:wayne@192.168.0.100:5672/test')
    connection = pika.BlockingConnection(params)
    
    channel = connection.channel()  # 建立rabbit协议的通道
    channel.exchange_declare(
        en,
        'topic'
    )
    # fanout: 所有绑定到此exchange的queue都可以接收消息(实时广播)
    # direct: 通过routingKey和exchange决定的那一组的queue可以接收消息(有选择接受)
    # topic: 所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息(更细致的过滤)
    
    
    # 随机生成一个新的空的queue,将exclusive置为True,这样在consumer从RabbitMQ断开后会删除该queue
    q1 = channel.queue_declare('',exclusive=True)
    q2 = channel.queue_declare('',exclusive=True)
    name1 = q1.method.queue # 可以通过result.method.queue 查看随机名称
    name2 = q2.method.queue
    
    # binding 告诉exchange将message发送该哪些queue
    channel.queue_bind(exchange=en,queue=name1,routing_key=topic[0])
    channel.queue_bind(exchange=en,queue=name2,routing_key=topic[1])
    # channel.queue_bind(exchange=en,queue=name2,routing_key=topic[2])
    
    def callback(channel,method,properties,body):
        print(body)
    
    with connection:
        tag1 = channel.basic_consume(name1,callback,True)  # 可以消费数据
        tag2 = channel.basic_consume(name2,callback,True)  # 可以消费数据
        channel.start_consuming()   # forever 不停止
    
    print('receive meessage ok')
    ---------------------------------------------------------------------
    C:\ProgramData\Miniconda3\envs\blog\python.exe C:/Users/dell/PycharmProjects/spiders/receive.py
    b'phone.black 02'        # phone.*重复;
    b'phone.black 02'
    b'bike.black 05'
    b'phone.black 07'
    b'phone.black 07'
    b'bike.black 08'
    
    
    RPC 远程过程调用

    RabbitMQ的RPC的应用场景较少,因为有更好的RPC通信框架。

    9. 消息队列的作用

    1、系统间解耦
    2、解决生产者、消费者速度匹配
    由于稍微上规模的项目都会分层、分模块开发,模块间或系统间尽量不要直接耦合,需要开放公共接口提供给别的
    模块或系统调用,而调用可能触发并发问题,为了缓冲和解耦,往往采用中间件技术。

    RabbitMQ只是消息中间件中的一种应用程序,也是较常用的消息中间件服务

    相关文章

      网友评论

          本文标题:64.1-RabbitMQ安装、管理和名词解释

          本文链接:https://www.haomeiwen.com/subject/ruhjsktx.html