美文网首页
SpringBoot整合rabbitmq

SpringBoot整合rabbitmq

作者: 任未然 | 来源:发表于2022-01-06 09:00 被阅读0次

    一. 概述

    参考开源项目https://github.com/xkcoding/spring-boot-demo
    本Demo简单集成rabbitmq的使用

    二. 安装rabbitmq

    用docker安装, 没安装docker的先安装docker

    1. 下载镜像:docker pull rabbitmq:3.7.7-management

    2. 运行容器:docker run -d -p 5671:5617 -p 5672:5672 -p 4369:4369 -p 15671:15671 -p 15672:15672 -p 25672:25672 --name rabbit-3.7.7 rabbitmq:3.7.7-management

    3. 浏览器打开网址http://localhost:15672/

    默认账号密码: guest/guest

    image.png
    1. 新建交换机,命名为: topic.wpr
    image.png
    1. 新建队列

    队列名为:queue.2, 队列绑定交换机:topic.wpr,路由Key:queue.wpr

    RoutingKey规则

    • 路由格式必须以 . 分隔,比如 user.email 或者 user.aaa.email
    • 通配符 * ,代表一个占位符,或者说一个单词,比如路由为 user.*,那么 user.email 可以匹配,但是 user.aaa.email 就匹配不了
    • 通配符 # ,代表一个或多个占位符,或者说一个或多个单词,比如路由为 user.#,那么 user.email 可以匹配,user.aaa.email 也可以匹配
    image.png

    三. SpringBoot项目

    3.1 application.yml

    server:
      port: 8080
      servlet:
        context-path: /demo
    spring:
      rabbitmq:
        host: localhost
        port: 5672
        username: guest
        password: guest
        virtual-host: /
        listener:
          simple:
            retry:
              #enabled:开启失败重试
              enabled: true
              #第一次重试的间隔时长
              initial-interval: 1000ms
              #最大重试次数
              max-attempts: 3
              #最长重试间隔,超过这个间隔将不再重试
              max-interval: 10000ms
              #下次重试间隔的倍数,此处是2即下次重试间隔是上次的2倍
              multiplier: 1
            #重试次数超过上面的设置之后是否丢弃(false不丢弃时需要写相应代码将该消息加入死信队列
            default-requeue-rejected: true
            # 手动提交消息
            acknowledge-mode: manual
          direct:
            # 手动提交消息
            acknowledge-mode: manual
    

    3.2 监听队列事件

    import cn.hutool.json.JSONUtil;
    import com.rabbitmq.client.Channel;
    import com.xkcoding.mq.rabbitmq.constants.RabbitConsts;
    import com.xkcoding.mq.rabbitmq.message.MessageStruct;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    import java.io.IOException;
    
    @Slf4j
    @RabbitListener(queues = "queue.2")
    @Component
    public class QueueTwoHandler {
    
        @RabbitHandler
        public void directHandlerManualAck(MessageStruct messageStruct, Message message, Channel channel) {
            //  如果手动ACK,消息会被监听消费,但是消息在队列中依旧存在,如果 未配置 acknowledge-mode 默认是会在消费完毕后自动ACK掉
            final long deliveryTag = message.getMessageProperties().getDeliveryTag();
            try {
                log.info("队列2,手动ACK,接收消息:{}", JSONUtil.toJsonStr(messageStruct));
                // 通知 MQ 消息已被成功消费,可以ACK了
                channel.basicAck(deliveryTag, false);
            } catch (IOException e) {
                try {
                    // 处理失败,重新压入MQ
                    channel.basicRecover();
                } catch (IOException e1) {
                    e1.printStackTrace();
                }
            }
        }
    }
    

    3.3 消息发送

    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class SpringBootDemoMqRabbitmqApplicationTests {
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @Test
        public void sendTopic1() {
            rabbitTemplate.convertAndSend("topic.wpr", "queue.wpr", new MessageStruct("topic message"));
        }
    }
    

    相关文章

      网友评论

          本文标题:SpringBoot整合rabbitmq

          本文链接:https://www.haomeiwen.com/subject/fciqcrtx.html