目前有整合RabbitMQ到项目中, 有个需求就是:需要根据不同的门店创建不同的队列, 而在启动项目初始化的时候, 是不知到有哪些门店在使用,所以创建了也就浪费了资源。所以就有了动态创建队列的需求。
然后在网上疯狂的搜罗了一波。现总结如下:
RabbitConfig.java 配置类
import java.util.HashMap;
import java.util.Map;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 配置rabbitMQ
* @author Rayson517
*
*/
@Configuration
public class RabbitConfig {
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
return new RabbitAdmin(connectionFactory);
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate=new RabbitTemplate(connectionFactory);
//数据转换为json存入消息队列
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
return rabbitTemplate;
}
}
RabbitUtil.java工具类
import java.util.Date;
import java.util.UUID;
import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.AbstractExchange;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.SerializationUtils;
@Configuration
public class RabbitUtil {
private static final Logger logger = LoggerFactory.getLogger(RabbitUtil.class);
private final RabbitAdmin rabbitAdmin;
private final RabbitTemplate rabbitTemplate;
@Autowired
public RabbitUtil(RabbitAdmin rabbitAdmin, RabbitTemplate rabbitTemplate){
this.rabbitAdmin = rabbitAdmin;
this.rabbitTemplate = rabbitTemplate;
}
/**
* 转换Message对象
* @param messageType 返回消息类型 MessageProperties类中常量
* @param msg
* @return
*/
public Message getMessage(String messageType, Object msg){
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType(messageType);
Message message = new Message(msg.toString().getBytes(),messageProperties);
return message;
}
/**
* 有绑定Key的Exchange发送
* @param routingKey
* @param msg
*/
public void sendMessageToExchange(TopicExchange topicExchange, String routingKey, Object msg){
Message message = getMessage(MessageProperties.CONTENT_TYPE_JSON,msg);
rabbitTemplate.send(topicExchange.getName(), routingKey, message);
}
/**
* 没有绑定KEY的Exchange发送
* @param exchange
* @param msg
*/
public void sendMessageToExchange(TopicExchange topicExchange, AbstractExchange exchange, String msg){
addExchange(exchange);
logger.info("RabbitMQ send "+exchange.getName()+"->"+msg);
rabbitTemplate.convertAndSend(topicExchange.getName(),msg);
}
/**
* 给queue发送消息
* @param queueName
* @param msg
*/
public void sendToQueue(String queueName, String msg){
sendToQueue(DirectExchange.DEFAULT, queueName, msg);
}
/**
* 给direct交换机指定queue发送消息
* @param directExchange
* @param queueName
* @param msg
*/
public void sendToQueue(DirectExchange directExchange, String queueName, String msg){
Queue queue = new Queue(queueName);
addQueue(queue);
Binding binding = BindingBuilder.bind(queue).to(directExchange).withQueueName();
rabbitAdmin.declareBinding(binding);
//设置消息内容的类型,默认是 application/octet-stream 会是 ASCII 码值
rabbitTemplate.convertAndSend(directExchange.getName(), queueName, msg);
}
/**
* 给queue发送消息
* @param queueName
* @param msg
*/
public String receiveFromQueue(String queueName){
return receiveFromQueue(DirectExchange.DEFAULT, queueName);
}
/**
* 给direct交换机指定queue发送消息
* @param directExchange
* @param queueName
* @param msg
*/
public String receiveFromQueue(DirectExchange directExchange, String queueName){
Queue queue = new Queue(queueName);
addQueue(queue);
Binding binding = BindingBuilder.bind(queue).to(directExchange).withQueueName();
rabbitAdmin.declareBinding(binding);
String messages = (String)rabbitTemplate.receiveAndConvert(queueName);
System.out.println("Receive:"+messages);
return messages;
}
/**
* 创建Exchange
* @param exchange
*/
public void addExchange(AbstractExchange exchange){
rabbitAdmin.declareExchange(exchange);
}
/**
* 删除一个Exchange
* @param exchangeName
*/
public boolean deleteExchange(String exchangeName){
return rabbitAdmin.deleteExchange(exchangeName);
}
/**
* Declare a queue whose name is automatically named. It is created with exclusive = true, autoDelete=true, and
* durable = false.
* @return Queue
*/
public Queue addQueue(){
return rabbitAdmin.declareQueue();
}
/**
* 创建一个指定的Queue
* @param queue
* @return queueName
*/
public String addQueue(Queue queue){
return rabbitAdmin.declareQueue(queue);
}
/**
* Delete a queue.
* @param queueName the name of the queue.
* @param unused true if the queue should be deleted only if not in use.
* @param empty true if the queue should be deleted only if empty.
*/
public void deleteQueue(String queueName, boolean unused, boolean empty){
rabbitAdmin.deleteQueue(queueName,unused,empty);
}
/**
* 删除一个queue
* @return true if the queue existed and was deleted.
* @param queueName
*/
public boolean deleteQueue(String queueName){
return rabbitAdmin.deleteQueue(queueName);
}
/**
* 绑定一个队列到一个匹配型交换器使用一个routingKey
* @param queue
* @param exchange
* @param routingKey
*/
public void addBinding(Queue queue ,TopicExchange exchange,String routingKey){
Binding binding = BindingBuilder.bind(queue).to(exchange).with(routingKey);
rabbitAdmin.declareBinding(binding);
}
/**
* 绑定一个Exchange到一个匹配型Exchange 使用一个routingKey
* @param exchange
* @param topicExchange
* @param routingKey
*/
public void addBinding(Exchange exchange,TopicExchange topicExchange,String routingKey){
Binding binding = BindingBuilder.bind(exchange).to(topicExchange).with(routingKey);
rabbitAdmin.declareBinding(binding);
}
/**
* 去掉一个binding
* @param binding
*/
public void removeBinding(Binding binding){
rabbitAdmin.removeBinding(binding);
}
}
然后在需要的地方注入这个RabbitUtil工具类就可以使用了
网友评论