美文网首页Spring Cloud系列专题
四、SpringBoot 集成RabbitMQ

四、SpringBoot 集成RabbitMQ

作者: July_whj | 来源:发表于2021-10-07 21:52 被阅读0次
    SpringBoot 集成RabbitMQ

    一、Docker安装Rabbit MQ

    运行下面命令,docker 可自动拉取镜像,并启动mq。

    docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management
    

    我们执行完成后可以运行docker ps查看下mq运行情况

    docker ps

    我们看到RabbitMq已经启动成功,我们在浏览器中打开ip:15672显示如下:

    RabbitMQ 登录页

    输入用户名密码,默认用户名guest/guest;登录成功后显示如下界面。

    首页

    至此,RabbitMQ安装完成。

    二、SpringBoot项目初始化

    我们使用Spring initalizr初始化SpringBoot 项目,Spring initalizr

    Spring initalizr

    这里我们通过Spring官网初始化项目,并添加RabbitMQ的依赖,我们直接点击生成,代码会自动下载下来,我们将下载的代码导入到idea中(我这里的idea是社区版不支持Spring,故在官网初始化项目)。

    导入idea

    项目导入到idea后,我们新创建个controller包,在包中创建IndexController.class。我们使用创建的Controller测试下我们的工程,在IndexController.class中我们添加一下内容:

    @RestController
    public class IndexController {
        @GetMapping("/index")
        public String index() {
            return "Hello RabbitMQ";
        }
    }
    

    启动工程,在浏览器中访问127.0.0.1:8080/index可以看到浏览器中出现“Hello RabbitMQ”,说明我们的工程初始化没有问题。

    初始化成功

    三、SpringBoot配置RabbitMQ

    3.1、创建RabbitMqConfig

    默认RabbitMQ序列化方式是SerializerMessageConverter序列化器,这么我们使用Jackson2JsonMessageConverter序列化器。我们需要设置下,内容如下:

    @Configuration
    public class RabbitMqTemplateConfig {
    
        @Bean
        public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
            return rabbitTemplate;
        }
    
        @Bean
        public MessageConverter jackson2JsonMessageConverter() {
            return new Jackson2JsonMessageConverter();
        }
    }
    

    完善SpringBoot配置文件,配置文件内容如下:

    spring.rabbitmq.host=110.40.141.168
    spring.rabbitmq.port=5672
    spring.rabbitmq.virtual-host=/
    

    这里我们使用的是application.properties,而非yaml。使用yaml的可以自行转换下。

    3.2、创建队列常量类SimpleMqConstant

    这里只做简单的功能演示,我们把队列的名称统一定义在常量类SimpleMqConstant类中,后续我们扩展其他队列方便维护。

    /**
     * @Author julyWhj
     * @Description 默认的交换机测试$
     * @Date 2021/10/7 10:52 上午
     **/
    public class SimpleMqConstant {
    
        /**
         * 处理对象的MQ队列
         */
        public static final String HANDLER_OBJECT_QUEUE_NAME = "com.july.mq.simple.object";
    
    }
    

    这里我们定义队列名称叫:com.july.mq.simple.object;

    3.3、创建Simple对象

    这里我们创建一个Simple对象,使用该对象进行序列化发送。

    /**
     * @Author julyWhj
     * @Description Simple对象$
     * @Date 2021/10/7 10:55 上午
     **/
    @Data
    @Builder
    @AllArgsConstructor
    @NoArgsConstructor
    public class Simple implements Serializable {
        private String name;
        private String no;
        private int age;
        private String phone;
        private Date createTime;
    }
    

    3.4、创建队列消费者SimpleConsumer

    我们创建SimpleConsumer类,做为队列的消费者,内容如下:

    /**
     * @Author julyWhj
     * @Description 消费者$
     * @Date 2021/10/7 10:57 上午
     **/
    @Component
    @Slf4j
    public class SimpleConsumer {
    
    
        @RabbitListener(queuesToDeclare = @Queue(SimpleMqConstant.HANDLER_OBJECT_QUEUE_NAME))
        @RabbitHandler
        public void receiveObject(Simple simple) throws JsonProcessingException {
            ObjectMapper objectMapper = new ObjectMapper();
            String message = objectMapper.writeValueAsString(simple);
            log.info("simple consumer receive the object:{}", message);
        }
    }
    

    这里我们使用@RabbitListener(queuesToDeclare = @Queue(SimpleMqConstant.HANDLER_OBJECT_QUEUE_NAME)) 其中queuesToDeclare它可以在队列SimpleMqConstant.HANDLER_OBJECT_QUEUE_NAME不存在的时候自动创建队列,不会出现reply-code=404, reply-text=NOT_FOUND - no exchange 'XXX' in vhost '/', class-id=50, method-id=的异常。

    这里我们接收到消息后,只做打印处理,不做其他处理。

    3.5、创建队列生产者SimpleProducer

    队列生产者SimpleProducer内容如下:

    /**
     * @Author julyWhj
     * @Description 生产者$
     * @Date 2021/10/7 10:54 上午
     **/
    @Component
    public class SimpleProducer {
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        /**
         * 消息体为对象。配置MessageConverter为Jackson2JsonMessageConverter即可
         *
         * @param simple
         */
        public void sendOrderMessage(Simple simple) {
            rabbitTemplate.convertAndSend(SimpleMqConstant.HANDLER_OBJECT_QUEUE_NAME, simple);
        }
    }
    

    生产者内容很简单,接收Simple对象,调用convertAndSend 方法发送对象。

    3.6、创建单元测试类SimpleMqTest

    这里我们使用单元测试进行消息的发送和接收测试,测试类内容如下:

    /** * @Author julyWhj * @Description $ * @Date 2021/10/7 10:58 上午 **/@SpringBootTest@Slf4jpublic class SimpleMqTest {    @Autowired    private SimpleProducer simpleProducer;    @Test    public void testSimple() throws Exception {        for (int i = 0; i < 10; i++) {            simpleProducer.sendOrderMessage(Simple.builder()                    .createTime(new Date())                    .name("JulyWhj")                    .age(i)                    .no("ID-0001")                    .phone("138XXXXXXXX")                    .build());        }    }}
    

    我们运行单元测试,看下执行结果:

    消息测试结果

    可以看到,消费者成功接收到10条数据,并成功打印出来。

    四、思考:我们这样写会存在什么问题?

    我们这样写会存在一个致命的问题,消息丢失

    如何造成的消息丢失,我们应该怎么处理保证消息不丢失。后续的文章会为大家逐一分析。这里我们先简单的使用SpringBoot连接MQ,进行收发消息的Demo。

    源码我上传github中,需要的可自行下载。

    相关文章

      网友评论

        本文标题:四、SpringBoot 集成RabbitMQ

        本文链接:https://www.haomeiwen.com/subject/nwttoltx.html