美文网首页rabbitmq
RabbitMQ之业务场景(四):动态创建,删除队列工具类,拿来

RabbitMQ之业务场景(四):动态创建,删除队列工具类,拿来

作者: raysonfang | 来源:发表于2018-09-25 16:11 被阅读1413次

目前有整合RabbitMQ到项目中, 有个需求就是:需要根据不同的门店创建不同的队列, 而在启动项目初始化的时候, 是不知到有哪些门店在使用,所以创建了也就浪费了资源。所以就有了动态创建队列的需求。
然后在网上疯狂的搜罗了一波。现总结如下:
RabbitConfig.java 配置类

import java.util.HashMap;
import java.util.Map;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 配置rabbitMQ
 * @author Rayson517
 *
 */
@Configuration
public class RabbitConfig {
    
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
        return new RabbitAdmin(connectionFactory);
    }
    
    
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate=new RabbitTemplate(connectionFactory);
        //数据转换为json存入消息队列
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        return rabbitTemplate;
    }
}

RabbitUtil.java工具类

import java.util.Date;
import java.util.UUID;

import javax.annotation.Resource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.AbstractExchange;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.SerializationUtils;

@Configuration
public class RabbitUtil {
    
    private static final Logger logger = LoggerFactory.getLogger(RabbitUtil.class);
    
    private final RabbitAdmin rabbitAdmin;
    
    private final RabbitTemplate rabbitTemplate;
    
    @Autowired
    public RabbitUtil(RabbitAdmin rabbitAdmin, RabbitTemplate rabbitTemplate){
        this.rabbitAdmin = rabbitAdmin;
        this.rabbitTemplate = rabbitTemplate;
    }
    
    /**
     * 转换Message对象
     * @param messageType 返回消息类型 MessageProperties类中常量
     * @param msg
     * @return
     */
    public Message getMessage(String messageType, Object msg){
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType(messageType);
        Message message = new Message(msg.toString().getBytes(),messageProperties);
        return message;
    }

    /**
     * 有绑定Key的Exchange发送
     * @param routingKey
     * @param msg
     */
    public void sendMessageToExchange(TopicExchange topicExchange, String routingKey, Object msg){
        Message message = getMessage(MessageProperties.CONTENT_TYPE_JSON,msg);
        rabbitTemplate.send(topicExchange.getName(), routingKey, message);
    }

    /**
     * 没有绑定KEY的Exchange发送
     * @param exchange
     * @param msg
     */
    public void sendMessageToExchange(TopicExchange topicExchange, AbstractExchange exchange, String msg){
        addExchange(exchange);
        logger.info("RabbitMQ send "+exchange.getName()+"->"+msg);
        rabbitTemplate.convertAndSend(topicExchange.getName(),msg);
    }

    /**
     * 给queue发送消息
     * @param queueName
     * @param msg
     */
    public void sendToQueue(String queueName, String msg){
        sendToQueue(DirectExchange.DEFAULT, queueName, msg);
    }
    
    /**
     * 给direct交换机指定queue发送消息
     * @param directExchange
     * @param queueName
     * @param msg
     */
    public void sendToQueue(DirectExchange directExchange, String queueName, String msg){
        Queue queue = new Queue(queueName);
        addQueue(queue);
        Binding binding = BindingBuilder.bind(queue).to(directExchange).withQueueName();
        rabbitAdmin.declareBinding(binding);
        //设置消息内容的类型,默认是 application/octet-stream 会是 ASCII 码值
        rabbitTemplate.convertAndSend(directExchange.getName(), queueName, msg);
    }
    
    /**
     * 给queue发送消息
     * @param queueName
     * @param msg
     */
    public String receiveFromQueue(String queueName){
        return receiveFromQueue(DirectExchange.DEFAULT, queueName);
    }
    
    /**
     * 给direct交换机指定queue发送消息
     * @param directExchange
     * @param queueName
     * @param msg
     */
    public String receiveFromQueue(DirectExchange directExchange, String queueName){
        Queue queue = new Queue(queueName);
        addQueue(queue);
        Binding binding = BindingBuilder.bind(queue).to(directExchange).withQueueName();
        rabbitAdmin.declareBinding(binding);
        String messages = (String)rabbitTemplate.receiveAndConvert(queueName);
        System.out.println("Receive:"+messages);
        return messages;
    }
    
    /**
     * 创建Exchange
     * @param exchange
     */
    public void addExchange(AbstractExchange exchange){
        rabbitAdmin.declareExchange(exchange);
    }

    /**
     * 删除一个Exchange
     * @param exchangeName
     */
    public boolean deleteExchange(String exchangeName){
        return rabbitAdmin.deleteExchange(exchangeName);
    }


    /**
     * Declare a queue whose name is automatically named. It is created with exclusive = true, autoDelete=true, and
     * durable = false.
     * @return Queue
     */
    public Queue addQueue(){
        return rabbitAdmin.declareQueue();
    }

    /**
     * 创建一个指定的Queue
     * @param queue
     * @return queueName
     */
    public String addQueue(Queue queue){
        return rabbitAdmin.declareQueue(queue);
    }

    /**
     * Delete a queue.
     * @param queueName the name of the queue.
     * @param unused true if the queue should be deleted only if not in use.
     * @param empty true if the queue should be deleted only if empty.
     */
    public void deleteQueue(String queueName, boolean unused, boolean empty){
        rabbitAdmin.deleteQueue(queueName,unused,empty);
    }

    /**
     * 删除一个queue
     * @return true if the queue existed and was deleted.
     * @param queueName
     */
    public boolean deleteQueue(String queueName){
        return rabbitAdmin.deleteQueue(queueName);
    }

    /**
     * 绑定一个队列到一个匹配型交换器使用一个routingKey
     * @param queue
     * @param exchange
     * @param routingKey
     */
    public void addBinding(Queue queue ,TopicExchange exchange,String routingKey){
        Binding binding = BindingBuilder.bind(queue).to(exchange).with(routingKey);
        rabbitAdmin.declareBinding(binding);
    }
    
    /**
     * 绑定一个Exchange到一个匹配型Exchange 使用一个routingKey
     * @param exchange
     * @param topicExchange
     * @param routingKey
     */
    public void addBinding(Exchange exchange,TopicExchange topicExchange,String routingKey){
        Binding binding = BindingBuilder.bind(exchange).to(topicExchange).with(routingKey);
        rabbitAdmin.declareBinding(binding);
    }
    
    /**
     * 去掉一个binding
     * @param binding
     */
    public void removeBinding(Binding binding){
        rabbitAdmin.removeBinding(binding);
    }
}

然后在需要的地方注入这个RabbitUtil工具类就可以使用了

相关文章

网友评论

    本文标题:RabbitMQ之业务场景(四):动态创建,删除队列工具类,拿来

    本文链接:https://www.haomeiwen.com/subject/nfrloftx.html