美文网首页
3. 消息队列介绍和RabbitMQ

3. 消息队列介绍和RabbitMQ

作者: 随便写写咯 | 来源:发表于2021-06-28 03:23 被阅读0次

1. 消息队列介绍

1.1 MQ定义

消息队列的目的是为了实现各个应用程序之间的通讯, 应用程序基于MQ实现消息的发送和接收, 实现应用程序之间的通讯
这样多个应用程序可以运行在不同的主机上, 通过MQ就可以实现跨网络的通信, 因此MQ实现了业务的解耦和异步机制

1.2 MQ使用场合

消息队列作为高并发系统的核心组件之一, 能够帮助业务系统结构提升开发效率和系统稳定性, 消息队列主要具有以下特点:

1. 削峰填谷: 主要解决瞬时写压力大于应用程序处理能力, 导致消息丢失, 系统崩溃等问题

案例: 当应用程序处理能力低, 而瞬时并发写请求过多时, 客户端请求就会出现排队, 并且很可能断开连接, 造成信息丢失. 
解决方案: 通过消息队列去存储用户请求, 利用生产者/消费者模型, 让生产者把消息写入到MQ, 由消费者去MQ消费消息, 处理请求. 
暂时没有被消费的请求, 就在MQ中排队, 做临时存储
2. 系统解耦: 解决不同重要程度, 不同能力级别系统之间依赖, 避免一死全死

案例: 按照不同的功能, 把一个服务, 拆分成多个系统, 比如登录, 查询, 支付等. 这样, 当某一个系统服务崩溃, 不会影响其他的服务. 
比如, 当支付系统崩溃时, 查询系统并不会受到影响
3. 蓄流压测: 线上有些链路不好压测, 可以通过在MQ中堆积一定量消息, 再放开来压测

案例: 通过开发应用程序模拟用户请求, 把请求全部先写入到MQ, 之后再开启后端服务, 来检查后端服务压力处理能力

2 RabbitMQ

2.1 RabbitMQ简介

RabbitMQ 采用 Erlang 语言开发,Erlang 语言由 Ericson 设计,Erlang 在分布式编
程和故障恢复方面表现出色,电信领域被广泛使用。

RabbitMQ架构

图片.png
  • Broker: 接收和分发消息的应用,一个RabbitMQ Server 就是 Message Broker。
  • Virtual host: 出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚
    拟的分组中,类似于网络中的 namespace 概念,当多个不同的用户使用同一个
    RabbitMQ server 提供的服务时,可以划分出多个 vhost,每个用户在自己的 vhost
    创建 exchange/queue 等。通常每个业务会有一套RabbitMQ服务器, 因此, 一般RabbitMQ不会混用. 即使多套业务使用同一个RabbitMQ, 只要队列不冲突即可
  • Connection: publisher/consumer 和 broker 之间的 TCP 连接。
  • Channel: 如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候
    建立 TCP Connection 的开销将是巨大的,效率也较低。Channel 是在 connection
    内部建立的逻辑连接,如果应用程序支持多线程,通常每个 thread 创建单独的
    channel 进行通讯,AMQP method 包含了 channel id 帮助客户端和 message broker
    识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection
    极大减少了操作系统建立 TCP connection 的开销。
  • Exchange: message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing
    key,分发消息到 queue 中去。常用的类型有:direct (point-to-point), topic (publish�subscribe) and fanout (multicast)。
  • Queue: 消息最终被送到这里等待 consumer 取走。
  • Binding: exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。
    Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据。

RabbitMQ 优势
基于 erlang 语言开发,具有高并发优点、支持分布式
具有消息确认机制、消息持久化机制,消息可靠性和集群可靠性高
简单易用、运行稳定、跨平台、多语言
开源
Queue 的特性
消息基于先进先出的原则进行顺序消费
消息可以持久化到磁盘节点服务器
消息可以缓存到内存节点服务器提高性能

2.2 RabbitMQ中的生产者消费者示例

图片.png

生产者发送消息到 broker server(RabbitMQ),在 Broker 内部,用户创建Exchange/Queue,通过 Binding 规则将两者联系在一起,Exchange 分发消息,根据类型/binding 的不同分发策略有区别,消息最后来到 Queue 中,等待消费者取走。

2.3 数据的存储

RabbitMQ支持内存节点和磁盘节点, 区别就是内存节点的吞吐量高, 而磁盘节点可以保证数据的安全性. 一般建议至少两个节点, 一个内存节负责正常的数据读取, 而磁盘节点从内存节点同步数据, 一旦内存节点宕机, 还可以通过磁盘节点进行数据的还原

3 RabbitMQ部署

3.1 RabbitMQ 单机部署

注意: RabbitMQ由Erlang语言编写, 因此, 不同版本之间要一对一对应, 否则会造成RabbitMQ无法安装

RabbitMQ官网
  • 小型业务, 测试环境: 4C-8G-60G
  • 大型业务: 16C-32G-60G+

Ubuntu1804安装单机版RabbitMQ, 3.8.18

服务器环境

10.0.0.239 
hostname: mq1.node

3.1.1 更新apt源

[root@mq1:~]# apt update

3.1.2 官方提供安装脚本

#!/bin/sh

sudo apt-get install curl gnupg debian-keyring debian-archive-keyring apt-transport-https -y

## Team RabbitMQ's main signing key
sudo apt-key adv --keyserver "hkps://keys.openpgp.org" --recv-keys "0x0A9AF2115F4687BD29803A206B73A36E6026DFCA"
## Launchpad PPA that provides modern Erlang releases
sudo apt-key adv --keyserver "keyserver.ubuntu.com" --recv-keys "F77F1EDA57EBB1CC"
## PackageCloud RabbitMQ repository
curl -1sLf 'https://packagecloud.io/rabbitmq/rabbitmq-server/gpgkey' | sudo apt-key add -

## Add apt repositories maintained by Team RabbitMQ
sudo tee /etc/apt/sources.list.d/rabbitmq.list <<EOF
## Provides modern Erlang/OTP releases
##
## "bionic" as distribution name should work for any reasonably recent Ubuntu or Debian release.
## See the release to distribution mapping table in RabbitMQ doc guides to learn more.
deb http://ppa.launchpad.net/rabbitmq/rabbitmq-erlang/ubuntu bionic main
deb-src http://ppa.launchpad.net/rabbitmq/rabbitmq-erlang/ubuntu bionic main

## Provides RabbitMQ
##
## "bionic" as distribution name should work for any reasonably recent Ubuntu or Debian release.
## See the release to distribution mapping table in RabbitMQ doc guides to learn more.
deb https://packagecloud.io/rabbitmq/rabbitmq-server/ubuntu/ bionic main
deb-src https://packagecloud.io/rabbitmq/rabbitmq-server/ubuntu/ bionic main
EOF

## Update package indices
sudo apt-get update -y

## Install Erlang packages
sudo apt-get install -y erlang-base \
                        erlang-asn1 erlang-crypto erlang-eldap erlang-ftp erlang-inets \
                        erlang-mnesia erlang-os-mon erlang-parsetools erlang-public-key \
                        erlang-runtime-tools erlang-snmp erlang-ssl \
                        erlang-syntax-tools erlang-tftp erlang-tools erlang-xmerl

3.1.3 查看RabbitMQ可用版本

[root@mq1:~]# apt-cache madison rabbitmq-server

3.1.4 安装RabbitMQ, 3.8.18最新版

apt -y install rabbitmq-server

3.1.5 RabbitMQ启动

systemctl start rabbitmq-server
systemctl enable --now rabbitmq-server

3.1.6 RabbitMQ监听端口

5672: 消费者访问的端口
15672: web管理端口
25672: 集群状态通信端口

3.2 RabbitMQ插件管理

3.2.1 开启web界面管理插件

rabbitmq-plugins enable rabbitmq_management

3.2.2 登录web管理界面

注意: rabbitmq从3.3.0开始, 禁止使用guest/guest权限通过除localhost外的访问, 直接访问报错如下

image.png

创建允许远程登录用户, 创建后开发测试人员即可用这个账号在代码中远程连接RabbitMQ

# create a user
rabbitmqctl add_user admin admin
# tag the user with "administrator" for full management UI and HTTP API access
rabbitmqctl set_user_tags admin administrator

再次登录

image.png

给admin管理员添加管理vhost权限

image.png image.png image.png

3.2.3 RabbitMQ生产者和消费者测试

生成者测试

利用Python脚本批量写入消息

#coding:utf-8
import pika
#RabbitMQ用户名密码
cert = pika.PlainCredentials("admin","admin")
#连接到服务器
conn = pika.BlockingConnection(pika.ConnectionParameters("10.0.0.239",5672,"/",cert))
#创建频道
channel = conn.channel()
#声明消息队列,如果不存在就创建, 存在就将消息在此队列中创建, 如果将消息发送到不存在的队列, 则RabbitMQ会自动清除这些消息
channel.queue_declare(queue="hello")
#exchange告诉消息去往的队列, routing key是队列名, body是要传递的消息内容
for i in range(100000): #通过循环写入10万条消息
    # num = "%s" % i
    channel.basic_publish(exchange="",
                          routing_key="hello",
                          body="hello world %d!" % i)
    print("消息%s写入成功!" % i)
#消息写入完成,关闭连接
conn.close()

消费者测试

利用Python脚本批量消费信息

#coding:utf-8
import pika
#RabbitMQ用户名密码
cert = pika.PlainCredentials("admin","admin")
#连接到服务器
conn = pika.BlockingConnection(pika.ConnectionParameters("10.0.0.239",5672,"/",cert))
#创建频道
channel = conn.channel()
#声明消息队列,如果不存在就创建
channel.queue_declare(queue="hello")
#定义一个回调函数, 将信息打印出来
def callback(ch,method,properties,body):
        print("[x] Received %r" % body)
channel.basic_consume("hello",callback,
                      auto_ack=False,
                      exclusive=False,
                      consumer_tag=None,
                      arguments=None)
print(" [*] Waiting for messages. To exit press CTRL+C")
#开始接收消息, 并进入阻塞状态, 队列里有信息才会调用callback进行处理, 按ctrl+c退出
channel.start_consuming()

还原虚拟机, 准备集群搭建

如果处理消息剩余数量过多?

由于下游服务器性能问题, 有时会导致有很多剩余数量积攒, 无法被消费, 此时需要添加下游节点,消费者服务器, 连接到消息队列, 把消息尽快消费掉, 否则用户访问等待时间会过长

3.3 RabbitMQ 集群部署

RabbitMQ集群分为二种方式:

  • 普通模式: 创建好RabbitMQ集群之后的默认模式, 数据不会主动拷贝到其他节点, 当消费者访问的服务器没有需要的数据时, 当前服务器会从存有需要的数据的节点拉取数据, 之后返回给用户. 此种方式涉及数据的临时拷贝, 会影响性能. 此外, 如果存放数据的节点宕机, 数据就会无法访问
  • 镜像模式: 把需要的队列做成镜像队列, 消息会在每个服务器之间复制. 和MySQL主从方式一样. 消息写到某个mq服务器后, 会立即同步到其他服务器

普通集群模式:

queue 创建之后,如果没有其它 policy,消息实体只存在于其中一个节点
A、B 两个 RabbitMQ 节点仅有相同的元数据,即队列结构,但队列的数据仅保存有一份
即创建该队列的 rabbitmq 节点(A 节点),当消息进入 A 节点的 Queue 中后,如果consumer 从 B 节点拉取RabbitMQ 会临时在 A、B 间进行
消息传输,把 A 中的消息实体取出并经过 B 发送给 consumer,所以 consumer 可以连接每一个节点,从中取消息
该模式存在一个问题就是当 A 节点故障后,B节点无法取到 A 节点中还未消费的消息实体。

镜像集群模式:

把需要的队列做成镜像队列,存在于多个节点,属于 RabbitMQ 的 HA 方案(镜像模式是在普通模式的基础上,增加一些镜像策略)
该模式解决了普通模式中的数据丢失问题,其实质和普通模式不同之处在于,消息实体会主动在镜像节点间同步
而不是在 consumer 取数据时临时拉取,该模式带来的副作用也很明显,除了降低系统性能外,如果镜像队列数量过多,加之
大量的消息进入,集群内部的网络带宽将会被这种同步通讯大大消耗掉,所以在对可靠性要求较高的场合中适用,一个队列想做成镜像队列,需要先设置 policy,
然后客户端创建队列的时候,rabbitmq 集群根据“队列名称”自动设置是普通集群模式或镜像队列。

RabbitMQ集群两种节点类型:

  • 内存节点: 只将数据保存到内存

  • 磁盘节点: 保存数据到内存和磁盘

内存节点虽然不写入磁盘, 但是它执行比磁盘节点要好, 集群中, 只需要一个磁盘节点来保存数据就足够了, 如果集群中只有内存节点, 那么不能全部停止它们, 否则所有数据消息在服务器全部停机之后都会丢失. 需要分批次停止服务器, 比如, 先重启一个服务器, 重启后该服务器上的数据丢失, 但是服务启动后会从另一个服务器将数据拷贝到本机, 然后再重启另一个服务器, 重启后, 数据也会被拷贝到第二个服务器, 这样实现重启不丢失数据

推荐设计架构:

在一个RabbitMQ集群里, 有三台或以上机器, 其中一台使用磁盘模式, 其他节点使用内存模式, 内存节点访问速度更快, 由于磁盘IO相对较慢, 磁盘节点可作为数据备份使用.

Ubuntu1804部署集群版RabbitMQ

实验环境

10.0.0.239 mq1.node
10.0.0.229 mq2.node
10.0.0.219 mq3.node
mq1和mq2作为内存节点, 添加到mq3, 以mq3作为集群名称

3.3.1 三台服务器分别配置主机名解析

RabbitMQ只会识别主机名, 不会识别域名

注意: RabbitMQ集群的主机名不能随意修改, 否则会导致无法解析, RabbitMQ无法启动

vim /etc/hosts
10.0.0.239 mq1 mq1.node
10.0.0.229 mq2 mq2.node
10.0.0.219 mq3 mq3.node 

3.3.2 各服务器安装RabbitMQ-3.8.18

参考单机部署

安装rabbitmq-server最新版

#!/bin/sh

sudo apt-get install curl gnupg debian-keyring debian-archive-keyring apt-transport-https -y

## Team RabbitMQ's main signing key
sudo apt-key adv --keyserver "hkps://keys.openpgp.org" --recv-keys "0x0A9AF2115F4687BD29803A206B73A36E6026DFCA"
## Launchpad PPA that provides modern Erlang releases
sudo apt-key adv --keyserver "keyserver.ubuntu.com" --recv-keys "F77F1EDA57EBB1CC"
## PackageCloud RabbitMQ repository
curl -1sLf 'https://packagecloud.io/rabbitmq/rabbitmq-server/gpgkey' | sudo apt-key add -

## Add apt repositories maintained by Team RabbitMQ
sudo tee /etc/apt/sources.list.d/rabbitmq.list <<EOF
## Provides modern Erlang/OTP releases
##
## "bionic" as distribution name should work for any reasonably recent Ubuntu or Debian release.
## See the release to distribution mapping table in RabbitMQ doc guides to learn more.
deb http://ppa.launchpad.net/rabbitmq/rabbitmq-erlang/ubuntu bionic main
deb-src http://ppa.launchpad.net/rabbitmq/rabbitmq-erlang/ubuntu bionic main

## Provides RabbitMQ
##
## "bionic" as distribution name should work for any reasonably recent Ubuntu or Debian release.
## See the release to distribution mapping table in RabbitMQ doc guides to learn more.
deb https://packagecloud.io/rabbitmq/rabbitmq-server/ubuntu/ bionic main
deb-src https://packagecloud.io/rabbitmq/rabbitmq-server/ubuntu/ bionic main
EOF

## Update package indices
sudo apt-get update -y

## Install Erlang packages
sudo apt-get install -y erlang-base \
                        erlang-asn1 erlang-crypto erlang-eldap erlang-ftp erlang-inets \
                        erlang-mnesia erlang-os-mon erlang-parsetools erlang-public-key \
                        erlang-runtime-tools erlang-snmp erlang-ssl \
                        erlang-syntax-tools erlang-tftp erlang-tools erlang-xmerl
apt install rabbitmq-server -y

3.3.3 启动rabbitmq-server

systemctl start rabbitmq-server
systemctl enable --now rabbitmq-server

3.3.4 同步.erlang.cookie文件

RabbitMQ的集群是依赖于Erlang的集群来工作的, 所以必须先构建起Erlang的集群环境, 而Erlang的集群中,各节点是通过一个magic cookie来实现的, 如果是apt安装的RabbitMQ, 这个cookie存放在/var/lib/rabbitmq/.erlang.cookie中, 文件是400权限, 所以必须保证各节点cookie保持一致, 否则节点之间就无法通信

cookie文件会在安装RabbitMQ是自动创建, 但是cookie内容是随机的, 因此要确保集群之间一致, 可以关闭所有RabbitMQ服务后, 将cookie文件从一个节点, 拷贝到其余节点, 实现统一

root@mq1:~# cat /var/lib/rabbitmq/.erlang.cookie
AVWTQKJALSKIEEILTHBS

root@mq2:~# cat /var/lib/rabbitmq/.erlang.cookie
TLJEYYIRVHOGQKJPLNSN

root@mq3:~# cat /var/lib/rabbitmq/.erlang.cookie
YFPNURCLZZGXBEFPTSJH

各服务器关闭RabbitMQ

systemctl stop rabbitmq-server

将cookie文件, 从mq1-10.0.0.239,同步到其他两个节点

scp /var/lib/rabbitmq/.erlang.cookie 10.0.0.229:/var/lib/rabbitmq/.erlang.cookie
scp /var/lib/rabbitmq/.erlang.cookie 10.0.0.219:/var/lib/rabbitmq/.erlang.cookie

各服务器启动RabbitMQ

systemctl start rabbitmq-server

3.3.5 查看当前集群状态

在未创建集群时, 每个mq都是单机的状态, 而且默认都是磁盘节点

rabbitmqctl cluster_status
Disk Nodes

rabbit@mq1

Disk Nodes

rabbit@mq2

Disk Nodes

rabbit@mq3

3.3.6 创建RabbitMQ集群

  • 集群可以在任意mq节点创建, 之后将其余节点添加到本集群
  • 创建集群前, 每个节点最好是空的环境
  • 创建集群前, 要停止节点的app服务, 清空元数据

实例: 将mq1, mq2 添加到mq3节点作为内存节点, 停止mq1和mq2上的app服务, 清空元数据, 让数据从mq3进行同步, mq3作为磁盘节点

将mq1作为内存节点, 添加到mq3

[root@mq1:~]# rabbitmqctl stop_app
Stopping rabbit application on node rabbit@mq1 ...
[root@mq1:~]# rabbitmqctl reset # 清空当前服务器元数据
Resetting node rabbit@mq1 ...
[root@mq1:~]# rabbitmqctl join_cluster rabbit@mq3 --ram #将mq1添加到集群当中, 并成为内存节点, 不加--ram默认是磁盘节点, rabbit是协议, mq3是主机名, 不包含域名
Clustering node rabbit@mq1 with rabbit@mq3
[root@mq1:~]# rabbitmqctl start_app
Starting node rabbit@mq1 ...

将mq2作为内存节点, 添加到mq3

[root@mq2:~]# rabbitmqctl stop_app
Stopping rabbit application on node rabbit@mq2 ...
[root@mq2:~]# rabbitmqctl reset
Resetting node rabbit@mq2 ...
[root@mq2:~]# rabbitmqctl join_cluster rabbit@mq3 --ram #将mq2添加到集群当中, 并成为内存节点, 不加--ram默认是磁盘节点
Clustering node rabbit@mq2 with rabbit@mq3
[root@mq2:~]# rabbitmqctl start_app
Starting node rabbit@mq2 ...

3.3.7 验证集群状态

  • 创建集群后, 每台节点上的集群状态都是相同的
  • 默认创建的集群是普通模式, 数据不会在各个节点之间同步, 而是当有需要时, 从有数据的节点拉取
root@mq1:~# rabbitmqctl cluster_status
Cluster status of node rabbit@mq1 ...
Basics

Cluster name: rabbit@mq1

Disk Nodes

rabbit@mq3

RAM Nodes

rabbit@mq1
rabbit@mq2

Running Nodes

rabbit@mq1
rabbit@mq2
rabbit@mq3

Versions

rabbit@mq1: RabbitMQ 3.8.18 on Erlang 24.0.2
rabbit@mq2: RabbitMQ 3.8.18 on Erlang 24.0.2
rabbit@mq3: RabbitMQ 3.8.18 on Erlang 24.0.2

Maintenance status

Node: rabbit@mq1, status: not under maintenance
Node: rabbit@mq2, status: not under maintenance
Node: rabbit@mq3, status: not under maintenance

Alarms

(none)

Network Partitions

(none)

Listeners

Node: rabbit@mq1, interface: [::], port: 25672, protocol: clustering, purpose: inter-node and CLI tool communication
Node: rabbit@mq1, interface: [::], port: 5672, protocol: amqp, purpose: AMQP 0-9-1 and AMQP 1.0
Node: rabbit@mq2, interface: [::], port: 25672, protocol: clustering, purpose: inter-node and CLI tool communication
Node: rabbit@mq2, interface: [::], port: 5672, protocol: amqp, purpose: AMQP 0-9-1 and AMQP 1.0
Node: rabbit@mq3, interface: [::], port: 25672, protocol: clustering, purpose: inter-node and CLI tool communication
Node: rabbit@mq3, interface: [::], port: 5672, protocol: amqp, purpose: AMQP 0-9-1 and AMQP 1.0

Feature flags

Flag: implicit_default_bindings, state: enabled
Flag: maintenance_mode_status, state: enabled
Flag: quorum_queue, state: enabled
Flag: user_limits, state: enabled
Flag: virtual_host_metadata, state: enabled

root@mq2:~# rabbitmqctl cluster_status
Cluster status of node rabbit@mq2 ...
Basics

Cluster name: rabbit@mq2

Disk Nodes

rabbit@mq3

RAM Nodes

rabbit@mq1
rabbit@mq2

Running Nodes

rabbit@mq1
rabbit@mq2
rabbit@mq3

Versions

rabbit@mq1: RabbitMQ 3.8.18 on Erlang 24.0.2
rabbit@mq2: RabbitMQ 3.8.18 on Erlang 24.0.2
rabbit@mq3: RabbitMQ 3.8.18 on Erlang 24.0.2

Maintenance status

Node: rabbit@mq1, status: not under maintenance
Node: rabbit@mq2, status: not under maintenance
Node: rabbit@mq3, status: not under maintenance

Alarms

(none)

Network Partitions

(none)

Listeners

Node: rabbit@mq1, interface: [::], port: 25672, protocol: clustering, purpose: inter-node and CLI tool communication
Node: rabbit@mq1, interface: [::], port: 5672, protocol: amqp, purpose: AMQP 0-9-1 and AMQP 1.0
Node: rabbit@mq2, interface: [::], port: 25672, protocol: clustering, purpose: inter-node and CLI tool communication
Node: rabbit@mq2, interface: [::], port: 5672, protocol: amqp, purpose: AMQP 0-9-1 and AMQP 1.0
Node: rabbit@mq3, interface: [::], port: 25672, protocol: clustering, purpose: inter-node and CLI tool communication
Node: rabbit@mq3, interface: [::], port: 5672, protocol: amqp, purpose: AMQP 0-9-1 and AMQP 1.0

Feature flags

Flag: implicit_default_bindings, state: enabled
Flag: maintenance_mode_status, state: enabled
Flag: quorum_queue, state: enabled
Flag: user_limits, state: enabled
Flag: virtual_host_metadata, state: enabled

root@mq3:~# rabbitmqctl cluster_status
Cluster status of node rabbit@mq3 ...
Basics

Cluster name: rabbit@mq3

Disk Nodes

rabbit@mq3

RAM Nodes

rabbit@mq1
rabbit@mq2

Running Nodes

rabbit@mq1
rabbit@mq2
rabbit@mq3

Versions

rabbit@mq1: RabbitMQ 3.8.18 on Erlang 24.0.2
rabbit@mq2: RabbitMQ 3.8.18 on Erlang 24.0.2
rabbit@mq3: RabbitMQ 3.8.18 on Erlang 24.0.2

Maintenance status

Node: rabbit@mq1, status: not under maintenance
Node: rabbit@mq2, status: not under maintenance
Node: rabbit@mq3, status: not under maintenance

Alarms

(none)

Network Partitions

(none)

Listeners

Node: rabbit@mq1, interface: [::], port: 25672, protocol: clustering, purpose: inter-node and CLI tool communication
Node: rabbit@mq1, interface: [::], port: 5672, protocol: amqp, purpose: AMQP 0-9-1 and AMQP 1.0
Node: rabbit@mq2, interface: [::], port: 25672, protocol: clustering, purpose: inter-node and CLI tool communication
Node: rabbit@mq2, interface: [::], port: 5672, protocol: amqp, purpose: AMQP 0-9-1 and AMQP 1.0
Node: rabbit@mq3, interface: [::], port: 25672, protocol: clustering, purpose: inter-node and CLI tool communication
Node: rabbit@mq3, interface: [::], port: 5672, protocol: amqp, purpose: AMQP 0-9-1 and AMQP 1.0

Feature flags

Flag: implicit_default_bindings, state: enabled
Flag: maintenance_mode_status, state: enabled
Flag: quorum_queue, state: enabled
Flag: user_limits, state: enabled
Flag: virtual_host_metadata, state: enabled

3.3.8 将集群设置为镜像模式

只需在任意节点执行以下命令

[root@mq1:~]# rabbitmqctl set_policy ha-all "#" '{"ha-mode":"all"}'
Setting policy "ha-all" for pattern "#" to "{"ha-mode":"all"}" with priority "0" for vhost "/" ...
  • 设置为镜像模式后, 在任意节点写入数据, 都会主动同步到其余的节点
  • 为了提高效率和可用性, 可以在ram节点前加负载均衡, 让用户的请求发往ram节点, disk节点只用来保存数据

3.3.9 通过web界面验证集群状态

不启动web插件的rabbitmq服务器, 会在web节点提示节点统计信息不可用

先在mq1启动插件

[root@mq1:~]# rabbitmq-plugins enable rabbitmq_management
Enabling plugins on node rabbit@mq1:
rabbitmq_management
The following plugins have been configured:
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_web_dispatch
Applying plugin configuration to rabbit@mq1...
The following plugins have been enabled:
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_web_dispatch

started 3 plugins.
image.png

创建管理用户

  • 只需要在一个节点创建即可, 创建后会同步到其余节点
[root@mq1:~]# rabbitmqctl add_user admin admin
Adding user "admin" ...
[root@mq1:~]# rabbitmqctl set_user_tags admin administrator
Setting tags for user "admin" to [administrator] ...

在其余节点启用web管理插件

[root@mq2:~]# rabbitmq-plugins enable rabbitmq_management
Enabling plugins on node rabbit@mq2:
rabbitmq_management
The following plugins have been configured:
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_web_dispatch
Applying plugin configuration to rabbit@mq2...
The following plugins have been enabled:
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_web_dispatch

started 3 plugins.
root@mq3:~# rabbitmq-plugins enable rabbitmq_management
Enabling plugins on node rabbit@mq3:
rabbitmq_management
The following plugins have been configured:
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_web_dispatch
Applying plugin configuration to rabbit@mq3...
The following plugins have been enabled:
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_web_dispatch

started 3 plugins.
image.png

测试在其余两个节点, 可以使用admin账户登录

image.png
image.png

3.3.10 集群测试

生产者在mq3写入数据, 消费者分别从mq1和mq2消费数据

  1. 在10.0.0.219写入数据, 在10.0.0.229查看
图片.png
  1. 在10.0.0.239和10.0.0.229同时消费数据
图片.png 图片.png

3.4 RabbitMQ常用命令

创建vhost

[root@mq1:~]# rabbitmqctl add_vhost rabbit
Adding vhost "rabbit" ...

列出所有vhosts

[root@mq1:~]# rabbitmqctl list_vhosts
Listing vhosts ...
name
admin_host
rabbit
/

列出所有队列

[root@mq1:~]# rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...

删除指定vhost

[root@mq1:~]# rabbitmqctl delete_vhost admin_host
Deleting vhost "admin_host" ...

添加rabbitmq账户, 设置其密码为123456

[root@mq1:~]# rabbitmqctl add_user rabbitmq 123456
Adding user "rabbitmq" ...

更改rabbitmq用户密码为111111

[root@mq1:~]# rabbitmqctl change_password rabbitmq 111111
Changing password for user "rabbitmq" ...

设置rabbitmq用户对rabbit的vhost有读写权限, 配置正则, 读正则和写正则

[root@mq1:~]# rabbitmqctl set_permissions -p rabbit rabbitmq ".*" ".*" ".*"
Setting permissions for user "rabbitmq" in vhost "rabbit" ...

3.5 RabbitMQ集群监控

3.5.1 集群状态监控

#!/usr/bin/python3                                                                                                                               
#coding:utf-8

import subprocess
running_list = []
error_list = []
false = "false"
true = "true"

def get_status():
    obj = subprocess.Popen(("curl -s -u guest:guest http://localhost:15672/api/nodes &> /dev/null"), shell=True,stdout=subprocess.PIPE)
    data = obj.stdout.read()
    data1 = eval(data)
    for i in data1:
         if i.get("running") == "true":
            running_list.append(i.get("name"))
         else:
            error_list.append(i.get("name"))

def count_server():
    if len(running_list) < 3: # 可以判断错误列表大于 0 或者运行列表小于 3,3为总计的节点数量
         print(100) # 100 就是集群内有节点运行不正常了
    else:
         print(50) # 50 为所有节点全部运行正常

def main():
    get_status()
    count_server()

if __name__ == "__main__":
    main()

3.5.2 内存使用监控

vim rabbitmq_memory.py

#!/usr/bin/python3                                                                                                                             
#coding:utf-8

import subprocess
import sys

running_list = []
error_list = []
false = "false"
true = "true"

def get_status():
    obj = subprocess.Popen(("curl -s -u guest:guest http://localhost:15672/api/nodes &> /dev/null"), shell=True,stdout=subprocess.PIPE)
    data = obj.stdout.read()
    data1 = eval(data)
    #print(data1)
    for i in data1:
        if i.get("name") == sys.argv[1]:
            print(i.get("mem_used"))

def main():
    get_status()

if __name__ == "__main__":
    main()

[root@mq2:~]# python3 rabbitmq_memory.py rabbit@mq1
87326720
[root@mq2:~]# python3 rabbitmq_memory.py rabbit@mq2
86876160
[root@mq2:~]# python3 rabbitmq_memory.py rabbit@mq3
89997312

3.5.3 RabbitMQ API

https://rawcdn.githack.com/rabbitmq/rabbitmq-management/rabbitmq_v3_6_9/priv/www/api/index.html

API使用举例

[root@mq1:~]# curl -s -u admin:admin http://10.0.0.219:15672/api/nodes
[root@mq1:~]# curl -s -u admin:admin http://10.0.0.219:15672/api/overview
[root@mq1:~]# curl -s -u admin:admin http://10.0.0.219:15672/api/cluster-name
{"name":"rabbit@mq3"}
[root@mq1:~]# curl -s -u admin:admin http://10.0.0.219:15672/api/healthchecks/node
{"status":"ok"} #如果节点正常, 会返回status:ok
[root@mq1:~]# curl -s -u admin:admin http://10.0.0.229:15672/api/healthchecks/node #如果节点异常, 不会返回数据, 可以通过返回值, 利用zabbix进行监控

root@mq_src:~# apt -y install autoconf make gcc socat ncurses-dev

相关文章

  • 3. 消息队列介绍和RabbitMQ

    1. 消息队列介绍 1.1 MQ定义 1.2 MQ使用场合 消息队列作为高并发系统的核心组件之一, 能够帮助业务系...

  • RabbitMQ笔记

    1、消息队列 在介绍RabbitMQ之前先介绍一下消息队列,即MQ,(Message Queue) 消息队列是典型...

  • 消息队列对比

    引用: 常用消息队列对比消息队列及常见消息队列介绍 常用消息队列 1. RabbitMQ 用erlang语言开发的...

  • RabbitMQ博客列表

    消息队列和RabbitMQ及AMQP协议介绍RabbitMQ安装及其基本命令四种类型ExchangeBinding...

  • rabbitmq消息队列介绍

    一、定义 rabbitmq是建立在AMQP上的企业消息系统。以生产者消费者为模型而存在的一个消息队列. 二、作用 ...

  • 消息队列探秘-RabbitMQ消息队列介绍

    1. 历史 RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的...

  • 80 rabbitmq

    Rabbitmq 基本介绍 RabbitMQ是基本实现了赶集消息队列协议(AMQP)的开源消息代理软件(也称作面向...

  • RabbitMQ 研究

    1 、介绍 1.1 RabbitMQ MQ全称为Message Queue,即消息队列, RabbitMQ是由er...

  • Redis作为消息队列与RabbitMQ的比较

    Redis作为消息队列与RabbitMQ的比较 RabbitMQ RabbitMQ是实现AMQP(高级消息队列协议...

  • 消息队列——了解一下

    本文将从以下几点展开 为什么使用消息队列? 消息队列的缺点 消息队列如何选型 RabbitMQ RabbitMQ ...

网友评论

      本文标题:3. 消息队列介绍和RabbitMQ

      本文链接:https://www.haomeiwen.com/subject/djqfultx.html