美文网首页RabbitMQ学习rabbitRabbitMQ
RabbitMQ笔记十二:RabbitListenerConfi

RabbitMQ笔记十二:RabbitListenerConfi

作者: 二月_春风 | 来源:发表于2017-10-15 23:59 被阅读142次

RabbitListenerConfigurer详解

RabbitListenerConfigurer源码分析
import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerEndpoint;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ConsumerConfig {

    @Bean
    public ConnectionFactory connectionFactory(){
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
        return factory;
    }

    @Bean
    public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        return factory;
    }

    @Bean
    public RabbitListenerConfigurer rabbitListenerConfigurer(){

        return new RabbitListenerConfigurer() {
            @Override
            public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {

                //endpoint设置zhihao.miao.order队列的消息处理逻辑
                SimpleRabbitListenerEndpoint endpoint = new SimpleRabbitListenerEndpoint();
                endpoint.setId("10");
                endpoint.setQueueNames("zhihao.miao.order");
                endpoint.setMessageListener(message -> {
                    System.out.println("endpoint1处理消息的逻辑");
                    System.out.println(new String(message.getBody()));
                });


                //使用适配器来处理消息,设置了order,pay队列的消息处理逻辑
                SimpleRabbitListenerEndpoint endpoint2 = new SimpleRabbitListenerEndpoint();
                endpoint2.setId("11");
                endpoint2.setQueueNames("order","pay");
                System.out.println("endpoint2处理消息的逻辑");
                endpoint2.setMessageListener(new MessageListenerAdapter(new MessageHandler()));

                //注册二个endpoint
                registrar.registerEndpoint(endpoint);
                registrar.registerEndpoint(endpoint2);
            }
        };
    }
}

消费端消息处理器

public class MessageHandler {

    public void handleMessage(byte[] message){
        System.out.println("消费消息");
        System.out.println(new String(message));
    }
}

消费端应用启动类

import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;

import java.util.concurrent.TimeUnit;

@EnableRabbit
@ComponentScan
public class Application {
    public static void main(String[] args) throws Exception{
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
        System.out.println("rabbit service startup");
        TimeUnit.SECONDS.sleep(60);
        context.close();
    }
}

使用总结

  • 实现RabbitListenerConfigurer接口,并把实现类托管到spring容器中
  • 在spring容器中,托管一个RabbitListenerContainerFactory的bean(SimpleRabbitListenerContainerFactory
  • 在启动类上加上@EnableRabbit注解

相关文章

网友评论

    本文标题:RabbitMQ笔记十二:RabbitListenerConfi

    本文链接:https://www.haomeiwen.com/subject/buivextx.html