书接上回,我们使用SimpleMessageListenerContainer容器设置消费队列监听,然后设置具体的监听Listener进行消息消费具体逻辑的编写。
SimpleMessageListenerContainer详解
同一个queue上有多个消费者的时候,只会有一个消费者收到消息,一般是多个消费者轮流收到消息。
消费者1 消费者2SimpleMessageListenerContainer
可以监听多个队列,
container.setQueueNames
的api接收的是一个字符串数组对象。
@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames("zhihao.debug","zhihao.error","zhihao.info");
container.setMessageListener((MessageListener) message -> {
System.out.println("====接收到"+message.getMessageProperties().getConsumerQueue()+"队列的消息=====");
System.out.println(message.getMessageProperties());
System.out.println(new String(message.getBody()));
});
return container;
}
SimpleMessageListenerContainer运行时动态的添加监听队列
@ComponentScan
public class Application {
public static void main(String[] args) throws Exception{
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
SimpleMessageListenerContainer container = context.getBean(SimpleMessageListenerContainer.class);
TimeUnit.SECONDS.sleep(20);
container.addQueueNames("zhihao.error");
TimeUnit.SECONDS.sleep(20);
container.addQueueNames("zhihao.debug");
TimeUnit.SECONDS.sleep(20);
context.close();
}
}
SimpleMessageListenerContainer纳入容器
@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames("zhihao.debug");
container.setMessageListener((MessageListener) message -> {
if("zhihao.debug".equals(message.getMessageProperties().getConsumerQueue())){
System.out.println("====接收到"+message.getMessageProperties().getConsumerQueue()+"队列的消息=====");
System.out.println(message.getMessageProperties());
System.out.println(new String(message.getBody()));
}else if("zhihao.error".equals(message.getMessageProperties().getConsumerQueue())){
System.out.println("====接收到"+message.getMessageProperties().getConsumerQueue()+"队列的消息=====");
System.out.println(message.getMessageProperties());
System.out.println(new String(message.getBody()));
}else if("zhihao.info".equals(message.getMessageProperties().getConsumerQueue())){
System.out.println("====接收到"+message.getMessageProperties().getConsumerQueue()+"队列的消息=====");
System.out.println(message.getMessageProperties());
System.out.println(new String(message.getBody()));
}
});
return container;
}
运行时动态的移除监听队列
SimpleMessageListenerContainer运行时后动态的移除监听队列
container.removeQueueNames("zhihao.debug");
后置处理器
SimpleMessageListenerContainer增加后置处理
@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames("zhihao.miao.order");
//后置处理器,接收到的消息都添加了Header请求头
container.setAfterReceivePostProcessors(message -> {
message.getMessageProperties().getHeaders().put("desc",10);
return message;
});
container.setMessageListener((MessageListener) message -> {
System.out.println("====接收到消息=====");
System.out.println(message.getMessageProperties());
System.out.println(new String(message.getBody()));
});
return container;
}
应用启动类:
@ComponentScan
public class Application {
public static void main(String[] args) throws Exception{
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
SimpleMessageListenerContainer container = context.getBean(SimpleMessageListenerContainer.class);
System.out.println(container.getQueueNames()[0]);
TimeUnit.SECONDS.sleep(30);
context.close();
}
}
控制台打印:
====接收到消息=====
MessageProperties [headers={desc=10}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=null, contentEncoding=null, contentLength=0, deliveryMode=null, receivedDeliveryMode=NON_PERSISTENT, expiration=null, priority=null, redelivered=false, receivedExchange=, receivedRoutingKey=zhihao.miao.order, receivedDelay=null, deliveryTag=1, messageCount=0, consumerTag=amq.ctag-2xCE8upxgGgf-u1haCwt6A, consumerQueue=zhihao.miao.order]
消息2
setAfterReceivePostProcessors
方法可以对消息进行后置处理。
设置消费者的Consumer_tag和Arguments
int count=0;
@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames("zhihao.miao.order");
//设置消费者的consumerTag_tag
container.setConsumerTagStrategy(queue -> "order_queue_"+(++count));
//设置消费者的Arguments
Map<String, Object> args = new HashMap<>();
args.put("module","订单模块");
args.put("fun","发送消息");
container.setConsumerArguments(args);
container.setMessageListener((MessageListener) message -> {
System.out.println("====接收到消息=====");
System.out.println(message.getMessageProperties());
System.out.println(new String(message.getBody()));
});
return container;
}
web控制面板
container.setConsumerTagStrategy
可以设置消费者的 Consumer_tag
, container.setConsumerArguments
可以设置消费者的 Arguments
setConcurrentConsumers设置并发消费者
@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames("zhihao.miao.order");
container.setConcurrentConsumers(5);
container.setMaxConcurrentConsumers(10);
container.setMessageListener((MessageListener) message -> {
System.out.println("====接收到消息=====");
System.out.println(message.getMessageProperties());
System.out.println(new String(message.getBody()));
});
return container;
}
并发消费数
应用启动类,
@ComponentScan
public class Application {
public static void main(String[] args) throws Exception{
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
SimpleMessageListenerContainer container = context.getBean(SimpleMessageListenerContainer.class);
container.setConcurrentConsumers(7);
TimeUnit.SECONDS.sleep(30);
context.close();
}
}
运行期间动态的修改并发消费者的数量
setConcurrentConsumers
设置多个并发消费者一起消费,并支持运行时动态修改。 setMaxConcurrentConsumers
设置最多的并发消费者。
网友评论