Spring Boot 如何配置 RabbitMQ
<!-- Adds Spring Boot support for AMQP -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
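接着在 application.properties 中配置 RabbitMQ 的连接信息(如 spring.rabbitmq.host、spring.rabbitmq.port、spring.rabbitmq.username、spring.rabbitmq.password 等)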
发送者
@Autowired
private AmqpTemplate template;
/**
 * Publishes a payload to RabbitMQ through the injected {@code template}.
 *
 * <p>The routing key and payload are written to the log before the message is sent.
 *
 * @param exchangeName name of the exchange to publish to
 * @param routingKey   routing key used for the message
 * @param json         the data to send
 */
public void sendMqData(String exchangeName, String routingKey, String json) {
    // Log line layout: <routingKey>===========sendmq:<json>
    final String logLine = routingKey + "===========sendmq:" + json;
    log.info(logLine);

    template.convertAndSend(exchangeName, routingKey, json);
}
接收者
使用 Spring Boot 注解的方式很简单,注意注解里各个参数的写法
/**
 * Listens on the queue {@code RabbitMqConfig.reciveQueueName}, which is declared with
 * {@code autoDelete = "true"} and bound to the topic exchange
 * {@code RabbitMqConfig.defaultExchangeName} using the pattern {@code XXX.aaa.gateway.#}.
 *
 * <p>Every message is logged and then passed to
 * {@code gatewayReciveDataService.reciveDataHandle}. If the handler throws, the failure is
 * logged together with its stack trace and is not rethrown.
 *
 * @param message message body as text
 * @param channel AMQP channel supplied by the listener infrastructure; not used here
 */
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = RabbitMqConfig.reciveQueueName, autoDelete = "true"),
        exchange = @Exchange(value = RabbitMqConfig.defaultExchangeName, type = "topic"),
        key = "XXX.aaa.gateway.#"
))
// NOTE(review): @RabbitHandler is normally paired with a class-level @RabbitListener;
// it looks redundant on a method that already carries @RabbitListener - confirm and remove.
@RabbitHandler
public void reciveMqMessage(String message, Channel channel) {
    log.info("===========recivemq:" + message);
    try {
        gatewayReciveDataService.reciveDataHandle(message);
    } catch (Exception e) {
        // Still not rethrown, exactly as before; the only change is that the failure now
        // goes through the logger (with the offending message) instead of printStackTrace().
        log.error("===========recivemq handle failed:" + message, e);
    }
}
如何动态创建队列并监听队列
先写配置类
package com.iot.service;
import com.iot.listener.RabbitMqListener;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * RabbitMQ configuration class.
 *
 * <p>Declares the {@link SimpleMessageListenerContainer} bean that delivers incoming messages
 * to {@link RabbitMqListener}.
 *
 * GYB
 * 20190320
 */
@Configuration
public class RabbitMqConf {

    // The listener was originally going to be declared in XML, but the XML form requires a
    // queue name to be specified, so it is configured through this annotated class instead:
    // <!--<rabbit:listener-container connection-factory="mqconnectionFactory" >-->
    // <!--<rabbit:listener ref="rabbitMqListener" queue-names=""/>-->
    // <!--</rabbit:listener-container>-->
    // RabbitMqListener is our own receiving class; it implements MessageListener and
    // overrides onMessage.
    @Autowired
    RabbitMqListener rabbitMqListener;

    /**
     * Builds the listener container with automatic acknowledgement and
     * {@code rabbitMqListener} as its message listener.
     *
     * <p>No queue names are assigned here (a fixed name such as {@code "test-queue1"} is
     * deliberately not set).
     *
     * @param connectionFactory connection factory the container uses
     * @return the configured container
     */
    @Bean
    public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer listenerContainer =
                new SimpleMessageListenerContainer(connectionFactory);
        listenerContainer.setMessageListener(rabbitMqListener);
        listenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);
        return listenerContainer;
    }
}
然后我们在自己的业务逻辑中,动态创建队列并监听队列
@Autowired
SimpleMessageListenerContainer container;
@Autowired
ConnectionFactory connectionFactory;
/**
 * Creates a queue, binds it to the default exchange with the given routing keys and starts
 * listening on it.
 *
 * <p>If {@code RabbitMqMemory.IotMqChannel} is still {@code null}, a new channel is opened,
 * the queue is declared and bound to {@code RabbitMqMemory.defaultExchangeName}, the queue
 * name is added to the listener container, and the channel is stored in
 * {@code RabbitMqMemory.IotMqChannel}. Otherwise only the routing keys are bound, through
 * {@code queueBindRouteKeys}.
 *
 * <p>If declaring or binding fails, the listener is not registered and the channel is not
 * stored, so a later call will attempt the full setup again.
 *
 * @param queuename name of the queue to create and listen on
 * @param routeKeys routing keys to bind the queue with; may be empty
 * @throws IllegalStateException if the queue cannot be declared or bound
 */
public void createOrBindQueue(String queuename, String... routeKeys) {
    if (RabbitMqMemory.IotMqChannel != null) {
        // A queue and channel already exist; only the routing keys need to be bound.
        queueBindRouteKeys(queuename, routeKeys);
        return;
    }

    // Create the queue on a new, non-transactional channel.
    Channel channel = connectionFactory.createConnection().createChannel(false);
    try {
        // Flags are durable=false, exclusive=false, autoDelete=true.
        // NOTE(review): the previous comment described an exclusive queue that is removed
        // when the connection closes, but the exclusive flag passed here is false - confirm
        // which behavior is intended.
        channel.queueDeclare(queuename, false, false, true, null);

        // Bind the queue to the exchange once per routing key; a null array binds nothing.
        if (routeKeys != null) {
            for (String routeKey : routeKeys) {
                channel.queueBind(queuename, RabbitMqMemory.defaultExchangeName, routeKey);
            }
        }
    } catch (IOException e) {
        // Stop here: registering a listener or caching the channel after a failed
        // declare/bind would leave the application in an inconsistent state.
        throw new IllegalStateException("Failed to declare or bind queue " + queuename, e);
    }

    // Start listening on the new queue.
    container.addQueueNames(queuename);
    System.out.println("队列监听中========================");
    RabbitMqMemory.IotMqChannel = channel;
}
网友评论