RabbitMQ Notes 13: Using the @RabbitListener Annotation

Author: 二月_春风 | Published 2017-10-22 11:03

    In earlier posts we consumed messages by building a SimpleMessageListenerContainer in the Spring container. We can also consume messages with @RabbitListener.

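    The @RabbitListener annotation marks a method as the target that consumes messages; its attributes specify the queues or bindings to listen on. The listener container is built by a RabbitListenerContainerFactory. Unless an explicit default has been configured, the bean named rabbitListenerContainerFactory is used.
    @RabbitListener is processed by a RabbitListenerAnnotationBeanPostProcessor. It can be registered in XML configuration through <rabbit:annotation-driven/>, or @RabbitListener can be enabled with the @EnableRabbit annotation.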

    Example

    import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class ConsumerConfig {
    
        @Bean
        public ConnectionFactory connectionFactory(){
            CachingConnectionFactory factory = new CachingConnectionFactory();
            factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
            return factory;
        }
    
        @Bean
        public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
            // With the default setup, a message whose content_type contains "text" is converted to a String
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            factory.setConnectionFactory(connectionFactory);
            return factory;
        }
    
    }
    

    Define the message handler, a method marked with @RabbitListener:

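    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    @Component
    public class MessageHandler {
    
        // Receives the raw body of messages from the zhihao.miao.order queue
        @RabbitListener(queues = "zhihao.miao.order")
        public void handleMessage(byte[] message){
            System.out.println("消费消息");
            System.out.println(new String(message));
        }
    }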
    

    The application class, where @EnableRabbit enables @RabbitListener:

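    import org.springframework.amqp.rabbit.annotation.EnableRabbit;
    import org.springframework.context.annotation.AnnotationConfigApplicationContext;
    import org.springframework.context.annotation.ComponentScan;
    
    import java.util.concurrent.TimeUnit;
    
    @EnableRabbit
    @ComponentScan
    public class Application {
        public static void main(String[] args) throws Exception{
            AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
            System.out.println("rabbit service startup");
            // Keep the context alive for a while so the listener can consume messages
            TimeUnit.SECONDS.sleep(60);
            context.close();
        }
    }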
    

    Test:

    Send a message.

    Console output:

    消费消息
    你的订单已经生成。
    

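    If a message is sent with its content_type property set to text, the parameter of the handler method must be of type String; a byte[] parameter causes an error.

    Set content_type to text and send the message again.

    The console reports an error. The handler can be rewritten as follows:
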
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    @Component
    public class MessageHandler {
    
        // If content_type=text is removed from the message, the payload arrives here as the int value of each character
        //@RabbitListener(queues = "zhihao.miao.order")
        public void handleMessage(String message){
            System.out.println("消费消息");
            System.out.println(new String(message));
        }
    
        // A Message parameter receives the data whether or not content_type is set
        @RabbitListener(queues = "zhihao.miao.order")
        public void handleMessage(Message message){
            System.out.println("====消费消息===handleMessage(message)");
            System.out.println(message.getMessageProperties());
            System.out.println(new String(message.getBody()));
        }
    }
    

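    Summary
    If the message properties do not specify content_type, the handler method receives the payload as byte[]. If content_type is set to text, the handler method's parameter type is String. A handler whose parameter type is Message never fails, whether or not content_type is set.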
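
    The rule in the summary can be modelled in a few lines of plain Java. This is only a simplified sketch that needs no Spring on the classpath: the class name ContentTypeSketch and the method convert are made up for illustration, and the real default converter handles more cases (serialized Java objects, character encodings) than shown here.

```java
import java.nio.charset.StandardCharsets;

/** Simplified, JDK-only model of the content_type rule; not Spring's own code. */
public class ContentTypeSketch {

    /**
     * A content type starting with "text" yields a String;
     * anything else, including a missing content type, stays byte[].
     */
    public static Object convert(byte[] body, String contentType) {
        if (contentType != null && contentType.startsWith("text")) {
            // Assumption for this sketch: the body is UTF-8 encoded
            return new String(body, StandardCharsets.UTF_8);
        }
        return body;
    }

    public static void main(String[] args) {
        byte[] body = "order created".getBytes(StandardCharsets.UTF_8);
        // Prints the Java type that a handler method would have to accept
        System.out.println(convert(body, "text/plain").getClass().getSimpleName());
        System.out.println(convert(body, null).getClass().getSimpleName());
    }
}
```

    The type returned by convert is what the handler method's parameter has to match, which is why a byte[] parameter fails once content_type is text.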

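    Steps

    • Add the @EnableRabbit annotation to the application entry point
    • Register a RabbitListenerContainerFactory bean in the Spring container (the default implementation is org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory)
    • Write a message handler class, register it in the Spring container, and add @RabbitListener to the actual handler method

    The parameter type of the handler method depends on the Java object produced by the MessageConverter.
    To use a different MessageConverter, set it on the RabbitListenerContainerFactory instance with the setMessageConverter method.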

    Using the @Payload and @Headers annotations

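    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.messaging.handler.annotation.Headers;
    import org.springframework.messaging.handler.annotation.Payload;
    import org.springframework.stereotype.Component;
    
    import java.util.Map;
    
    @Component
    public class MessageHandler {
    
        // Receive both the message headers and the message body
        @RabbitListener(queues = "zhihao.miao.order")
        public void handleMessage(@Payload String body, @Headers Map<String,Object> headers){
            System.out.println("====消费消息===handleMessage");
            System.out.println(headers);
            System.out.println(body);
        }
    }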
    

    To read a single header, use @Header. It has further attributes such as required and defaultValue, whose meaning follows from their names:

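    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.messaging.handler.annotation.Header;
    import org.springframework.messaging.handler.annotation.Payload;
    import org.springframework.stereotype.Component;
    
    @Component
    public class MessageHandler {
    
        // Read one specific header; the name is given explicitly so it does not depend on compiler parameter-name settings
        @RabbitListener(queues = "zhihao.miao.order")
        public void handleMessage(@Payload String body,@Header("token") String token){
            System.out.println("====消费消息===handleMessage");
            System.out.println(token);
            System.out.println(body);
        }
    }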
    
    Test

    Listening on multiple queues

    @Component
    public class MessageHandler {
        @RabbitListener(queues ={"zhihao.miao.order","zhihao.info"})
        public void handleMessage(Message message){
            System.out.println("====消费消息"+message.getMessageProperties().getConsumerQueue()+"===handleMessage");
            System.out.println(message.getMessageProperties());
            System.out.println(new String(message.getBody()));
        }
    }
    

    Reading the queue names from a properties file

    @Component
    public class MessageHandler {
    
        // The queue names are resolved from the properties file
        @RabbitListener(queues ={"${zhihao.queue1}","${zhihao.queue2}"})
        public void handleMessage(Message message){
            System.out.println("====消费消息"+message.getMessageProperties().getConsumerQueue()+"===handleMessage");
            System.out.println(message.getMessageProperties());
            System.out.println(new String(message.getBody()));
        }
    
    }
    

    Properties file:

    zhihao.queue1=zhihao.miao.order
    zhihao.queue2=zhihao.info
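
    To make the ${...} syntax concrete, here is a small JDK-only sketch of what resolving such a placeholder against these properties amounts to. It is not how Spring implements it internally; the class PlaceholderSketch and its resolve method are hypothetical names, and throwing on an unknown key is a choice made for this sketch only.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Simplified model of ${key} placeholder resolution; not Spring's implementation. */
public class PlaceholderSketch {

    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    /** Replaces every ${key} in value with the matching property. */
    public static String resolve(String value, Properties props) {
        Matcher matcher = PLACEHOLDER.matcher(value);
        StringBuffer out = new StringBuffer();
        while (matcher.find()) {
            String key = matcher.group(1);
            String replacement = props.getProperty(key);
            if (replacement == null) {
                // Sketch-only behaviour: fail loudly on an unknown key
                throw new IllegalArgumentException("Could not resolve placeholder: " + key);
            }
            matcher.appendReplacement(out, Matcher.quoteReplacement(replacement));
        }
        matcher.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        // Same content as the properties file above
        props.load(new StringReader("zhihao.queue1=zhihao.miao.order\nzhihao.queue2=zhihao.info\n"));
        System.out.println(resolve("${zhihao.queue1}", props));
        System.out.println(resolve("${zhihao.queue2}", props));
    }
}
```

    In the real application the properties only become visible to the placeholders because the file is registered with @PropertySource.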
    

    Application class:

    @EnableRabbit
    @ComponentScan
    @PropertySource(value = "classpath:mq.properties")
    public class Application {
        public static void main(String[] args) throws Exception{
            AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
            System.out.println("rabbit service startup");
            TimeUnit.SECONDS.sleep(200);
            context.close();
        }
    }
    

    Declaring bindings with the @RabbitListener annotation

    Define a queue, an exchange, and a routing key that do not yet exist on the broker:

    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.Exchange;
    import org.springframework.amqp.rabbit.annotation.Queue;
    import org.springframework.amqp.rabbit.annotation.QueueBinding;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    @Component
    public class MessageHandler {
    
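        // The binding is declared automatically and the declared queue is then listened on. The queues and bindings attributes of @RabbitListener must not both be set, otherwise an error is raised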
        @RabbitListener(bindings ={@QueueBinding(value = @Queue(value = "q5",durable = "true"),
                exchange =@Exchange(value = "zhihao.miao.exchange",durable = "true"),key = "welcome")})
        public void handleMessage(Message message){
            System.out.println("====消费消息"+message.getMessageProperties().getConsumerQueue()+"===handleMessage");
            System.out.println(message.getMessageProperties());
            System.out.println(new String(message.getBody()));
        }
    
    
    }
    

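    As we saw in earlier posts, automatic declaration requires a RabbitAdmin bean in the container. The configuration below registers a RabbitAdmin and also a RabbitTemplate: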

    import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class ConsumerConfig {
    
        @Bean
        public ConnectionFactory connectionFactory(){
            CachingConnectionFactory factory = new CachingConnectionFactory();
            factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
            return factory;
        }
    
        @Bean
        public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
            // With the default setup, a message whose content_type contains "text" is converted to a String
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            factory.setConnectionFactory(connectionFactory);
            return factory;
        }
    
        @Bean
        public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
            RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
            return rabbitAdmin;
        }
    
        @Bean
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            return rabbitTemplate;
        }
    
    }
    

    Application class

    import org.springframework.amqp.rabbit.annotation.EnableRabbit;
    import org.springframework.context.annotation.AnnotationConfigApplicationContext;
    import org.springframework.context.annotation.ComponentScan;
    
    import java.util.concurrent.TimeUnit;
    
    @EnableRabbit
    @ComponentScan
    public class Application {
        public static void main(String[] args) throws Exception{
            AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
            System.out.println("rabbit service startup");
            TimeUnit.SECONDS.sleep(200);
            context.close();
        }
    }
    

    Verification

    The queue, exchange, and binding are declared automatically. Send a message.

    Console output:

    rabbit service startup
    ====消费消息q5===handleMessage
    MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=null, contentEncoding=null, contentLength=0, deliveryMode=null, receivedDeliveryMode=NON_PERSISTENT, expiration=null, priority=null, redelivered=false, receivedExchange=zhihao.miao.exchange, receivedRoutingKey=welcome, receivedDelay=null, deliveryTag=1, messageCount=0, consumerTag=amq.ctag-RBqtzCiTxMg6knwQJX7B3A, consumerQueue=q5]
    hello,自动注册成了吗
    

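    This shows that the queue in an automatically declared binding is listened on by default. The bindings and queues attributes of @RabbitListener cannot both be specified; doing so causes an error.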

    Using @RabbitListener together with @RabbitHandler

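    @RabbitListener can also be placed on a class, in which case it has to be combined with @RabbitHandler. A class-level @RabbitListener means that each received message is handed to one of the methods annotated with @RabbitHandler; which method is chosen depends on the Java type produced by the MessageConverter.
    Configuration: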

    import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class ConsumerConfig {
    
        @Bean
        public ConnectionFactory connectionFactory(){
            CachingConnectionFactory factory = new CachingConnectionFactory();
            factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
            return factory;
        }
    
        @Bean
        public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
            // With the default setup, a message whose content_type contains "text" is converted to a String
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            factory.setConnectionFactory(connectionFactory);
            return factory;
        }
    
    
    }
    

    Handler methods

    @Component
    @RabbitListener(queues ="zhihao.miao.order")
    public class MessageHandler {
    
    
        @RabbitHandler
        public void handleMessage(byte[] message){
            System.out.println("====消费消息handleMessage");
            System.out.println(new String(message));
        }
    
        @RabbitHandler
        public void handleMessage2(String message){
            System.out.println("====消费消息===handleMessage2");
            System.out.println(message);
        }
    }
    

    Application class:

    import org.springframework.amqp.rabbit.annotation.EnableRabbit;
    import org.springframework.context.annotation.AnnotationConfigApplicationContext;
    import org.springframework.context.annotation.ComponentScan;
    
    import java.util.concurrent.TimeUnit;
    
    @EnableRabbit
    @ComponentScan
    public class Application {
        public static void main(String[] args) throws Exception{
            AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
            System.out.println("rabbit service startup");
            TimeUnit.SECONDS.sleep(3000);
            context.close();
        }
    }
    

    Send one message without the content_type property and one with content_type set to text. Console output:

    rabbit service startup
    ====消费消息handleMessage
    订单已经生成,请到订单-详情页面确认。
    ====消费消息===handleMessage2
    订单已经生成,请到订单-详情页面确认。
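
    The output above shows each message landing in the handler whose parameter type matches the converted payload. The idea can be sketched with plain reflection. This is a simplified model, not Spring's dispatch code: the @Handler annotation, OrderHandler, and dispatch are hypothetical names introduced only for this illustration.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;

/** Simplified model of choosing a handler method by payload type. */
public class HandlerDispatchSketch {

    /** Hypothetical stand-in for @RabbitHandler; not the Spring annotation. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    public @interface Handler {}

    public static class OrderHandler {
        @Handler
        public String onBytes(byte[] body) {
            return "bytes:" + new String(body, StandardCharsets.UTF_8);
        }

        @Handler
        public String onText(String body) {
            return "text:" + body;
        }
    }

    /** Invokes the first @Handler method whose single parameter accepts the payload. */
    public static Object dispatch(Object target, Object payload) {
        for (Method method : target.getClass().getMethods()) {
            if (method.isAnnotationPresent(Handler.class)
                    && method.getParameterCount() == 1
                    && method.getParameterTypes()[0].isInstance(payload)) {
                try {
                    return method.invoke(target, payload);
                } catch (ReflectiveOperationException e) {
                    throw new IllegalStateException("Handler invocation failed", e);
                }
            }
        }
        throw new IllegalStateException("No handler for payload type " + payload.getClass().getName());
    }

    public static void main(String[] args) {
        OrderHandler handler = new OrderHandler();
        // A converted String payload goes to onText, a raw body to onBytes
        System.out.println(dispatch(handler, "order created"));
        System.out.println(dispatch(handler, "order created".getBytes(StandardCharsets.UTF_8)));
    }
}
```

    A payload type with no matching method ends in an exception in this sketch, which is why the handler class needs one method per type the converter can produce.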
    

    The containerFactory attribute of @RabbitListener

    The containerFactory attribute of @RabbitListener names the RabbitListenerContainerFactory bean to use. By default the bean named rabbitListenerContainerFactory is looked up.

    If we rename the RabbitListenerContainerFactory bean in the ConsumerConfig class, an error occurs.

    import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class ConsumerConfig {
    
        @Bean
        public ConnectionFactory connectionFactory(){
            CachingConnectionFactory factory = new CachingConnectionFactory();
            factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
            return factory;
        }
    
        @Bean("rabbitListenerContainerFactory2")
        public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
            // With the default setup, a message whose content_type contains "text" is converted to a String
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            factory.setConnectionFactory(connectionFactory);
            return factory;
        }
    }
    

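    The console now reports an error, because no bean named rabbitListenerContainerFactory can be found.

    Setting the containerFactory attribute of @RabbitListener to the new bean name makes the error go away.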

    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    @Component
    @RabbitListener(queues ="zhihao.miao.order",containerFactory = "rabbitListenerContainerFactory2")
    public class MessageHandler {
    
        @RabbitHandler
        public void handleMessage(byte[] message){
            System.out.println("====消费消息handleMessage");
            System.out.println(new String(message));
        }
    
        @RabbitHandler
        public void handleMessage2(String message){
            System.out.println("====消费消息===handleMessage2");
            System.out.println(message);
        }
    }
    

    Next we change the configuration again and define a message converter on the RabbitListenerContainerFactory instance:

    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
    import org.springframework.amqp.support.converter.MessageConversionException;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class ConsumerConfig {
    
        @Bean
        public ConnectionFactory connectionFactory(){
            CachingConnectionFactory factory = new CachingConnectionFactory();
            factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
            return factory;
        }
    
        @Bean("rabbitListenerContainerFactory2")
        public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
            // A custom MessageConverter replaces the default conversion and turns every message body into a User
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            factory.setConnectionFactory(connectionFactory);
            factory.setMessageConverter(new MessageConverter() {
                @Override
                public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
                    // Outbound conversion is not used in this consumer-only example
                    return null;
                }
    
                @Override
                public Object fromMessage(Message message) throws MessageConversionException {
                    return new User(1,new String(message.getBody()));
                }
            });
            return factory;
        }
    }
    

    The User class:

    public class User {
    
        private int age;
    
        private String name;
        
        // ... getters and setters omitted
    
        public User(int age,String name){
            this.age = age;
            this.name = name;
        }
    
        @Override
        public String toString() {
            return "User{" +
                    "age=" + age +
                    ", name='" + name + '\'' +
                    '}';
        }
    }
    

    Add a handler method whose parameter is a User:

    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    @Component
    @RabbitListener(queues ="zhihao.miao.order",containerFactory = "rabbitListenerContainerFactory2")
    public class MessageHandler {
    
        @RabbitHandler
        public void handleMessage3(User user){
            System.out.println("====消费消息===handleMessage3");
            System.out.println(user);
        }
    
    
    }
    
    Send a message to test.

      Article link: https://www.haomeiwen.com/subject/kmivextx.html