美文网首页
RabbitMQ学习

RabbitMQ学习

作者: 什锦小沐 | 来源:发表于2022-06-04 21:41 被阅读0次

    前言

        通过在慕课网学习,进行RabbitMQ的了解、安装、和SpringBoot的整合使用。

    一、RabbitMQ简介及AMQP协议

    1.1 初识RabbitMQ

    初识RabbitMQ

    1.目前大多数互联网都在使用RabbitMQ

    2.RabbitMQ底层采用Erlang语言进行编写(需要Erlang环境,Erlang语言在交换机领域非常受欢迎的,复制数据低延迟)

    3.开源、性能优秀,稳定

    4.与SpringAMQP完美的整合、API丰富

    5.集群模式丰富,表达式配置,HA模式,镜像队列模型(主从模式、远程模式、双活模式、镜像队列模式:保证数据的不丢失,完善的架构:HAproxy实现负载均衡,上层再加keepalive保证高可用)

    6.保证数据不丢失的前提下做到高可靠性、可用性

    7.AMQP(Advanced Message Queuing Protocol,高级消息队列协议)

    AMQP协议模型

    Publisher application:生产者应用

    Consumer application:消费者应用

    Server:MQ服务,即RabbitMQ

    Virtual host:虚拟主机

    Exchange:交换机,和消息队列之间有绑定关系(Route Key)

    Message Queue:消息队列

    二、RabbitMQ安装及使用

    主要安装过程

    2.2 下载

    wget https://www.rabbitmq.com/releases/erlang/erlang-18.3-1.el7.centos.x86_64.rpm

    wget http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm

    wget https://www.rabbitmq.com/releases/rabbitmq-server/v3.6.5/rabbitmq-server-3.6.5-1.suse.noarch.rpm

    如果出现下面的情况,是需要在wget参数中增加--no-check-certificate

    增加--no-check-certificate

    最后下载了下面的三个文件

    wget下载的三个文件

    安装顺序为:1.erlang,2.socat,3.rabbitmq

    因为rabbitmq是在erlang环境的运行的,并且需要socat秘钥,最后安装rabbitmq

    使用rpm -ivh rpm包名称 安装rpm包,如:

    rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm

    2.3 配置

    配置/etc/hostname和/etc/hosts,主机名称和主机关系

    配置/etc/hostname和/etc/hosts

    配置/usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app

    tcp_listeners就是启动后应用程序访问的端口,loopback_users就是一会我们要用到的用户,改成如图所示即可

    rabbit配置

    输入命令:rabbitmq,按两下Tab键,会发现有三个选项rabbitmqctl rabbitmq-plugins rabbitmq-server

    rabbitmqctl是控制rabbitmq的,如集合、队列等等

    rabbitmq-plugins是管理插件的

    rabbitmq-server就是控制服务了

    我们这时使用命令:rabbitmq-server start &启动rabbitmq服务

    启动rabbitmq服务

    completed with 0 plugins,即我们当前没有使用插件

    因为我们是使用5672端口启动的,使用命令lsof -i:5672确定服务已启动

    COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME

    beam.smp 1812 rabbitmq  52u  IPv6  23637      0t0  TCP *:amqp (LISTEN)

    2.4 管理插件

    使用命令:rabbitmq-plugins list查看插件列表

    使用命令:rabbitmq-plugins enable rabbitmq_management启动控制台

    2.5 访问控制台

    使用http://ip:15672即可访问控制台,输入账号/密码:guest/guest

    控制台样式

    控制台可以查看rabbitmq的基本信息及实时情况,也可以进行添加队列、用户等操作。

    2.6 

    三、RabbitMQ核心概念

    3.1 RabbitMQ的整体架构

    RabbitMQ的整体架构

    3.2 AMQP核心概念

    Server:又称Broker,接收客户端的连接,实现AMQP实体服务。

    Connection:连接,应用程序与Broker的网络连接。

    Channel:网络信道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道。客户端可建立多个Channel,每个Cha代表一个会话任务。

    Message:消息,服务器和应用程序之间传送的数据,有Properties和Body组成。Properties可以对信息进行修饰,比如消息的优先级、延迟等高级特性,也可以是自定义的信息;Body就是消息体内容。

    Virtual Host:虚拟地址,用于进行逻辑隔离,最上层的消息路由。一个Virtual Host里面可以有若干个Exchange和Queue,同一个Virtual Host里面不能有相同名称的Exchange和Queue。个人理解认为Virtual Host可以等比为MySQL的数据库。

    Exchange:交换机,接受消息,根据路由键转发消息到绑定的队列。

    Binding:绑定,Exchange和Queue之间的虚拟连接,Binding中可以包含Routing Key。

    Routing Key:一个路由规则,虚拟机可用它来确定如何路由消息。

    Queue:也成为Message Queue,消息队列,保存消息并将它们转发给消费者。

    3.3 RabbitMQ消息是如何流转的

    RabbitMQ消息是如何流转的

    生产者应用投递一个Message(需要指定Virtual Host、Exchange、Routing Key),到达Virtual Host(图中没有画出),然后到达Exchange,根据指定的Routing Key确定到达哪个Message Queue,最后由监听了该Message Queue的消费者应用获取。

    四、与SpringBoot2.x整合-急速入门

    4.1 SpringBoot于RabbitMQ集成

    引入依赖

    <dependency>

      <groupId>org.springframework.boot</groupId>

      <artifactId>spring-boot-starter-amqp</artifactId>

    </dependency>

    配置application.properties

    spring.rabbitmq.addresses=192.168.47.135:5672

    spring.rabbitmq.username=guest

    spring.rabbitmq.password=guest

    #默认的虚拟主机,也可以自己配置

    spring.rabbitmq.virtual-host=/

    #连接超时时间,单位毫秒

    spring.rabbitmq.connection-timeout=15000

    4.2 编写发送消息服务代码

    消息实体类

    //一定要实现Serializable,并有serialVersionUID,因为一定是要通过网络传输的

    public class Order implements Serializable {

        private static final long serialVersionUID = 1550918097811388561L;

        private String id;

        private String name;

        private String messageId;  //存储消息发送的唯一标识,id对应的messageId

        get、set、constructor...

    }

    订单发送服务

    @Component

    public class OrderSender {

        @Autowired

        private RabbitTemplate rabbitTemplate;

        public void sendOrder(Order order) throws Exception {

            CorrelationData correlationData = new CorrelationData();

            correlationData.setId(order.getMessageId());

            rabbitTemplate.convertAndSend(

                    "order-exchange",  //exchange

                    "order.abcd",          //routingKey

                    order,                     //消息体内容

                    correlationData      //correlationData,用于指定消息的唯一ID

            );

        }

    }

    4.3 配置RabbitMQ

    创建exchange

    创建exchange

    打开mq管理界面,按照上面的示意,填写Name(order-exchange),选择Type(topic),选择Durability(Durable)将消息持久化到硬盘上,最后点击Add exchange完成Exchange的添加。添加后就是图中的order-exchange。

    常用的Exchange Type有三种:fanout、direct、topic

    fanout:把所有发送到该Exchange的消息投递到所有与它绑定的队列中。

    direct:把消息投递到那些binding key与routing key完全匹配的队列中。

     topic:将消息路由到binding key与routing key模式匹配的队列中。

    添加Queue

    添加queue

    打开mq管理界面,按照上面的示意,填写Name(order-queue),选择Durability(Durable)将消息持久化到硬盘上,最后点击Add queue完成Queue的添加。添加后就是图中的order-queue。

    在Exchange侧添加Routing Key进行Binding

    在Exchange侧添加Routing Key进行Binding

    在Exchange界面点击order-exchange即可进入上图的界面,填写To queue为order-queue(即刚才创建的队列名称),Routing key为order.#

    在Routing Key填写时注意:

    # 表示后面可以有多个 . 的内容进行路由,即order.a可以根据该Routing Key路由到对应的Queue,order.a.b.c也可以,order01.a是不可以的。

    * 表示只能有一个 . 的内容进行路由,即order.a可以根据该Routing Key路由到对应的Queue,order.a.b.c不可以,order01.a是不可以的。

    下图即为Binding成功的样子

    Binding成功

    从Queue侧也可以看到

    Queue侧查看Binding成功

    4.4 编写测试代码进行测试

    编写代码

    @Autowired

    private OrderSender orderSender;

    @Test

    public void testSendOrder01() throws Exception{

    Order order = new Order("201907280000001", "测试订单1", System.currentTimeMillis() + "$" + UUID.randomUUID());

    orderSender.sendOrder(order);

    }

    查看结果

    在RabbitMQ控制台确认消息已发送到指定队列

    点击order-queue进入队列详情

    在RabbitMQ控制台查看已经被序列化为Base64的消息

    五、保证100%的消息可靠性投递方案落地实现

    5.1 消息可靠性投递方案

    消息可靠性投递方案(一)

    1.Sender将业务消息写入BIZ DB,并将关联的消息记录写入MSG BD,status:0表示投递中

    2.Sender使用异步的方式向MQ Broker投递消息

    3.Confirm Listener接收MQ Broker异步通知的投递成功通知

    4.Confirm Listener将MSG DB的修改status:1,表示投递成功

    5.分布式定时任务从MSG DB获取status:0的数据,重新交给Sender向MQ Broker投递消息

    6.分布式定时任务发现重试次数大于3时,将MSG DB的数据修改status:2,表示投递失败

    第二步和第三步可能因为网络原因或MQ Broker原因投递失败,此时就需要分布式定时任务获取status:0的数据进行处理

    第三步失败时,MQ Broker其实是已经接收到消息了,如果再次投递就会出现重复数据,此时就需要消费端做幂等处理,也就是无论处理几次结果都是一样的

    5.2 实现

    表结构

    相关文章

      网友评论

          本文标题:RabbitMQ学习

          本文链接:https://www.haomeiwen.com/subject/romsvqtx.html