美文网首页
Java消息中间件之RabbitMQ

Java消息中间件之RabbitMQ

作者: 在error边缘疯狂试探 | 来源:发表于2020-04-25 07:39 被阅读0次

什么是消息中间件?

  • 消息代理规范
    • JMS(Java Message Service)JAVA消息服务:
      基于JVM消息代理的规范。ActiveMQ、HornetMQ是JMS实现

    • AMQP(Advanced Message Queuing Protocol):
      高级消息队列协议,也是一个消息代理的规范,兼容JMS
      RabbitMQ是AMQP的实现

  • 作用:(异步处理、应用解耦、流量削峰、分布式事务管理)
    • 通过消息服务中间件来提升系统异步通信、扩展解耦能力
    • 当消息发送者发送消息以后,将由消息代理接管,消息代理保证消息传递到指定目的地

RabbitMQ

  • RabbitMQ是一个由erlang开发的AMQP(Advanved Message Queue Protocol)的开源实现。

  • 核心概念

    • Message

    • 消息,消息是不具名的,它由消息头和消息体组成

    • 消息头,包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等

    • Publisher

    • 消息的生产者,也是一个向交换器发布消息的客户端应用程序

    • Exchange

    • 交换器,将生产者消息路由给服务器中的队列

    • 类型有direct(默认),fanout, topic, 和headers,具有不同转发策略

    • Queue

    • 消息队列,保存消息直到发送给消费者

    • Binding

    • 绑定,用于消息队列和交换器之间的关联

    • Connection

    • 网络连接,比如一个TCP连接

    • Consumer

    • 消息的消费者,表示一个从消息队列中取得消息的客户端应用程序

    • Virtual Host

    • 虚拟主机,表示一批交换器、消息队列和相关对象。

    • vhost 是 AMQP 概念的基础,必须在连接时指定

    • RabbitMQ 默认的 vhost 是 /

    • Broker

    • 消息队列服务器实体

  • 运行机制

消息路由:AMQP 中增加了Exchange 和 Binding 的角色, Binding 决定交换器的消息应该发送到那个队列。

  • Exchange 类型
  1. direct
    点对点模式,消息中的路由键(routing key)如果和 Binding 中的 binding
    key 一致, 交换器就将消息发到对应的队列中。

  2. fanout
    广播模式,每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去

  3. topic
    将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。
    识别通配符: # 匹配 0 个或多个单词, *匹配一个单词


SpringBoot 整合 RabbitMQ

  • 导入依赖(前提得安装好RabbitMQ)
   <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-amqp</artifactId>
   </dependency>
<!--自定义消息转化器Jackson2JsonMessageConverter所需依赖-->
   <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-databind</artifactId>
   </dependency>
  • 配置文件
# 指定rebbitmq服务器主机
spring.rabbitmq.host=192.168.31.162
spring.rabbitmq.username=guest  #默认值为guest
spring.rabbitmq.password=guest   #默认值为guest
spring.rabbitmq.port=5672       #默认端口号是5672
spring,rabbitmq.virtual-host=/  #配置RabbitMQ虚拟主机路径 /,默认可以省略
  • RabbitMQ的使用

RabbitAutoConfiguration中有内部类RabbitTemplateConfiguration,在该类中向容器中分别导入了RabbitTemplateAmqpAdmin

在测试类中分别注入:

@Autowired
   private RabbitTemplate rabbitTemplate;

   @Autowired
   private AmqpAdmin amqpAdmin;
  • RabbitTemplate消息发送处理组件,​ 可用来发送和接收消息
//发送消息,利用的是点对点的exchange模式
rabbitTemplate.convertAndSend("amq.direct","ustc","aaaa");
      Book book = new Book();
      book.setName("西游记");
      book.setPrice(23.2f);
//Book要实现Serializable接口
      rabbitTemplate.convertAndSend("amq.direct","ustc",book);
    //convertAndSend(String exchange,String rutingkey,Object object)

//接收消息
Object o = rabbitTemplate.receiveAndConvert("ustc");
      System.out.println(o.getClass());  //class cn.edu.ustc.springboot.bean.Book
      System.out.println(o);            //Book{name='西游记', price=23.2}
  • 由于默认的消息转化器是SimpleMessageConverter,对于对象以jdk序列化方式存储,若要以Json方式存储对象,就要自定义消息转换器。
@Configuration
public class AmqpConfig {
    @Bean
    public MessageConverter messageConverter() {
        //在容器中导入Json的消息转换器
        return new Jackson2JsonMessageConverter();
    }
}
  • AmqpAdmin管理组件,可以利用可视化工具,对RabbitMQ增删改exchange、binding、queue.,当然也可以利用代码进行编写。
//创建Direct类型的Exchange
amqpAdmin.declareExchange(new DirectExchange("admin.direct"));
//创建Queue
amqpAdmin.declareQueue(new Queue("admin.test"));
//将创建的队列与Exchange绑定
amqpAdmin.declareBinding(new Binding("admin.test", Binding.DestinationType.QUEUE,"admin.direct","admin.test",null));
  • 消息的监听:
    在回调方法上标注@RabbitListener注解,并设置其属性queues,注册监听队列,当该队列收到消息时,标注方法遍会调用

可分别使用Message和保存消息所属对象进行消息接收,若使用Object对象进行消息接收,实际上接收到的也是Message

@Service
public class BookService {
    @RabbitListener(queues = {"admin.test"})
    public void receive1(Book book){
        System.out.println("收到消息:"+book);
    }

    @RabbitListener(queues = {"admin.test"})
    public void receive1(Object object){
        System.out.println("收到消息:"+object.getClass());
        //收到消息:class org.springframework.amqp.core.Message
    }
    
    @RabbitListener(queues = {"admin.test"})
    public void receive2(Message message){
        System.out.println("收到消息"+message.getHeaders()+"---"+message.getPayload());
    }
}

相关文章

网友评论

      本文标题:Java消息中间件之RabbitMQ

      本文链接:https://www.haomeiwen.com/subject/rzqhwhtx.html