美文网首页
SpringBoot + RabbitMQ 应用

SpringBoot + RabbitMQ 应用

作者: AltF4_小寒 | 来源:发表于2018-07-27 11:54 被阅读771次

    RabbitMQ 本身的安装与配置(安装语言包、客户端、启用管理控制插件等)这里不做解释。

    需要引入的依赖:

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    

    yml文件配置(这里只给出简单配置,完整的配置项参见官方的 RabbitProperties:https://github.com/spring-projects/spring-boot/blob/v2.0.3.RELEASE/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java ):

    spring:
      # RabbitMQ connection settings; they must be nested under "spring" so that
      # Spring Boot binds them as spring.rabbitmq.*
      rabbitmq:
        host: 127.0.0.1
        port: 5672
        username: admin
        password: 123456
        # virtual host used by this application
        virtual-host: dc
        # 0 disables the connection timeout (wait indefinitely while connecting)
        connection-timeout: 0
    

    接下来是配置类,主要功能是配置 RabbitAdmin(目前似乎必须自己声明为 Spring Bean)和 RabbitTemplate(它可以直接注入,也可以在配置类里做一些定制处理),并定义一些固定的 Exchange 或 Queue。配置代码例如:

     @Configuration
    public class RabbitmqConfig {

        /** Name of the topic exchange, read from the {@code ExchangeName} property. */
        @Value("${ExchangeName}")
        private String exchangeName;

        /**
         * Exposes a {@link RabbitAdmin} so that exchanges and queues can be
         * declared and removed programmatically.
         *
         * @param connectionFactory connection factory the admin operates on
         * @return a new admin bound to {@code connectionFactory}
         */
        @Bean
        RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
            RabbitAdmin admin = new RabbitAdmin(connectionFactory);
            return admin;
        }

        /**
         * Builds the {@link RabbitTemplate} used for publishing.
         *
         * @param connectionFactory connection factory the template publishes through
         * @return a template that converts payloads with a Jackson JSON converter
         */
        @Bean
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
            RabbitTemplate template = new RabbitTemplate(connectionFactory);
            // Payloads are written to the broker as JSON.
            Jackson2JsonMessageConverter jsonConverter = new Jackson2JsonMessageConverter();
            template.setMessageConverter(jsonConverter);
            // Reply address / reply timeout are intentionally left at their defaults.
            return template;
        }

        /**
         * Declares the application's topic exchange on the broker and exposes it as a bean.
         *
         * @param rabbitAdmin admin used to declare the exchange
         * @return the declared exchange
         */
        @Bean
        public TopicExchange exchange(RabbitAdmin rabbitAdmin) {
            final boolean durable = true;
            final boolean autoDelete = false;
            TopicExchange topic = new TopicExchange(exchangeName, durable, autoDelete);
            rabbitAdmin.declareExchange(topic);
            return topic;
        }
    }
    

    需要注意的是,在 IDEA 中这个类里会出现 “Could not autowire. No beans of 'ConnectionFactory' type found” 这样的警告,但是启动并不会报错。

    这样的配置之下,已经拥有了 RabbitMQ 连接,并且注入了 RabbitAdmin,就能动态地对 Exchange、Queue 进行操作。
    创建Exchange:

     /**
      * FanoutExchange, TopicExchange, HeadersExchange, etc.
      * all share the same parent class, AbstractExchange.
      */
     // Fanout alternative:
     //FanoutExchange fanoutExchange = new FanoutExchange(exchangeName,true,false);
     // Constructor arguments: name, durable, autoDelete.
     TopicExchange dataExchange = new TopicExchange(exchangeName,true,false);
     // Declare the exchange on the broker.
     rabbitAdmin.declareExchange(dataExchange);
    

    删除Exchange:

    // Remove the exchange with the given name from the broker.
    rabbitAdmin.deleteExchange(exchangeName);
    

    创建Queue:

     // Constructor arguments: name, durable, exclusive, autoDelete.
     Queue queue = new Queue(queueName,true,false,false);
     // Declare the queue on the broker.
     rabbitAdmin.declareQueue(queue);
    

    删除Queue:

     // Remove the queue with the given name from the broker.
     rabbitAdmin.deleteQueue(queueName);
    

    建立绑定(这里给出Topic的匹配绑定):

    // Bind the queue to the topic exchange with the pattern "<routingKey>.#";
    // '#' is the topic wildcard for zero or more dot-separated words.
    Binding binding = BindingBuilder.bind(queue).to(topicExchange).with(routingKey+".#");
    // Register the binding on the broker.
    rabbitAdmin.declareBinding(binding);
    

    发送消息:

    /**
     * Publishes messages to RabbitMQ.
     *
     * <p>Every send passes the target exchange and routing key explicitly instead of
     * mutating the shared {@link RabbitTemplate}; the template is a singleton, so
     * changing its default exchange/routing key per call would let concurrent or
     * interleaved sends pick up each other's settings.
     */
    @Component
    public class SendMessage {

        private static final Logger logger = LoggerFactory.getLogger(SendMessage.class);

        /** Routing key used when publishing to an exchange that is addressed without a key. */
        private static final String NO_ROUTING_KEY = "";

        private RabbitAdmin rabbitAdmin;
        private RabbitTemplate rabbitTemplate;
        private TopicExchange topicExchange;

        @Autowired
        public void setRabbitAdmin(RabbitAdmin rabbitAdmin) {
            this.rabbitAdmin = rabbitAdmin;
        }

        @Autowired
        public void setRabbitTemplate(RabbitTemplate rabbitTemplate) {
            this.rabbitTemplate = rabbitTemplate;
        }

        @Autowired
        public void setTopicExchange(TopicExchange topicExchange) {
            this.topicExchange = topicExchange;
        }

        /**
         * Publishes to the injected topic exchange using a routing key.
         *
         * @param routingKey routing key the exchange uses to select queues
         * @param msg        payload; converted by the template's message converter
         */
        public void sendMessage(String routingKey, Object msg){
            // convertAndSend runs the payload through the configured converter;
            // plain send() expects an already-built Message.
            rabbitTemplate.convertAndSend(topicExchange.getName(), routingKey, msg);
        }

        /**
         * Publishes a raw text message to the given exchange without a routing key.
         * The exchange is declared first so that it exists on the broker.
         *
         * @param exchange target exchange
         * @param msg      message text, sent as UTF-8 bytes
         */
        public void sendMessage( AbstractExchange exchange, String msg){
            rabbitAdmin.declareExchange(exchange);
            String exchangeName = exchange.getName();
            logger.info("RabbitMQ send {}->{}", exchangeName, msg);
            // Fixed charset so the bytes do not depend on the JVM's platform default.
            byte[] body = msg.getBytes(java.nio.charset.StandardCharsets.UTF_8);
            rabbitTemplate.send(exchangeName, NO_ROUTING_KEY, new Message(body, new MessageProperties()));
        }

    }
    

    动态订阅:
    1.注入Bean

      /**
       * Registers an initially unconfigured listener container; queues and the
       * listener are attached to it later at runtime.
       *
       * @param connectionFactory connection factory the container consumes through
       * @return a new container bound to {@code connectionFactory}
       */
      @Bean
        public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory){
            SimpleMessageListenerContainer listenerContainer =
                    new SimpleMessageListenerContainer(connectionFactory);
            return listenerContainer;
        }
    
    

    2.自定义接收类

    /**
     * Custom message receiver: each handler writes the received text to
     * standard error, prefixed with a label identifying the handler.
     *
     * @author lc
     */
    public class Receive {

        /**
         * Prints the message with the {@code handleMessage: } prefix.
         *
         * @param message received message text
         */
        public void handleMessage(String message){
            report("handleMessage: ", message);
        }

        /**
         * Prints the message with the {@code Process: } prefix.
         *
         * @param msg received message text
         */
        public void process(String msg){
            report("Process: ", msg);
        }

        /** Writes {@code prefix} followed by {@code text} as one line to standard error. */
        private static void report(String prefix, String text){
            System.err.println(prefix + text);
        }
    }
    

    3.定义动态订阅

    /**
     * Dynamic subscription: once the application has started, attaches a
     * {@link Receive}-backed listener to the shared listener container and
     * subscribes it to the queues below.
     */
    @Component
    @Order(10)
    public class MyApplicationRunner implements CommandLineRunner {

        /** Queue whose messages are dispatched to {@code Receive.handleMessage}. */
        private static final String HANDLE_MESSAGE_QUEUE = "QueueName";

        /** Queue whose messages are dispatched to {@code Receive.process}. */
        private static final String PROCESS_QUEUE = "lcTest";

        /**
         * Wires the listener and then subscribes the container to the queues.
         *
         * @param args command-line arguments (unused)
         * @throws Exception declared by {@link CommandLineRunner#run}
         */
        @Override
        public void run(String... args) throws Exception {
            SimpleMessageListenerContainer container = SpringUtil.getBean(SimpleMessageListenerContainer.class);

            // Route each queue to its own handler method on Receive.
            MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(new Receive());
            messageListenerAdapter.addQueueOrTagToMethodName(HANDLE_MESSAGE_QUEUE,"handleMessage");
            messageListenerAdapter.addQueueOrTagToMethodName(PROCESS_QUEUE,"process");

            // Install the listener BEFORE adding queues: adding queues makes the
            // container start consuming, and a message delivered while no listener
            // is set cannot be handled.
            container.setMessageListener(messageListenerAdapter);
            container.addQueueNames(HANDLE_MESSAGE_QUEUE, PROCESS_QUEUE);
        }
    }
    

    后端暂时就用到这些,后面再有新的用法会持续更新。前端还用到了 RabbitMQ 的 Web STOMP 插件,可以直接将消息推送给前端。

    代码: https://gitee.com/leichencode/rabbitmqDemo.git

    相关文章

      网友评论

          本文标题:SpringBoot + RabbitMQ 应用

          本文链接:https://www.haomeiwen.com/subject/vtjwmftx.html