美文网首页
RabbitMQ之SimpleMessageListenerCo

RabbitMQ之SimpleMessageListenerCo

作者: 楼兰King | 来源:发表于2020-11-14 09:40 被阅读0次

    <meta charset="utf-8">

    使用SimpleMessageListenerContainer容器设置消费队列监听,然后设置具体的监听Listener进行消息消费具体逻辑的编写。

    SimpleMessageListenerContainer详解

    同一个queue上有多个消费者的时候,只会有一个消费者收到消息,一般是多个消费者轮流收到消息。

    image image

    SimpleMessageListenerContainer可以监听多个队列,
    container.setQueueNames的api接收的是一个字符串数组对象。

    @Bean
    public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
            container.setQueueNames("zhihao.debug","zhihao.error","zhihao.info");
            container.setMessageListener((MessageListener) message -> {
                System.out.println("====接收到"+message.getMessageProperties().getConsumerQueue()+"队列的消息=====");
                System.out.println(message.getMessageProperties());
                System.out.println(new String(message.getBody()));
            });
            return container;
        }
    
    

    SimpleMessageListenerContainer运行时动态的添加监听队列

    @ComponentScan
    public class Application {
       public static void main(String[] args) throws Exception{
           AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
           SimpleMessageListenerContainer container = context.getBean(SimpleMessageListenerContainer.class);
           TimeUnit.SECONDS.sleep(20);
           container.addQueueNames("zhihao.error");
           TimeUnit.SECONDS.sleep(20);
           container.addQueueNames("zhihao.debug");
           TimeUnit.SECONDS.sleep(20);
    
           context.close();
       }
    }
    
    

    SimpleMessageListenerContainer纳入容器

     @Bean
       public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
           SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
           container.setConnectionFactory(connectionFactory);
           container.setQueueNames("zhihao.debug");
           container.setMessageListener((MessageListener) message -> {
               if("zhihao.debug".equals(message.getMessageProperties().getConsumerQueue())){
                   System.out.println("====接收到"+message.getMessageProperties().getConsumerQueue()+"队列的消息=====");
                   System.out.println(message.getMessageProperties());
                   System.out.println(new String(message.getBody()));
               }else if("zhihao.error".equals(message.getMessageProperties().getConsumerQueue())){
                   System.out.println("====接收到"+message.getMessageProperties().getConsumerQueue()+"队列的消息=====");
                   System.out.println(message.getMessageProperties());
                   System.out.println(new String(message.getBody()));
               }else if("zhihao.info".equals(message.getMessageProperties().getConsumerQueue())){
                   System.out.println("====接收到"+message.getMessageProperties().getConsumerQueue()+"队列的消息=====");
                   System.out.println(message.getMessageProperties());
                   System.out.println(new String(message.getBody()));
               }
           });
    
           return container;
       }
    
    

    运行时动态的移除监听队列

    SimpleMessageListenerContainer运行时后动态的移除监听队列

    container.removeQueueNames("zhihao.debug");
    
    

    后置处理器

    SimpleMessageListenerContainer增加后置处理

          @Bean
        public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
            container.setQueueNames("zhihao.miao.order");
            //后置处理器,接收到的消息都添加了Header请求头
            container.setAfterReceivePostProcessors(message -> {
                message.getMessageProperties().getHeaders().put("desc",10);
                return message;
            });
            container.setMessageListener((MessageListener) message -> {
                System.out.println("====接收到消息=====");
                System.out.println(message.getMessageProperties());
                System.out.println(new String(message.getBody()));
            });
            return container;
        }
    
    

    应用启动类:

    @ComponentScan
    public class Application {
    
        public static void main(String[] args) throws Exception{
            AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
            SimpleMessageListenerContainer container = context.getBean(SimpleMessageListenerContainer.class);
            System.out.println(container.getQueueNames()[0]);
            TimeUnit.SECONDS.sleep(30);
            context.close();
        }
    }
    
    

    控制台打印:

    ====接收到消息=====
    MessageProperties [headers={desc=10}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=null, contentEncoding=null, contentLength=0, deliveryMode=null, receivedDeliveryMode=NON_PERSISTENT, expiration=null, priority=null, redelivered=false, receivedExchange=, receivedRoutingKey=zhihao.miao.order, receivedDelay=null, deliveryTag=1, messageCount=0, consumerTag=amq.ctag-2xCE8upxgGgf-u1haCwt6A, consumerQueue=zhihao.miao.order]
    消息2
    
    

    setAfterReceivePostProcessors方法可以对消息进行后置处理。

    设置消费者的Consumer_tag和Arguments

    int count=0;
    
        @Bean
        public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
            container.setQueueNames("zhihao.miao.order");
            //设置消费者的consumerTag_tag
            container.setConsumerTagStrategy(queue -> "order_queue_"+(++count));
            //设置消费者的Arguments
            Map<String, Object> args = new HashMap<>();
            args.put("module","订单模块");
            args.put("fun","发送消息");
            container.setConsumerArguments(args);
            container.setMessageListener((MessageListener) message -> {
                System.out.println("====接收到消息=====");
                System.out.println(message.getMessageProperties());
                System.out.println(new String(message.getBody()));
            });
            return container;
        }
    
    
    image

    container.setConsumerTagStrategy可以设置消费者的 Consumer_tagcontainer.setConsumerArguments可以设置消费者的 Arguments

    setConcurrentConsumers设置并发消费者

      @Bean
       public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
           SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
           container.setConnectionFactory(connectionFactory);
           container.setQueueNames("zhihao.miao.order");
           container.setConcurrentConsumers(5);
           container.setMaxConcurrentConsumers(10);
           container.setMessageListener((MessageListener) message -> {
               System.out.println("====接收到消息=====");
               System.out.println(message.getMessageProperties());
               System.out.println(new String(message.getBody()));
           });
           return container;
       }
    
    
    image

    应用启动类,

    @ComponentScan
    public class Application {
    
       public static void main(String[] args) throws Exception{
           AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
           SimpleMessageListenerContainer container = context.getBean(SimpleMessageListenerContainer.class);
           container.setConcurrentConsumers(7);
           TimeUnit.SECONDS.sleep(30);
           context.close();
       }
    }
    
    
    image

    setConcurrentConsumers设置多个并发消费者一起消费,并支持运行时动态修改。 setMaxConcurrentConsumers设置最多的并发消费者。

    相关文章

      网友评论

          本文标题:RabbitMQ之SimpleMessageListenerCo

          本文链接:https://www.haomeiwen.com/subject/dsvibktx.html