美文网首页
springboot之rabbitmq

springboot之rabbitmq

作者: 陆阳226 | 来源:发表于2020-03-24 16:34 被阅读0次

    rabbitmq简介

    rabbitmq简单介绍:https://www.jianshu.com/p/79f0993da0d0

    包、配置

    可以在创建springboot项目时在message选项中勾选rabbitmq,也可以手动添加包

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    

    springboot配置application.yml,需要指定virtual-host,这里就用了默认的

    spring:
      rabbitmq:
        addresses: 127.0.0.1:5672
        username: guest
        password: guest
        virtual-host: /
    

    rabbitmq配置,替换默认的MessageConverter,可以将对象自动序列化化为json发布到队列,监听队列获取的消息也会自动发序列化为对象的类型

    @Configuration
    public class RabbitConfig {
        /**
         * 替换org.springframework.amqp.support.converter.MessageConverter
         * 在发送和接收消息时使用json格式处理pojo
         */
        @Bean
        public MessageConverter jsonMessageConverter() {
            return new Jackson2JsonMessageConverter();
        }
    }
    

    准备工作

    传输的实体类

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public class User {
        private int id;
        private String name;
    }
    

    代码使用

    简单队列

    生产者Sender发送消息到指定的队列,消费者Receiver监听该队列
    生产者使用rabbitTemplate发布消息,convertAndSend方法接受队列名称和传输的对象
    消费者中使用RabbitListener监听队列,使用queuesToDeclare = @Queue("rabbit")动态创建队列:rabbit

    @Component
    public class Sender {
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        public void send(String queue, User user) {
            rabbitTemplate.convertAndSend(queue, user);
        }
    }
    
    @Slf4j
    @Component
    public class Receiver {
        @RabbitListener(queuesToDeclare = @Queue("rabbit"))
        public void getMsg(User user) {
            log.info("get message {}", user);
        }
    }
    

    测试结果,sender向rabbit队列发送一个user对象

    @SpringBootTest
    class ReceiverTest {
    
        @Autowired
        private Sender sender;
    
        @Test
        void getMsg() throws InterruptedException {
                sender.send("rabbit", new User(1, "tom"));
                Thread.sleep(1000);
        }
    }
    

    Receiver监听到rabbit队列,并取出消息user,输出显示

    get message: User(id=1, name=tom)
    

    使用多个消费者

    Receiver中增加一个监听队列的方法

    @RabbitListener(queues = "rabbit")
    public void getMsg(User user) {
        log.info("use another Listener get message: {} ", user);
    }
    

    测试方法:Sender向队列发送5个user,一个消息只能有一个消费者获得

    @Test
    void twoReceiver() throws InterruptedException {
        for (int i = 0; i < 5; i++) {
            String randName = RandomStringUtils.random(5, 97, 122, true, false);
            User user = new User(i, randName);
            sender.send("rabbit", user);
        }
        Thread.sleep(1000);
    }
    
    get message: User(id=1, name=hrsrw) with Receiver02
    get message: User(id=0, name=hhovn) with Receiver01
    get message: User(id=3, name=sfiyg) with Receiver02
    get message: User(id=2, name=ahthc) with Receiver01
    get message: User(id=4, name=efdhg) with Receiver01
    

    使用Exchange

    direct模式

    在Receiver类中增加方法,使用注解将队列queueA绑定到交换器exchange_direct上,路由key可以指定一个或多个。
    Sender类增加send方法,发送消息给Exchange使用路由key,消息不是直接发给队列,而是通过Exchange转发
    测试方法中分别传入两个key发送user对象

    // Receiver类
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("queueA"),
            exchange = @Exchange(value = "exchange_direct", type = ExchangeTypes.DIRECT),
            key = {"direct", "exchange"}
    ))
    public void withExchange(User user) {
        log.info("get message: {} with directExchange", user);
    }
    
    // Sender类
    public void send(String exchange, String key, User user) {
        rabbitTemplate.convertAndSend(exchange, key, user);
    }
    
    // 测试类
    @Test
    void directExchange() throws InterruptedException {
        sender.send("exchange_direct", "direct", new User(2, "jack"));
        sender.send("exchange_direct", "exchange", new User(3, "larry"));
        Thread.sleep(1000);
    }
    

    测试结果,使用两个key发送的消息都可以接收到

    get message: User(id=2, name=jack) with directExchange
    get message: User(id=3, name=larry) with directExchange
    

    fanout模式

    fanout模式下不需要绑定路由键,发给Exchange的消息会被转发到跟其绑定的所有队列。
    Receiver类增加两个RabbitListener方法,分别将queueB、queueC绑定到fanout交换器exchange_fanout
    Sender类不需要加方法,使用发送给exchange的方法,参数key指定为空""

    // Receiver类
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("queueB"),
            exchange = @Exchange(value = "exchange_fanout", type = ExchangeTypes.FANOUT)
    ))
    public void fanOutExchange1(User user) {
        log.info("get message: {} with fanoutExchange", user);
    }
    
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("queueC"),
            exchange = @Exchange(value = "exchange_fanout", type = ExchangeTypes.FANOUT)
    ))
    public void fanOutExchange2(User user) {
        log.info("get message: {} with fanoutExchange", user);
    }
    
    // 测试类,不需要指定key
    @Test
    void fanOutExchange() throws InterruptedException {
        sender.send("exchange_fanout", "", new User(4, "Mike"));
        Thread.sleep(1000);
    }
    

    测试结果,exchange_fanout上绑定的两个队列都收到了信息

    get message: User(id=4, name=Mike) with fanoutExchange from queueB
    get message: User(id=4, name=Mike) with fanoutExchange from queueC
    

    topic模式

    topic模式也需要queue和exchange绑定时设置路由key,不过更direct模式不一样的是,topic模式使用通配符来匹配路由key,*表示匹配一个单词,#表示匹配0个或多个单词
    在topic模式下消息会转发给所有匹配路由key的队列

    Receiver类中增加两个方法,使用keyuser.login.#绑定队列queueD到交换器exchange_topic,使用keyuser.register.#绑定队列queueE到交换器exchange_topic

    测试类中发送消息时需要指定exchange、路由key、消息

    // Receiver类
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("queueD"),
            exchange = @Exchange(value = "exchange_topic", type = ExchangeTypes.TOPIC),
            key = "user.login.#"
    ))
    public void topicExchange(User user) {
        log.info("get message: {} with topicExchange from queueD", user);
    }
    
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("queueE"),
            exchange = @Exchange(value = "exchange_topic", type = ExchangeTypes.TOPIC),
            key = "user.register.#"
    ))
    public void topicExchange2(User user) {
        log.info("get message: {} with topicExchange from queueE", user);
    }
    
    // 测试类
    @Test
    void topicExchange() throws InterruptedException {
        sender.send("exchange_topic", "user.login", new User(5, "bob"));
        Thread.sleep(1000);
    }
    

    测试结果,只有匹配的queueD收到了消息

    get message: User(id=5, name=bob) with topicExchange from queueD
    

    相关文章

      网友评论

          本文标题:springboot之rabbitmq

          本文链接:https://www.haomeiwen.com/subject/sstayhtx.html