美文网首页java学习之路
spring boot 进阶(二)springBoot整合Rab

spring boot 进阶(二)springBoot整合Rab

作者: 唯有努力不欺人丶 | 来源:发表于2020-12-22 22:06 被阅读0次

在我们大多应用中,都可以通过消息服务中间件来提升系统异步通信,扩展解耦能力。

同步/异步

因为消息队列一般都是异步的,所以这里说一下同步/异步。其实这个概念有很多专业的解释。但是我个人更喜欢看那种举例子的理解,看起来更明确.简单来说同步就是等待出结果。异步就是交代要干啥后不管结果。
消息服务中两个重要的概念:

  • 消息代理(消息中间件服务器)目的地。当消息发送者发送消息后,将由消息代理接管,消息代理保证消息传递到指定目的地。

消息队列主要有两种形式的目的地:

  1. 队列:点对点消息通信。(消息发送者发送消息,消息代理将其放入队列中,消息接收者从队列中获取消息内容,消息读取后被移除队列。消息只有唯一的发送者和接受者。但是不是说只有一个接受者。比如A发送给B,C,D三个接收者,B,C,D任何一个接受以后消息被移除。)
  2. 主题:发布/订阅消息通讯

消息队列主要的两个作用:

  • 应用解耦
  • 流量削峰

JMS(java message service)java消息服务:
基于JVM消息代理的规范。ActiveMQ,HornetMQ是jms实现。

AMQP(Advanced Message Queuing Protocol)

  • 高级消息队列协议,也是一个消息代理规范,兼容jms
  • RabbitMQ是AMQP的实现


    JMS和AMQP区别

SpringBoot它既支持jms的activeMQ,也支持AMQP的rabbitMQ.这两个的场景启动类都有,下面一个个说

RabbitMQ简介

RabbitMQ是一个开源的qmqp的实现。是现在非常流行的一个消息中间件。
其核心概念如下:

  • Message:消息,由消息头和消息体组成。消息体是不透明的,消息头是一系列可选的属性组成(可以类比http消息头和消息体)。
  • Publisher:消息生产者,发布消息的客户端应用程序
  • Exchange:交换器,用来接收生产者发送的消息,并且将这些消息路由给服务器中的队列。
  • Queue: 消息队列,用来保存消息的,是消息的容器。也是消息的终点。一个消息可投入多个队列。等待消费者将其取走。
  • Binding:绑定,用于消息队列和交换器之间的关联。消息队列和交换器之间是多对多的关系。
  • Connection: 网络连接
  • Channel:信道。多路复用连接中的一条独立的双向数据流通道。
  • Consumer:消息的消费者,从队列中获取消息的客户端应用程序。
  • Virtual Host: 虚拟主机,表示一批交换器,消息队列和相关对象。每个vhost本质上就是一个迷你版的RabbitMQ服务器。必须在连接时指定。默认是/
  • Broker: 表示消息队列服务器实体。

RabbitMQ运行机制:
生产者Publisher发布消息Message,将消息给broker消息队列服务器。服务器收到消息根据协议头选择相应的交换器Exchange,将消息给这个交换器。交换器通过绑定Bingding将这个消息发送给消息队列Queue。然后消费者Consumer建立连接Connection并通过其中一条信道Channel获取这个消息,同时当获取这个消息后消息从消息队列中移除。

RabbitMQ使用

这里的安装就不说了,我在另一篇文章中详细介绍了.不会的可以去看下:
https://www.jianshu.com/p/2c8f2c6cda4e
下面直接说springBoot整合RabbitMQ:

  1. 导包
    这个依赖我们依然可以从maven仓库查找下载(这里我本来想偷个懒,直接复制redis的依赖最后改成amqp了,结果找不到jar,我只能去maven仓库找,发现这个少了-data)
        <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-amqp -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
            <version>2.3.7.RELEASE</version>
        </dependency>
  1. 查看自动配置源码
    这个也算是一个流程了,既然这个amqp有自动配置,那我们按照管理去看看都配置了什么,有什么参数是可配置的。如下:


    默认参数配置
    自动配置类

    因为这个类代码比较多所以我折叠才能截全。其主要工厂中获取rabbit的各种参数,之前类种我们能看到默认的端口,ip,username,password啥的都有了,如果不做改动的话,测试环境可以不额外指定,所以我这里也不写了。下面我们试着使用下RabbitTemplate:
    下面是使用demo:


    测试使用RabbitTemplate
    然后访问后我们去rabbitmq控制台查看,是收到这个消息的(一开始我交换器名称写错了所以没收到,坑死了):
    收到了代码中发送过来的消息
    但是虽然收到消息了,可是如截图上的问题,这个消息体不对啊,我明明传的是要存储的数据到这就这样了,不用猜,有之前redis的基础一想就是序列化的问题,这个问题先放着,接下来说接收消息的代码:
    接受消息

    其实代码很简单,就是指定队列名称就能接收到啦。先把简单的发送和接收消息说完了,接下来说序列化的问题:
    这个肯定是指定的序列化方式不对,其实我们之前处理过redis的序列化,这里应该是有思路的,只要更改序列化方式就行了。首先写个自己的配置,然后自己写bean,如下代码:

    @Bean
    public  MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }

然后我们再测试下可视化页面查看消息是不是能看懂了(java代码中只要编码解码方式一样就行,所以看不出啥。)


测试成功!

其实到这里测试了点对点的发送和接收以后,广播也是差不多的用法,只不过有细微的差别,比如广播不用写路由key。剩下就一样了。

RabbitMQ的监听

实际项目中我们常常会用到监听功能。就比如上面的demo中我们要先给某个队列发送消息,然后在知道某个队列有消息的前提下接收这个队列消息。这里如何知道某个队列有消息呢?非监听莫属啦~
其实rabbit有个现成的注解:

@RabbitListener(queues = {"lsj.news"})//这里参数是个数组,可以指定多个队列

当然了想要这个注解起作用还要开启基于RabbitMQ的注解:

@EnableRabbit

这个监听注解的用法也很简单,在方法上使用,接收的消息为ampq的message即可,如下代码:

   @RabbitListener(queues = {"lsj.news"})
   public void receiveMsg(Message message) {    
       System.out.println(message.getBody());
   }

这样当指定的队列有新消息会自动调用此方法接收消息。
刚刚说了这么多,其中交换器,队列都是我们先创建好直接在项目中用的。但是如何代码创建交换器和队列呢?其实只需要使用一个对象AmqpAdmin使用起来直接注入即可,这个可以操作rabbitMQ来执行一些基本的增删功能,如下使用办法:

exchange的实现类
创建一个新的exchange
创建exchange的代码很简单,我这里是最简的,也可以指定一些别的, 比如是否持久化等。创建队列也是差不多的用法,就是declareXX。然后填一些想要指定的信息就可以了,下面也简单做个demo:
创建queue代码实现
最后是绑定,这个也是差不多的用法,唯一区别就是参数多了点,如下截图:
创建绑定代码实现
然后其实每一个参数名称就挺形象的,再不确定可以点进源码看着填入。简单来说第一个参数是队列名称,第二个参数使用QUEUE那个枚举类。第三个参数是交换器名称,第四个参数是路由键。第五个参数是一个map参数,可以为null。我在代码里没看出来是干啥的,反正就没传。这样简单的binding就完成了。
我反正是都跑了一遍,没出问题。所以绑定这一块就这样了。就这样我们代码创建exchange,queue,binding都完成啦!
本篇笔记就记到这里,如果稍微帮到你了记得点个喜欢点个关注,也祝大家工作顺顺利利!

相关文章

网友评论

    本文标题:spring boot 进阶(二)springBoot整合Rab

    本文链接:https://www.haomeiwen.com/subject/zcxlnktx.html