美文网首页收藏spring 整合使用全新框架
RabbitMQ高级整合应用-1、整合Spring AMQP,S

RabbitMQ高级整合应用-1、整合Spring AMQP,S

作者: 那钱有着落吗 | 来源:发表于2021-12-16 11:01 被阅读0次

    RabbitMQ整合Spring AMQP

    1.1RabbitAdmin



    代码实战

    • maven依赖
     <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
            </dependency>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>4.12</version>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
    

    然后新建一个配置类:

    
    @Configuration
    @ComponentScan("com.example.streamsample.*")
    public class RabbitMQConfig {
    
    
        @Bean
        public ConnectionFactory connectionFactory(){
            CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
            connectionFactory.setAddresses("127.0.0.1:5672");
            connectionFactory.setUsername("guest");
            connectionFactory.setPassword("guest");
            connectionFactory.setVirtualHost("/");
            return connectionFactory;
        }
    
        @Bean
        public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
            RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
            rabbitAdmin.setAutoStartup(true);
            return rabbitAdmin;
        }
    
    
    }
    

    接下来我们就可以进行测试这个功能了:

    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class ApplicationTest {

    @Autowired
    private RabbitAdmin rabbitAdmin;
    
    @Test
    public void contextLoads(){
    
    }
    
    
    @Test
    public void testAdmin(){
        rabbitAdmin.declareExchange(new DirectExchange("test.direct",false,false));
        rabbitAdmin.declareExchange(new TopicExchange("test.topic",false,false));
        rabbitAdmin.declareExchange(new FanoutExchange("test.fanout",false,false));
    
        rabbitAdmin.declareQueue(new Queue("test.direct.queue",false));
        rabbitAdmin.declareQueue(new Queue("test.topic.queue",false));
        rabbitAdmin.declareQueue(new Queue("test.fanout.queue",false));
    
        rabbitAdmin.declareBinding(new Binding("test.direct.queue", Binding.DestinationType.QUEUE,
                "test.direct","direct",new HashMap<>()));
    
        //链式编程
        rabbitAdmin.declareBinding(
                BindingBuilder.bind(new Queue("test.direct.queue",false))
                .to(new DirectExchange("test.direct",false,false))
                .with("user.#")
        );
    
        //因为fanoutExchange 路由信息直接到队列,所以不需要路由键,所以也就没有with
        rabbitAdmin.declareBinding(
                BindingBuilder.bind(new Queue("test.fanout.queue",false))
                        .to(new FanoutExchange("test.fanout",false,false))
        );
    

    rabbitAdmin.purgeQueue("test.direct.queue",false);
    }

    可以在代码中看到,我们使用rabbitAdmin就可以完成很多很多的操作了,创建exchange,创建队列,创建绑定关系,甚至清空队列的操作;

    1.2 Spring AMQP声明

    1.3 消息模板RabbitTemplate


    代码实战

    我们在上一个帖子的基础上在RabbitMQConfig中加入下面配置:

    @Bean
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
            return new RabbitTemplate(connectionFactory);
        }
    

    然后在测试类中加入:

    @Test
        public void testSendMessage(){
            MessageProperties messageProperties = new MessageProperties();
            messageProperties.getHeaders().put("desc","信息描述..");
            messageProperties.getHeaders().put("type","自定义消息类型..");
            Message message = new Message("Hello RabbitMQ".getBytes(),messageProperties);
    
            rabbitTemplate.convertSendAndReceive("test.topic", "spring.ad", message, new MessagePostProcessor() {
                @Override
                public Message postProcessMessage(Message message) throws AmqpException {
                    System.out.println("----添加额外的配置信息-----");
                    message.getMessageProperties().getHeaders().put("desc","修改后的配置信息");
                    message.getMessageProperties().getHeaders().put("attr","修改后的属性信息");
                    return message;
                }
            });
        }
    

    注意:发送消息的时候,肯定是首先要创建这个exchange以及queue以及绑定关系,因为在前面帖子,我已经创建过了,而且rabbitmq又没有重启过,虽然创建的exchange,queue不是持久化的,但是因为没有重启rabbitmq服务,所以就还存在着呢,所以我这边就直接可以运行这段代码,来使用rabbittemplate即可。

    发送完之后我们就可以在rabbitmq的管理界面来查看消息是否已经发送,是否已经修改了部分参数了:

    相关文章

      网友评论

        本文标题:RabbitMQ高级整合应用-1、整合Spring AMQP,S

        本文链接:https://www.haomeiwen.com/subject/bkhpfrtx.html