美文网首页
Java消息中间件之RabbitMQ

Java消息中间件之RabbitMQ

作者: 在error边缘疯狂试探 | 来源:发表于2020-04-25 07:39 被阅读0次

    什么是消息中间件?

    • 消息代理规范
      • JMS(Java Message Service)JAVA消息服务:
        基于JVM消息代理的规范。ActiveMQ、HornetMQ是JMS实现

      • AMQP(Advanced Message Queuing Protocol):
        高级消息队列协议,也是一个消息代理的规范,兼容JMS
        RabbitMQ是AMQP的实现

    • 作用:(异步处理、应用解耦、流量削峰、分布式事务管理)
      • 通过消息服务中间件来提升系统异步通信、扩展解耦能力
      • 当消息发送者发送消息以后,将由消息代理接管,消息代理保证消息传递到指定目的地

    RabbitMQ

    • RabbitMQ是一个由erlang开发的AMQP(Advanved Message Queue Protocol)的开源实现。

    • 核心概念

      • Message

      • 消息,消息是不具名的,它由消息头和消息体组成

      • 消息头,包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等

      • Publisher

      • 消息的生产者,也是一个向交换器发布消息的客户端应用程序

      • Exchange

      • 交换器,将生产者消息路由给服务器中的队列

      • 类型有direct(默认),fanout, topic, 和headers,具有不同转发策略

      • Queue

      • 消息队列,保存消息直到发送给消费者

      • Binding

      • 绑定,用于消息队列和交换器之间的关联

      • Connection

      • 网络连接,比如一个TCP连接

      • Consumer

      • 消息的消费者,表示一个从消息队列中取得消息的客户端应用程序

      • Virtual Host

      • 虚拟主机,表示一批交换器、消息队列和相关对象。

      • vhost 是 AMQP 概念的基础,必须在连接时指定

      • RabbitMQ 默认的 vhost 是 /

      • Broker

      • 消息队列服务器实体

    • 运行机制

    消息路由:AMQP 中增加了Exchange 和 Binding 的角色, Binding 决定交换器的消息应该发送到那个队列。

    • Exchange 类型
    1. direct
      点对点模式,消息中的路由键(routing key)如果和 Binding 中的 binding
      key 一致, 交换器就将消息发到对应的队列中。

    2. fanout
      广播模式,每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去

    3. topic
      将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。
      识别通配符: # 匹配 0 个或多个单词, *匹配一个单词


    SpringBoot 整合 RabbitMQ

    • 导入依赖(前提得安装好RabbitMQ)
       <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-amqp</artifactId>
       </dependency>
    <!--自定义消息转化器Jackson2JsonMessageConverter所需依赖-->
       <dependency>
           <groupId>com.fasterxml.jackson.core</groupId>
           <artifactId>jackson-databind</artifactId>
       </dependency>
    
    • 配置文件
    # 指定rebbitmq服务器主机
    spring.rabbitmq.host=192.168.31.162
    spring.rabbitmq.username=guest  #默认值为guest
    spring.rabbitmq.password=guest   #默认值为guest
    spring.rabbitmq.port=5672       #默认端口号是5672
    spring,rabbitmq.virtual-host=/  #配置RabbitMQ虚拟主机路径 /,默认可以省略
    
    • RabbitMQ的使用

    RabbitAutoConfiguration中有内部类RabbitTemplateConfiguration,在该类中向容器中分别导入了RabbitTemplateAmqpAdmin

    在测试类中分别注入:

    @Autowired
       private RabbitTemplate rabbitTemplate;
    
       @Autowired
       private AmqpAdmin amqpAdmin;
    
    • RabbitTemplate消息发送处理组件,​ 可用来发送和接收消息
    //发送消息,利用的是点对点的exchange模式
    rabbitTemplate.convertAndSend("amq.direct","ustc","aaaa");
          Book book = new Book();
          book.setName("西游记");
          book.setPrice(23.2f);
    //Book要实现Serializable接口
          rabbitTemplate.convertAndSend("amq.direct","ustc",book);
        //convertAndSend(String exchange,String rutingkey,Object object)
    
    //接收消息
    Object o = rabbitTemplate.receiveAndConvert("ustc");
          System.out.println(o.getClass());  //class cn.edu.ustc.springboot.bean.Book
          System.out.println(o);            //Book{name='西游记', price=23.2}
    
    • 由于默认的消息转化器是SimpleMessageConverter,对于对象以jdk序列化方式存储,若要以Json方式存储对象,就要自定义消息转换器。
    @Configuration
    public class AmqpConfig {
        @Bean
        public MessageConverter messageConverter() {
            //在容器中导入Json的消息转换器
            return new Jackson2JsonMessageConverter();
        }
    }
    
    • AmqpAdmin管理组件,可以利用可视化工具,对RabbitMQ增删改exchange、binding、queue.,当然也可以利用代码进行编写。
    //创建Direct类型的Exchange
    amqpAdmin.declareExchange(new DirectExchange("admin.direct"));
    //创建Queue
    amqpAdmin.declareQueue(new Queue("admin.test"));
    //将创建的队列与Exchange绑定
    amqpAdmin.declareBinding(new Binding("admin.test", Binding.DestinationType.QUEUE,"admin.direct","admin.test",null));
    
    • 消息的监听:
      在回调方法上标注@RabbitListener注解,并设置其属性queues,注册监听队列,当该队列收到消息时,标注方法遍会调用

    可分别使用Message和保存消息所属对象进行消息接收,若使用Object对象进行消息接收,实际上接收到的也是Message

    @Service
    public class BookService {
        @RabbitListener(queues = {"admin.test"})
        public void receive1(Book book){
            System.out.println("收到消息:"+book);
        }
    
        @RabbitListener(queues = {"admin.test"})
        public void receive1(Object object){
            System.out.println("收到消息:"+object.getClass());
            //收到消息:class org.springframework.amqp.core.Message
        }
        
        @RabbitListener(queues = {"admin.test"})
        public void receive2(Message message){
            System.out.println("收到消息"+message.getHeaders()+"---"+message.getPayload());
        }
    }
    

    相关文章

      网友评论

          本文标题:Java消息中间件之RabbitMQ

          本文链接:https://www.haomeiwen.com/subject/rzqhwhtx.html