之前的博客中我们可以在spring容器中构建SimpleMessageListenerContainer
来消费消息,我们也可以使用@RabbitListener
来消费消息。
@RabbitListener
注解指定目标方法来作为消费消息的方法,通过注解参数指定所监听的队列或者Binding。使用@RabbitListener可以设置一个自己明确默认值的RabbitListenerContainerFactory
对象。
可以在配置文件中设置RabbitListenerAnnotationBeanPostProcessor并通过<rabbit:annotation-driven/>来设置@RabbitListener
的执行,当然也可以通过@EnableRabbit注解来启用@RabbitListener
。
示列
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ConsumerConfig {
@Bean
public ConnectionFactory connectionFactory(){
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
return factory;
}
@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
//SimpleRabbitListenerContainerFactory发现消息中有content_type有text就会默认将其转换成string类型的
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
return factory;
}
}
定义消息处理器,@RabbitListener
注解标记的方法
@Component
public class MessageHandler {
@RabbitListener(queues = "zhihao.miao.order")
public void handleMessage(byte[] message){
System.out.println("消费消息");
System.out.println(new String(message));
}
}
应用启动类,@EnableRabbit
启用@RabbitListener
@EnableRabbit
@ComponentScan
public class Application {
public static void main(String[] args) throws Exception{
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
System.out.println("rabbit service startup");
TimeUnit.SECONDS.sleep(60);
context.close();
}
}
测试:
发送消息
控制台打印:
消费消息
你的订单已经生成。
如果发送的消息content_type
的属性是text
,那么接收的消息处理方法的参数就必须是String
类型,如果是byte[]
类型就会报错。
控制台报错
image.pngimport org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class MessageHandler {
//此时如果去掉content_type为text,那么会将消息转换成其每个字符的int类型
//@RabbitListener(queues = "zhihao.miao.order")
public void handleMessage(String message){
System.out.println("消费消息");
System.out.println(new String(message));
}
//此时不管属性中有没有content_type属性都能接收到数据
@RabbitListener(queues = "zhihao.miao.order")
public void handleMessage(Message message){
System.out.println("====消费消息===handleMessage(message)");
System.out.println(message.getMessageProperties());
System.out.println(new String(message.getBody()));
}
}
总结
如果消息属性中没有指定content_type
,则接收消息的处理方法接收类型是byte[]
,如果消息属性中指定content_type为text
,则接收消息的处理方法的参数类型是String
类型。不管有没有指定content_type
,处理消息方法的参数类型是Message都不会报错。
步骤
- 在启动入口增加@EnableRabbit注解
- 在spring容器中托管一个RabbitListenerContainerFactory的bean(默认的实现是org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory)
- 写一个消息处理类托管到spring容器中,然后在具体的消息处理方法上增加
@RabbitListener
注解
具体的消息处理方法的参数是跟MessageConverter
转换后的java对象有关。
如果想要设置MessageConverte
r,则需要在RabbitListenerContainerFactory
的实例中去设置,(setMessageConverter
方法)
使用@Payload和@Headers注解
@Component
public class MessageHandler {
//获取消息的头属性和body属性
@RabbitListener(queues = "zhihao.miao.order")
public void handleMessage(@Payload String body, @Headers Map<String,Object> headers){
System.out.println("====消费消息===handleMessage");
System.out.println(headers);
System.out.println(body);
}
}
获取单一个Header的属性,Header还有其他的一些属性,比如required
,defaultvalue
等属性,顾名思义:
@Component
public class MessageHandler {
//获取特定的消息
@RabbitListener(queues = "zhihao.miao.order")
public void handleMessage(@Payload String body,@Header String token){
System.out.println("====消费消息===handleMessage");
System.out.println(token);
System.out.println(body);
}
}
测试
指定向多个队列中发送消息
@Component
public class MessageHandler {
@RabbitListener(queues ={"zhihao.miao.order","zhihao.info"})
public void handleMessage(Message message){
System.out.println("====消费消息"+message.getMessageProperties().getConsumerQueue()+"===handleMessage");
System.out.println(message.getMessageProperties());
System.out.println(new String(message.getBody()));
}
}
通过配置文件发送消息
@Component
public class MessageHandler {
//通过配置文件发送消息
@RabbitListener(queues ={"${zhihao.queue1}","${zhihao.queue2}"})
public void handleMessage(Message message){
System.out.println("====消费消息"+message.getMessageProperties().getConsumerQueue()+"===handleMessage");
System.out.println(message.getMessageProperties());
System.out.println(new String(message.getBody()));
}
}
配置文件:
zhihao.queue1=zhihao.miao.order
zhihao.queue2=zhihao.info
启动类:
@EnableRabbit
@ComponentScan
@PropertySource(value = "classpath:mq.properties")
public class Application {
public static void main(String[] args) throws Exception{
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
System.out.println("rabbit service startup");
TimeUnit.SECONDS.sleep(200);
context.close();
}
}
@RabbitListener注解进行声明binding
定义mq中不存在的Queue
,exchange
和route key
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class MessageHandler {
//支持自动声明绑定,声明之后自动监听队列的队列,此时@RabbitListener注解的queue和bindings不能同时指定,否则报错
@RabbitListener(bindings ={@QueueBinding(value = @Queue(value = "q5",durable = "true"),
exchange =@Exchange(value = "zhihao.miao.exchange",durable = "true"),key = "welcome")})
public void handleMessage(Message message){
System.out.println("====消费消息"+message.getMessageProperties().getConsumerQueue()+"===handleMessage");
System.out.println(message.getMessageProperties());
System.out.println(new String(message.getBody()));
}
}
从上面的我们知道声明必须容器中要有RabbitAdmin
和RabbitTemplate
实例
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ConsumerConfig {
@Bean
public ConnectionFactory connectionFactory(){
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
return factory;
}
@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
//SimpleRabbitListenerContainerFactory发现消息中有content_type有text就会默认将其转换成string类型的
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
return factory;
}
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
return rabbitAdmin;
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
return rabbitTemplate;
}
}
应用启动类
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;
import java.util.concurrent.TimeUnit;
@EnableRabbit
@ComponentScan
public class Application {
public static void main(String[] args) throws Exception{
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
System.out.println("rabbit service startup");
TimeUnit.SECONDS.sleep(200);
context.close();
}
}
测试验证
自动声明成功 消息发送控制台打印:
rabbit service startup
====消费消息q5===handleMessage
MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=null, contentEncoding=null, contentLength=0, deliveryMode=null, receivedDeliveryMode=NON_PERSISTENT, expiration=null, priority=null, redelivered=false, receivedExchange=zhihao.miao.exchange, receivedRoutingKey=welcome, receivedDelay=null, deliveryTag=1, messageCount=0, consumerTag=amq.ctag-RBqtzCiTxMg6knwQJX7B3A, consumerQueue=q5]
hello,自动注册成了吗
说明自动声明的绑定中的队列被自动默认监听。@RabbitListener
注解中的bindings
和queues
参数不能同时指定,否则会报错。
@RabbitListener和@RabbitHandler搭配使用
@RabbitListener
可以标注在类上面,当使用在类上面的时候,需要配合@RabbitHandler
注解一起使用,@RabbitListener
标注在类上面表示当有收到消息的时候,就交给带有@RabbitHandler
的方法处理,具体找哪个方法处理,需要跟进MessageConverter
转换后的java对象。
配置:
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ConsumerConfig {
@Bean
public ConnectionFactory connectionFactory(){
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
return factory;
}
@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
//SimpleRabbitListenerContainerFactory发现消息中有content_type有text就会默认将其转换成string类型的
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
return factory;
}
}
处理器方法
@Component
@RabbitListener(queues ="zhihao.miao.order")
public class MessageHandler {
@RabbitHandler
public void handleMessage(byte[] message){
System.out.println("====消费消息handleMessage");
System.out.println(new String(message));
}
@RabbitHandler
public void handleMessage2(String message){
System.out.println("====消费消息===handleMessage2");
System.out.println(message);
}
}
应用启动类:
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;
import java.util.concurrent.TimeUnit;
@EnableRabbit
@ComponentScan
public class Application {
public static void main(String[] args) throws Exception{
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
System.out.println("rabbit service startup");
TimeUnit.SECONDS.sleep(3000);
context.close();
}
}
发送不包含content_type属性的消息和content_type属性为text的消息,控制台打印:
rabbit service startup
====消费消息handleMessage
订单已经生成,请到订单-详情页面确认。
====消费消息===handleMessage2
订单已经生成,请到订单-详情页面确认。
@RabbitListener注解的containerFactory属性
@RabbitListener
注解的containerFactory
属性可以指定一个RabbitListenerContainerFactory
的bean,默认是找名字为rabbitListenerContainerFactory
的实例。
当我们将ConsumerConfig
类中的RabbitListenerContainerFactory
实例的对象名改掉的时候,发现就会报错。
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ConsumerConfig {
@Bean
public ConnectionFactory connectionFactory(){
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
return factory;
}
@Bean("rabbitListenerContainerFactory2")
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
//SimpleRabbitListenerContainerFactory发现消息中有content_type有text就会默认将其转换成string类型的
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
return factory;
}
}
此时控制台上报错,
报错信息此时如果配置一下@RabbitListener
注解的containerFactory
属性便不会报错。
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues ="zhihao.miao.order",containerFactory = "rabbitListenerContainerFactory2")
public class MessageHandler {
@RabbitHandler
public void handleMessage(byte[] message){
System.out.println("====消费消息handleMessage");
System.out.println(new String(message));
}
@RabbitHandler
public void handleMessage2(String message){
System.out.println("====消费消息===handleMessage2");
System.out.println(message);
}
}
我们再去改造一下在RabbitListenerContainerFactory
实例中定义消息类型转换器
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ConsumerConfig {
@Bean
public ConnectionFactory connectionFactory(){
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
return factory;
}
@Bean("rabbitListenerContainerFactory2")
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
//SimpleRabbitListenerContainerFactory发现消息中有content_type有text就会默认将其转换成string类型的
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new MessageConverter() {
@Override
public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
return null;
}
@Override
public Object fromMessage(Message message) throws MessageConversionException {
return new User(1,new String(message.getBody()));
}
});
return factory;
}
}
User对象:
public class User {
private int age;
private String name;
...get set
public User(int age,String name){
this.age = age;
this.name = name;
}
@Override
public String toString() {
return "User{" +
"age=" + age +
", name='" + name + '\'' +
'}';
}
}
在处理器中增加参数是User的方法:
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues ="zhihao.miao.order",containerFactory = "rabbitListenerContainerFactory2")
public class MessageHandler {
@RabbitHandler
public void handleMessage3(User user){
System.out.println("====消费消息===handleMessage3");
System.out.println(user);
}
}
发送消息测试
网友评论