美文网首页
springboot使用Rabbitmq实现消息队列服务

springboot使用Rabbitmq实现消息队列服务

作者: Yluozi | 来源:发表于2021-09-24 15:56 被阅读0次

rabbitmq中间件搭建在本地虚拟机上,详情搭建过程可查看:rabbitmq安装部署
使用上次搭建的dubbo项目补充rabbitmq实现,代码可参考:20分钟springboot搭建dubbo服务

首先查看virtual-host配置(VirtualHost相当月一个相对独立的RabbitMQ服务器,每个VirtualHost之间是相互隔离的。exchange、queue、message不能互通。)


image.png

rabbitmq原理结构


image.png

生产者/消费者模型,类似于交换机。Exchange交换器,共有四种类型,不同的类型对应不同的路由策略。
Queue:消息队列,接收消息、缓存消息。

Exchange:交换机,一方面接收生产者发送来的消息。另一方面知道如何处理消息,例如交给特别的队列,或者全部的队列,或者将消息丢弃。到底如何操作取决于Exchange是哪种类型:

根据交换机类型不同,分为3种发布模式:
Direct<定向>:1对1-----一个消息只能被一个消费者消费;把消息交给符合特定routing key(queue与exchange的关系key) 的队列。
Topic<通配符>:1对多-----一个消息可以被多个消费者消费(轮询);把消息交给符合routing pattern (路由模式)的队列。
Fanout<广播>:将消息分发给所有绑定到交换机的队列。

消息队列内生产者添加消息队列数据,消费者接收并使用队列中的数据,上次搭建的简单的dubbo服务中consumer发出请求,provider提供查询数据库的服务,具体如下图:


image.png

继续完成代码实现

consumer主体结构如下:


image.png

补充consumer的pom文件rabbitmq配置


    <!--rabbitmq-->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

补充consumer内yml的rabbitmq配置

spring:
  application:
    name: consumer
  profiles:
    active: test
  #配置rabbitMq 服务器
  rabbitmq:
    host: 10.1.31.199
    port: 5672
    username: admin
    password: admin
    #虚拟host 可以不设置,使用server默认host
    virtual-host: /

注意1) rabbitmq的默认web端口号是15672,接扣访问端口是5672
2)rabbitmq的默认virtualhost配置为"/"
在config文件夹添加DirectRabbitConfig类,配置rabbitmq的配置信息如下:

package com.example.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DirectRabbitConfig {
    //队列 起名:TestDirectQueue
    @Bean
    public Queue TestDirectQueue() {
        // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
        // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
        // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
        //   return new Queue("TestDirectQueue",true,true,false);

        //一般设置一下队列的持久化就好,其余两个就是默认false
        return new Queue("TestDirectQueue",true);
    }

    //Direct交换机 起名:TestDirectExchange
    @Bean
    DirectExchange TestDirectExchange() {
        //  return new DirectExchange("TestDirectExchange",true,true);
        return new DirectExchange("TestDirectExchange",true,false);
    }

    //绑定  将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting
    @Bean
    Binding bindingDirect() {
        return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");
    }



    @Bean
    DirectExchange lonelyDirectExchange() {
        return new DirectExchange("lonelyDirectExchange");
    }
}

创建测试接口SendMessageController类,完成消息队列数据的添加

package com.example.consumer.openapi;


import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

@RestController
@RequestMapping("/demo")
public class SendMessageController {


    @Autowired
    RabbitTemplate rabbitTemplate;  //使用RabbitTemplate,这提供了接收/发送等等方法

    @GetMapping("/sendDirectMessage")
    public String sendDirectMessage() {
        String messageId = String.valueOf(UUID.randomUUID());
        String messageData = "test message, hello!";
        String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        Map<String,Object> map=new HashMap<>();
        map.put("messageId",messageId);
        map.put("messageData",messageData);
        map.put("createTime",createTime);
        //将消息携带绑定键值:TestDirectRouting 发送到交换机TestDirectExchange
        rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", map);
        return "ok";
    }
}

注意:1)convertAndSend的方法中exchange是Virtual host的name决定了在哪个queue存放消息,routingKey则确定了queue与exchange的绑定,不填写时自动为exchange的name。
2)rabbitTemplate与amqpTemplate方法,rabbitTemplate实现自amqpTemplate接口,使用起来并无区别

启动项目,访问url,执行rabbitmq消息写入:


image.png

写入成功:


image.png

provider主体结构:


image.png

首先同理consumer,修改provider的pom文件及yml文件

在service文件夹内添加DirectReceiver类如下:

package com.example.provider.service.rabbitmq;


import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;

@Component
@RabbitListener(queues = "TestDirectQueue")//监听的队列名称 TestDirectQueue
public class DirectReceiver {


    @RabbitHandler
    public void process(Map testMessage) {
        System.out.println("DirectReceiver消费者收到消息  : " + testMessage.toString());
    }
}

启动provider项目,查看监听到的消息如下:


image.png

简单的消息队列完成。


RabbitTemplate和AmqpTemplate的使用区别:

两者都能调用convertAndSend方法向队列发送消息,而
API:amqpTemplate.convertAndSend("队列名",“消息内容”)此处队列名必须与创建的队列一致。
API:amqpTemplate.convertAndSend("交换机名",“路由键”,“消息内容”)
具体实现可详看使用方法。

相关文章

网友评论

      本文标题:springboot使用Rabbitmq实现消息队列服务

      本文链接:https://www.haomeiwen.com/subject/heqlgltx.html