美文网首页RabbitMQ学习RabbitMQ
RabbitMQ笔记七:Spring AMQP API(生产和消

RabbitMQ笔记七:Spring AMQP API(生产和消

作者: 二月_春风 | 来源:发表于2017-09-27 22:12 被阅读398次

    RabbitTemplate类是简化RabbitMQ访问的工具类(发送和接收消息)

    总结:
    1.使用RabbitTemplate进行消息的发送。
    2.使用SimpleMessageListenerContainer类监听队列,进行消息的消费。

    使用RabbitTemplate进行消息发送

    加入spring-amqp依赖:

        <dependencies>
            <dependency>
                <groupId>org.springframework.amqp</groupId>
                <artifactId>spring-rabbit</artifactId>
                <version>1.7.3.RELEASE</version>
            </dependency>
        </dependencies>
    

    配置类:
    ConnectionFactoryRabbitTemplate纳入到spring容器中

    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class MQConfig {
    
        @Bean
        public ConnectionFactory connectionFactory(){
            CachingConnectionFactory factory = new CachingConnectionFactory();
            factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
            return factory;
        }
    
        @Bean
        public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
            RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
            return rabbitAdmin;
        }
    
        @Bean
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
           //设置Exchange默认操作的exchange和routingkey
            rabbitTemplate.setExchange("zhihao.direct.exchange");
            rabbitTemplate.setRoutingKey("zhihao.debug");
            return rabbitTemplate;
        }
    }
    

    测试类:
    RabbitTemplate#send方法发送消息,

    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.support.CorrelationData;
    import org.springframework.context.annotation.AnnotationConfigApplicationContext;
    import org.springframework.context.annotation.ComponentScan;
    
    @ComponentScan
    public class Application {
    
        public static void main(String[] args) {
            AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
    
            RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
            System.out.println(rabbitTemplate);
    
            MessageProperties messageProperties = new MessageProperties();
            messageProperties.getHeaders().put("desc","消息发送");
            messageProperties.getHeaders().put("type",10);
    
            Message message = new Message("hello".getBytes(),messageProperties);
    
            /**
             * 调用rabbitTemplate的send方法发送消息,如果没有指定exchange,Routing,则使用声明Exchange指定的
             * exchange,Routing
             * 如果RabbitTemplate没有设置,则默认的exchange 是DEFAULT_EXCHANGE为"",
             * 默认的routkey是DEFAULT_ROUTING_KEY为""
             */
            //1.
            //rabbitTemplate.send(message);
    
            //2.指定Routingkey,而exchange是Rabbitmq默认指定的
            //rabbitTemplate.send("zhihao.error",message);
    
            //3.即指定exchange,又指定了routing_key
            //rabbitTemplate.send("zhihao.login","ulogin",message);
    
            /**
             * 4.使用默认的defaultExchange进行投递消息,route key就是队列名,指定correlation_id属性,correlation_id属性是rabbitmq 进行异步rpc进行标识每次请求的唯一
             * id,下面会讲到
             */
            rabbitTemplate.send("","zhihao.order.queue",message,new CorrelationData("spring.amqp"));
    
            context.close();
        }
    }
    

    查看web管控台发现消息都发送成功了。

    使用RabbitTemplate#convertAndSend方法发送消息,

    import org.springframework.amqp.AmqpException;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessagePostProcessor;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.context.annotation.AnnotationConfigApplicationContext;
    import org.springframework.context.annotation.ComponentScan;
    
    @ComponentScan
    public class Application2 {
    
        public static void main(String[] args) {
            AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
    
            RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
            System.out.println(rabbitTemplate);
    
            /**
             * 使用convertAndSend方法,接收的参数是Object对象,其实是将接收的对象转换成Message对象,不指定exchange和routing key,那么就
             * 使用RabbitTemplate中设置的exchange和routing key
             */
            //rabbitTemplate.convertAndSend("this is my message");
    
            //指定exchange或者指定exchage和routing key
            //rabbitTemplate.convertAndSend("zhihao.error","this is my message order111");
            //rabbitTemplate.convertAndSend("","zhihao.user.queue","this is my message order222");
    
            //发送消息的后置处理器,MessagePostProcessor类的postProcessMessage方法得到的Message就是将参数Object内容转换成Message对象
            /*
            rabbitTemplate.convertAndSend("", "zhihao.user.queue", "this is my message processor", new MessagePostProcessor() {
                //在后置处理器上加上order和count属性
                @Override
                public Message postProcessMessage(Message message) throws AmqpException {
                    System.out.println("-------处理前message-------------");
                    System.out.println(message);
                    message.getMessageProperties().getHeaders().put("order",10);
                    message.getMessageProperties().getHeaders().put("count",1);
                    return message;
                }
            });
            */
    
    
            rabbitTemplate.convertAndSend("", "zhihao.user.queue", "message before", message1 -> {
                //使用lamdba的语法
                MessageProperties properties = new MessageProperties();
                properties.getHeaders().put("desc","消息发送");
                properties.getHeaders().put("type",10);
    
                Message messageafter = new Message("message after".getBytes(),properties);
                return messageafter;
            });
    
            context.close();
        }
    }
    

    消息的消费

    使用容器的方式进行消费
    认识一个接口org.springframework.amqp.rabbit.listener.MessageListenerContainer
    其默认实现类org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer

    MessageListenerContainer接口及其实现

    MessageListenerContainer#setMessageListener方法,接收的参数类型
    org.springframework.amqp.core.MessageListener或者org.springframework.amqp.rabbit.core.ChannelAwareMessageListener接口

    代码:
    ConnectionFactoryRabbitTemplateSimpleMessageListenerContainer实例纳入到spring容器中进行管理。

    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageListener;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class MQConfig {
    
        @Bean
        public ConnectionFactory connectionFactory(){
            CachingConnectionFactory factory = new CachingConnectionFactory();
            factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
            return factory;
        }
    
        @Bean
        public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
            RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
            return rabbitAdmin;
        }
    
        @Bean
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            return rabbitTemplate;
        }
    
        /*
        @Bean
        public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
            //监听队列zhihao.user.queue,监听队列可以多个,参数类型是String[]
            container.setQueueNames("zhihao.user.queue");
            container.setMessageListener(new MessageListener() {
                //具体的消费逻辑
                @Override
                public void onMessage(Message message) {
                    System.out.println("====接收到消息=====");
                    System.out.println(message.getMessageProperties());
                    System.out.println(new String(message.getBody()));
                }
            });
            return container;
        }
        */
    
        @Bean
        public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
            //队列可以是多个,参数是String的数组
            container.setQueueNames("zhihao.user.queue");
            container.setMessageListener(new ChannelAwareMessageListener(){
                @Override
                //得到了Channel参数,具体使用会在下面的博客详细讲解
                public void onMessage(Message message, Channel channel) throws Exception {
                    System.out.println("====接收到消息=====");
                    System.out.println(message.getMessageProperties());
                    System.out.println(new String(message.getBody()));
                }
            });
            return container;
        }
    }
    

    关于setMessageListener接收的类型参数,

    setMessageListener方法
    当接收的参数不是MessageListener或者ChannelAwareMessageListener类型,则会抛出异常,具体的逻辑在checkMessageListener(messageListener);方法, checkMessageListener方法

    应用启动类,向zhihao.user.queue对象发送消息,并启动了spring容器,发现监听到队列并且消费了。

    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.context.annotation.AnnotationConfigApplicationContext;
    import org.springframework.context.annotation.ComponentScan;
    
    import java.util.concurrent.TimeUnit;
    
    @ComponentScan
    public class Application {
    
        public static void main(String[] args) throws Exception{
            AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
    
            RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
            System.out.println(rabbitTemplate);
    
            rabbitTemplate.convertAndSend("","zhihao.user.queue","hello spring amqp");
    
            TimeUnit.SECONDS.sleep(30);
    
            context.close();
        }
    }
    

    原理分析

    稍微分析一下原理
    org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer接口,它继承AbstractMessageListenerContainer类,实现SmartLifecycle接口然后继承Lifecycle接口,意味着一旦SimpleMessageListenerContainer实例被spring容器管理,其生命周期就托管与spring容器来管理了,意味着当spring容器运行起来的时候,SimpleMessageListenerContainer容器启动,spring容器关闭的时候,SimpleMessageListenerContainer容器也关闭了。

    设置在spring容器初始化的时候设置SimpleMessageListenerContainer不启动,(container.setAutoStartup(false);

        @Bean
        public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
            //队列可以是多个,参数是String的数组
            container.setQueueNames("zhihao.miao.order");
            //设置autoStartUp为false表示SimpleMessageListenerContainer没有启动
            container.setAutoStartup(false);
            container.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    System.out.println("====接收到消息=====");
                    System.out.println(message.getMessageProperties());
                    System.out.println(new String(message.getBody()));
                }
            });
            return container;
        }
    }
    

    此时不能消费消息,也可以在应用启动类启动SimpleMessageListenerContainer容器,在应用启动类中启动

    import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
    import org.springframework.context.annotation.AnnotationConfigApplicationContext;
    import org.springframework.context.annotation.ComponentScan;
    
    import java.util.concurrent.TimeUnit;
    
    @ComponentScan
    public class Application {
    
        public static void main(String[] args) throws Exception{
            AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
    
            //在spring容器中启动SimpleMessageListenerContainer
            context.getBean(SimpleMessageListenerContainer.class).start();
            TimeUnit.SECONDS.sleep(30);
    
            context.close();
        }
    }
    

    以上说明只有org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer启动了,才会消费消息。

    总结
    SimpleMessageListenerContainer可以托管到spring容器中,由spring容器进行SimpleMessageListenerContainer的生命周期管理,默认情况下spring容器启动的时候,启动SimpleMessageListenerContainer,spring容器关闭,会stop掉SimpleMessageListenerContainer,也可以设置SimpleMessageListenerContainer手动启动(context.getBean(SimpleMessageListenerContainer.class).start();)。

    相关文章

      网友评论

      本文标题:RabbitMQ笔记七:Spring AMQP API(生产和消

      本文链接:https://www.haomeiwen.com/subject/jhsuextx.html