美文网首页
RabbitMQ整合Spring AMQP(二)

RabbitMQ整合Spring AMQP(二)

作者: 若兮缘 | 来源:发表于2019-03-26 22:23 被阅读0次

    消息模板组件 - RabbitTemplate

    • 我们在与SpringAMQP整合的时候进行发送消息的关键类
    • 该类提供了丰富的发送消息方法,包括可靠性投递消息方法、回调监听消息接口ConfirmCallback、返回值确认接口 ReturnCallback等等。
    • 同样我们需要进行注入到Spring容器中,然后直接使用
    • 在与Spring整合时需要实例化,但是在与SrpingBoot整合时,在配置文件里添加配置即可

    RabbitTemplate的使用

    声明RabbitTemplate,在RabbitMQConfig类中声明

        @Bean
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            return rabbitTemplate;
        }
    
    使用RabbitTemplate发送消息

    MessagePostProcessor类通常用来设置消息的Header以及消息的属性,它允许在消息被转换器处理后对其进行进一步的修改,可以通过它对发送的消息进行统一的设置。

        @Test
        public void testSendMessage() throws Exception {
            //1 创建消息
            MessageProperties messageProperties = new MessageProperties();
            //设置消息属性
            messageProperties.getHeaders().put("desc", "信息描述..");
            messageProperties.getHeaders().put("type", "自定义消息类型..");
            Message message = new Message("Hello RabbitMQ".getBytes(), messageProperties);
            
            //发送消息
            rabbitTemplate.convertAndSend("topic001", "spring.amqp", message, new MessagePostProcessor() {
                @Override
                public Message postProcessMessage(Message message) throws AmqpException {
                    System.err.println("------添加额外的设置---------");
                    message.getMessageProperties().getHeaders().put("desc", "额外修改的信息描述");
                    message.getMessageProperties().getHeaders().put("attr", "额外新加的属性");
                    return message;
                }
            });
        }
    

    运行测试类,进入管控台的Queues菜单,点击queue001,然后下面有个Get messages,点击按钮就可以获取消息了。

    使用RabbitTemplate发送消息2

    可以调用send方法发送消息,也可以直接发送字符串消息。

        @Test
        public void testSendMessage2() throws Exception {
            //1 创建消息
            MessageProperties messageProperties = new MessageProperties();
            messageProperties.setContentType("text/plain");
            Message message = new Message("mq发送消息".getBytes(), messageProperties);
            
            //第三个参数必须是Message类型
            rabbitTemplate.send("topic001", "spring.abc", message);
            
            //直接发送字符串消息,会自动转换
            rabbitTemplate.convertAndSend("topic001", "spring.amqp", "hello object message send!");
            rabbitTemplate.convertAndSend("topic002", "rabbit.abc", "hello object message send!");
        }
    

    运行测试类,依然是进入管控台的Queues菜单,点击相应的队列,点击Get messages获取消息。

    消息容器 - SimpleMessageListenerContainer

    • 简单消息监听容器,这个类非常的强大,我们可以对他进行很多设置,对于消费者的配置项,这个类都可以满足
    • 监听队列(多个队列)、自动启动、自动声明功能
    • 设置事务特性、事务管理器、事务属性、事务容量(并发)、是否开启事务、回滚消息等
    • 设置消费者数量、最小最大数量、批量消费
    • 设置消息确认和自动确认模式、是否重回队列、异常捕获handler函数
    • 设置消费者标签生成策略、是否独占模式、消费者属性等
    • 设置具体的监听器、消息转换器等等
    特性说明
    • SimpleMessageListenerContainer可以进行动态设置, 比如在运行中的应用可以动态的修改其消费者数量的大小、接收消息的模式等
    • 很多基于RabbitMQ的自制定化后端管控台在进行动态设置的时候,也是根据这一特去实现的。所以可以看出SpringAMQP非常的强大
    SimpleMessageListenerContainer使用

    这里简单列举一些,还可以设置很多很多属性,包括后面要介绍的消息适配器和消息转换器。

        @Bean
        public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
            //监控队列,可以同时监控多个
            container.setQueues(queue001(), queue002(), queue003(), queue_image(), queue_pdf());
            //设置当前消费者数量
            container.setConcurrentConsumers(1);
            //设置消费者并发数量的上限,不能小于ConcurrentConsumers,默认为ConcurrentConsumers,消费者会按需添加
            //指的是消费者的最大并行度是多少
            container.setMaxConcurrentConsumers(5);
            //设置是否重回队列
            container.setDefaultRequeueRejected(false);
            //设置签收模式:AUTO(自动签收)、MANUAL(手工签收)、NONE(不签收,没有任何操作)
            container.setAcknowledgeMode(AcknowledgeMode.AUTO);
            //设置消费端标签策略:就是在消费端生成自己的标签时可以指定一个生成策略
            container.setConsumerTagStrategy(new ConsumerTagStrategy() {
                @Override
                public String createConsumerTag(String queue) {
                    return queue + "_" + UUID.randomUUID().toString();
                }
            });
            
            //设置消息监听器
            container.setMessageListener(new ChannelAwareMessageListener() {
                @Override
                public void onMessage(Message message, Channel channel) throws Exception {
                    String msg = new String(message.getBody());
                    System.err.println("----------消费者: " + msg);
                }
            });
            return container;
        }
    

    运行Application类,选择Channel菜单,点击表格中的channel,可以看到队列对应的消费者情况,Consumer tag就是根据我们指定的规则生成的。

    关闭Application,运行测试类进行消息发送

        @Test
        public void testSendMessage2() throws Exception {
            //1 创建消息
            MessageProperties messageProperties = new MessageProperties();
            messageProperties.setContentType("text/plain");
            Message message = new Message("mq发送消息".getBytes(), messageProperties);
            
            //第三个参数必须是Message类型
            rabbitTemplate.send("topic001", "spring.abc", message);
            
            //直接发送字符串消息,会自动转换
            rabbitTemplate.convertAndSend("topic001", "spring.amqp", "hello object message send!");
            rabbitTemplate.convertAndSend("topic002", "rabbit.abc", "hello object message send!");
        }
    

    监听器监听到了消息并打印了如下内容

    ----------消费者: mq发送消息
    ----------消费者: hello object message send!
    ----------消费者: hello object message send!
    

    相关文章

      网友评论

          本文标题:RabbitMQ整合Spring AMQP(二)

          本文链接:https://www.haomeiwen.com/subject/kukovqtx.html