美文网首页
Spring-AMQP

Spring-AMQP

作者: 竹天亮 | 来源:发表于2018-09-26 11:43 被阅读22次

    RabbitMQ

    简介

    RabbitMQ是一个消息代理:它接收和转发消息。可以将它看做一个邮局:你把邮件放到邮箱里,会有邮递员将邮件送到接收人。这个比喻里,RabbitMQ就是邮箱,邮局和邮递员。

    Queue就像是邮箱。虽然消息是从RabbitMQ到程序里,但消息只能放到Queue里。Queue的上限取决于主机的内容和硬盘限制。多个生产者可以将消息发到同一个Queue里,多个消费者可以从同一个Queue里取出消息。

    以下为Spring-AMQP里对应的接口或类

    Exchange

    Exchange接收生产者发送的消息,并将它们放到Queue里。exchange决定如何处理收到的消息。消息是否应该放到指定的Queue里,是否添加到多个Queue里,或者是否被丢弃。这些都由exchange type定义。

    可用的exchange type有:direct, topic, headersfanout

    • fanout: 不需要routing key,将消息发送到所有的Queue里
    • direct: 将Queue绑定到固定的routing key对应的Queue上。
    • topic: 支持通配符*#方式匹配多个routing key上。topic的方式可以满足fanoutdirect两种类型。
    Queue

    Queue代表消息消费者接收消息的一个组件。

    Binding
        @Bean
        public Queue someQueue() {
            Queue queue = new Queue("someQueue");
            return queue;
        }
        
        @Bean
        public DirectExchange someExchange() {
            DirectExchange directExchange = new DirectExchange("someExchange");
            return directExchange;
        }
    

    上面的方式queue和exchange默认是持久化的

    • 绑定Queue到DirectExchange:
    new Binding(someQueue, someDirectExchange, "foo.bar")
    
    • 绑定Queue到TopicExchange:
    new Binding(someQueue, someTopicExchange, "foo.*")
    
    • 绑定Queue到FanoutExchange
    new Binding(someQueue, someFanoutExchange)
    

    spring-amqp还提供了BindingBuilder创建:

    Binding b = BindingBuilder.bind(someQueue).to(someTopicExchange).with("foo.*")
    

    还有可以通过@RabbitListener注解直接生成queue和exchange的绑定关系,同时创建了消费者,可谓是一气呵成。

    @RabbitListener(bindings = @QueueBinding(value = @org.springframework.amqp.rabbit.annotation.Queue(value = "topicQueue", durable = "true"),
                exchange = @Exchange(value = "topic", type = ExchangeTypes.TOPIC, durable = "true")))
    

    上面代码会自动创建名为topicQueue的queue和topic的topicExchange并建立绑定关系 ,这种方式创建的queue和exchange的持久化默认是false。需要显示指定durable

    发送消息
        @Autowired
        private RabbitTemplate template;
    
        @Override
        public void sendMessage(String routingKey, Object data) {
            template.convertAndSend("exchangeName", routingKey, data);
        }
        
        // 发送延迟消息,注意exchange的delay属性要设置成true
        @Override
        public void sendDelayMessage(String routingKey, Object data, int delay) {
            template.convertAndSend("exchangeName", routingKey, data, m -> {
                m.getMessageProperties().setDelay(delay);
                return m;
            });
        }
    
    

    测试当routingKey为空字符串的时候,好像所有的routingkey都能收到。

    接收消息
        @RabbitListener(bindings = @QueueBinding(value = @org.springframework.amqp.rabbit.annotation.Queue(value = "topicQueue", durable = "true"),
                    exchange = @Exchange(value = "topic", type = ExchangeTypes.TOPIC, durable = "true")))
        public void receiveMessage(String message) {
            
        }
    

    或者已有创建的queue:

    @RabbitListener(queues="someQueue")
    public void receiveMessage(String message) {
    
    }
    

    可以创建一个bean,省去强制转换:

        @Bean
        public Jackson2JsonMessageConverter jsonMessageConverter() {
            Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
            converter.setTypePrecedence(Jackson2JavaTypeMapper.TypePrecedence.TYPE_ID);
            return converter;
        }
    

    相关文章

      网友评论

          本文标题:Spring-AMQP

          本文链接:https://www.haomeiwen.com/subject/cslloftx.html