美文网首页RabbitMQ学习RabbitMQ
RabbitMQ笔记七:Spring AMQP API(生产和消

RabbitMQ笔记七:Spring AMQP API(生产和消费)

作者: 二月_春风 | 来源:发表于2017-09-27 22:12 被阅读398次

RabbitTemplate类是简化RabbitMQ访问的工具类(发送和接收消息)

总结:
1.使用RabbitTemplate进行消息的发送。
2.使用SimpleMessageListenerContainer类监听队列,进行消息的消费。

使用RabbitTemplate进行消息发送

加入spring-amqp依赖:

    <dependencies>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>1.7.3.RELEASE</version>
        </dependency>
    </dependencies>

配置类:
将ConnectionFactory、RabbitAdmin和RabbitTemplate纳入到spring容器中:

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration for the sending examples.
 *
 * <p>Registers three beans: the broker {@link ConnectionFactory}, a {@link RabbitAdmin}
 * and a {@link RabbitTemplate} that carries a default exchange and routing key.
 */
@Configuration
public class MQConfig {

    /**
     * Creates the connection factory from an AMQP URI.
     *
     * <p>NOTE(review): host, user and password are embedded in the URI literal, which is
     * presumably acceptable only for a local demo.
     *
     * @return the caching connection factory pointing at the demo broker
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory cachingFactory = new CachingConnectionFactory();
        cachingFactory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
        return cachingFactory;
    }

    /**
     * Exposes a {@link RabbitAdmin} built on the shared connection factory.
     *
     * @param connectionFactory the connection factory bean
     * @return a new admin instance
     */
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

    /**
     * Exposes a {@link RabbitTemplate} with a preset exchange and routing key.
     *
     * @param connectionFactory the connection factory bean
     * @return the configured template
     */
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        // Exchange and routing key the template falls back to when a call does not name them.
        template.setRoutingKey("zhihao.debug");
        template.setExchange("zhihao.direct.exchange");
        return template;
    }
}

测试类:
使用RabbitTemplate#send方法发送消息:

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;

/**
 * Demo entry point that publishes a single message with {@code RabbitTemplate.send(...)}.
 *
 * <p>The class carries {@link ComponentScan} and is itself passed to the application
 * context as the configuration root. The other {@code send} overloads are kept as
 * commented-out, numbered variants.
 */
@ComponentScan
public class Application {

    /**
     * Builds the Spring context, publishes one message and closes the context.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // try-with-resources guarantees the context is closed even when sending fails.
        try (AnnotationConfigApplicationContext context =
                new AnnotationConfigApplicationContext(Application.class)) {

            RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
            System.out.println(rabbitTemplate);

            MessageProperties messageProperties = new MessageProperties();
            messageProperties.getHeaders().put("desc","消息发送");
            messageProperties.getHeaders().put("type",10);

            // Encode explicitly as UTF-8 so the payload does not depend on the platform charset.
            Message message = new Message(
                    "hello".getBytes(java.nio.charset.StandardCharsets.UTF_8), messageProperties);

            // When send() is called without an exchange / routing key, the values configured
            // on the RabbitTemplate are used. If the template has none configured, the
            // defaults are DEFAULT_EXCHANGE ("") and DEFAULT_ROUTING_KEY ("").

            // Variant 1: neither exchange nor routing key given.
            //rabbitTemplate.send(message);

            // Variant 2: routing key given; the exchange is not passed, so the default
            // described above applies.
            //rabbitTemplate.send("zhihao.error",message);

            // Variant 3: both exchange and routing key given.
            //rabbitTemplate.send("zhihao.login","ulogin",message);

            // Variant 4: publish through the default exchange (""), where the routing key is
            // the queue name, and attach a CorrelationData id. The original author describes
            // the id as the unique per-request identifier used for asynchronous RPC.
            // NOTE(review): in Spring AMQP the CorrelationData passed to send() is presumably
            // what publisher-confirm callbacks receive - confirm the intended use.
            rabbitTemplate.send("","zhihao.order.queue",message,new CorrelationData("spring.amqp"));
        }
    }
}

查看web管控台发现消息都发送成功了。

使用RabbitTemplate#convertAndSend方法发送消息:

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;

/**
 * Demo entry point that publishes messages with {@code RabbitTemplate.convertAndSend(...)}.
 *
 * <p>The active example passes a post-processor lambda; the other overloads are kept as
 * commented-out variants.
 */
@ComponentScan
public class Application2 {

    /**
     * Builds the Spring context, publishes one message and closes the context.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // Bootstrap from this class (the original passed Application.class, a copy-paste
        // slip); try-with-resources closes the context even when sending fails.
        try (AnnotationConfigApplicationContext context =
                new AnnotationConfigApplicationContext(Application2.class)) {

            RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
            System.out.println(rabbitTemplate);

            // convertAndSend accepts an Object and converts it into a Message. Without an
            // explicit exchange / routing key, the ones configured on the RabbitTemplate
            // are used.
            //rabbitTemplate.convertAndSend("this is my message");

            // Variants naming the routing key only, or both exchange and routing key.
            //rabbitTemplate.convertAndSend("zhihao.error","this is my message order111");
            //rabbitTemplate.convertAndSend("","zhihao.user.queue","this is my message order222");

            // Post-processor variant: the Message handed to postProcessMessage is the result
            // of converting the Object argument; here the headers "order" and "count" are added.
            /*
            rabbitTemplate.convertAndSend("", "zhihao.user.queue", "this is my message processor", new MessagePostProcessor() {
                @Override
                public Message postProcessMessage(Message message) throws AmqpException {
                    System.out.println("-------处理前message-------------");
                    System.out.println(message);
                    message.getMessageProperties().getHeaders().put("order",10);
                    message.getMessageProperties().getHeaders().put("count",1);
                    return message;
                }
            });
            */

            // Same post-processor hook written as a lambda. The converted message (message1)
            // is discarded and a brand-new Message is returned in its place, so the payload
            // published is "message after" rather than "message before".
            rabbitTemplate.convertAndSend("", "zhihao.user.queue", "message before", message1 -> {
                MessageProperties properties = new MessageProperties();
                properties.getHeaders().put("desc","消息发送");
                properties.getHeaders().put("type",10);

                // Encode explicitly as UTF-8 so the payload does not depend on the platform charset.
                return new Message(
                        "message after".getBytes(java.nio.charset.StandardCharsets.UTF_8), properties);
            });
        }
    }
}

消息的消费

使用容器的方式进行消费
认识一个接口org.springframework.amqp.rabbit.listener.MessageListenerContainer
其默认实现类org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer

MessageListenerContainer接口及其实现

MessageListenerContainer#setMessageListener方法,接收的参数类型
org.springframework.amqp.core.MessageListener或者org.springframework.amqp.rabbit.core.ChannelAwareMessageListener接口

代码:
将ConnectionFactory、RabbitTemplate和SimpleMessageListenerContainer实例纳入到spring容器中进行管理。

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.nio.charset.StandardCharsets;

/**
 * Spring configuration for the consuming examples.
 *
 * <p>Registers the broker {@link ConnectionFactory}, a {@link RabbitAdmin}, a
 * {@link RabbitTemplate} without default exchange / routing key, and a
 * {@link SimpleMessageListenerContainer} that listens on {@code zhihao.user.queue}.
 */
@Configuration
public class MQConfig {

    /**
     * Creates the connection factory from an AMQP URI.
     *
     * <p>NOTE(review): host, user and password are embedded in the URI literal, which is
     * presumably acceptable only for a local demo.
     *
     * @return the caching connection factory pointing at the demo broker
     */
    @Bean
    public ConnectionFactory connectionFactory(){
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
        return factory;
    }

    /**
     * Exposes a {@link RabbitAdmin} built on the shared connection factory.
     *
     * @param connectionFactory the connection factory bean
     * @return a new admin instance
     */
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
        return new RabbitAdmin(connectionFactory);
    }

    /**
     * Exposes a plain {@link RabbitTemplate}; callers name exchange and routing key per call.
     *
     * @param connectionFactory the connection factory bean
     * @return a new template instance
     */
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
        return new RabbitTemplate(connectionFactory);
    }

    /*
     * Alternative registration using the plain MessageListener callback (no Channel argument).
     * Left commented out: it declares the same method signature as the active bean below,
     * so only one of the two can be compiled in at a time.

    @Bean
    public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        // Listen on zhihao.user.queue; several queues may be given (the parameter is a String array).
        container.setQueueNames("zhihao.user.queue");
        container.setMessageListener(new MessageListener() {
            // The actual consuming logic.
            @Override
            public void onMessage(Message message) {
                System.out.println("====接收到消息=====");
                System.out.println(message.getMessageProperties());
                System.out.println(new String(message.getBody(), StandardCharsets.UTF_8));
            }
        });
        return container;
    }
    */

    /**
     * Registers the listener container that consumes from {@code zhihao.user.queue} and
     * prints the properties and body of every delivered message.
     *
     * @param connectionFactory the connection factory bean
     * @return the configured listener container
     */
    @Bean
    public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        // Several queues may be given; the parameter is a String array.
        container.setQueueNames("zhihao.user.queue");
        container.setMessageListener(new ChannelAwareMessageListener(){
            // This callback variant also receives the Channel; it is not used here
            // (the author covers its use in a later post).
            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                System.out.println("====接收到消息=====");
                System.out.println(message.getMessageProperties());
                // Decode explicitly as UTF-8 instead of the platform default charset, so
                // non-ASCII payloads print correctly regardless of the JVM's locale.
                System.out.println(new String(message.getBody(), StandardCharsets.UTF_8));
            }
        });
        return container;
    }
}

关于setMessageListener接收的类型参数,

setMessageListener方法
当接收的参数不是MessageListener或者ChannelAwareMessageListener类型时,会抛出异常,具体的逻辑在setMessageListener内部调用的checkMessageListener(messageListener)方法中。

应用启动类:向zhihao.user.queue队列发送消息,并启动spring容器,可以发现容器监听到了队列并且消费了消息。

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;

import java.util.concurrent.TimeUnit;

/**
 * Demo entry point that publishes one message to {@code zhihao.user.queue} and then keeps
 * the Spring context alive for a while before shutting it down.
 */
@ComponentScan
public class Application {

    /**
     * Builds the context, sends a message, waits 30 seconds and closes the context.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if sending fails or the wait is interrupted
     */
    public static void main(String[] args) throws Exception{
        // try-with-resources closes the context even if sending or sleeping throws.
        try (AnnotationConfigApplicationContext context =
                new AnnotationConfigApplicationContext(Application.class)) {

            RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
            System.out.println(rabbitTemplate);

            // Default exchange (""), so the routing key is the queue name.
            rabbitTemplate.convertAndSend("","zhihao.user.queue","hello spring amqp");

            // Keep the context (and the beans it manages) running for 30 seconds.
            TimeUnit.SECONDS.sleep(30);
        }
    }
}

原理分析

稍微分析一下原理
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer类继承了AbstractMessageListenerContainer类,并实现了SmartLifecycle接口(SmartLifecycle又继承了Lifecycle接口)。这意味着一旦SimpleMessageListenerContainer实例被spring容器管理,其生命周期就托管于spring容器:当spring容器运行起来的时候,SimpleMessageListenerContainer容器启动;spring容器关闭的时候,SimpleMessageListenerContainer容器也随之关闭。

也可以设置在spring容器初始化的时候不启动SimpleMessageListenerContainer(container.setAutoStartup(false);):

    /**
     * Registers a listener container for {@code zhihao.miao.order} with auto-startup
     * disabled, so it does not begin consuming until it is started explicitly.
     *
     * @param connectionFactory the connection factory bean
     * @return the configured (not yet started) listener container
     */
    @Bean
    public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        // Several queues may be given; the parameter is a String array.
        container.setQueueNames("zhihao.miao.order");
        // autoStartup=false: the container is not started together with the Spring context.
        container.setAutoStartup(false);
        container.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                System.out.println("====接收到消息=====");
                System.out.println(message.getMessageProperties());
                // Decode explicitly as UTF-8 instead of the platform default charset, so
                // non-ASCII payloads print correctly regardless of the JVM's locale.
                System.out.println(new String(message.getBody(), StandardCharsets.UTF_8));
            }
        });
        return container;
    }
}

此时容器没有启动,不能消费消息;可以在应用启动类中手动启动SimpleMessageListenerContainer容器:

import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;

import java.util.concurrent.TimeUnit;

/**
 * Demo entry point that starts the {@link SimpleMessageListenerContainer} bean by hand
 * and keeps the Spring context alive for a while before shutting it down.
 */
@ComponentScan
public class Application {

    /**
     * Builds the context, starts the listener container, waits 30 seconds and closes the context.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if starting the container fails or the wait is interrupted
     */
    public static void main(String[] args) throws Exception{
        // try-with-resources closes the context even if start() or sleep() throws.
        try (AnnotationConfigApplicationContext context =
                new AnnotationConfigApplicationContext(Application.class)) {

            // Start the listener container explicitly from the application class.
            context.getBean(SimpleMessageListenerContainer.class).start();

            // Keep the context running for 30 seconds so the container has time to work.
            TimeUnit.SECONDS.sleep(30);
        }
    }
}

以上说明只有org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer启动了,才会消费消息。

总结
SimpleMessageListenerContainer可以托管到spring容器中,由spring容器进行SimpleMessageListenerContainer的生命周期管理,默认情况下spring容器启动的时候,启动SimpleMessageListenerContainer,spring容器关闭,会stop掉SimpleMessageListenerContainer,也可以设置SimpleMessageListenerContainer手动启动(context.getBean(SimpleMessageListenerContainer.class).start();)。

相关文章

网友评论

本文标题:RabbitMQ笔记七:Spring AMQP API(生产和消

本文链接:https://www.haomeiwen.com/subject/jhsuextx.html