美文网首页
springboot + rabbitMQ

springboot + rabbitMQ

作者: MacSam | 来源:发表于2018-05-02 16:30 被阅读50次

    本文讲述springboot与rabbitMQ的整合

    • 如果你还没接触过springboot或MQ,那么请看一下我的两篇前置教程,搭建基础框架与MQ的服务
      1. springboot https://www.jianshu.com/p/8ade437792cc
      2. rabbitMQ https://www.jianshu.com/p/60c358235705
    • 流程 - 角色分为生产者->MQ->消费者->MQ
      主要步骤为
      1. 生产者发送需要处理的消息给MQ
      2. MQ推送消息给消费者
      3. 消费者使用消息,然后反馈给MQ
    • pom.xml文件里添加MQ的依赖
     <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-amqp</artifactId>
     </dependency>
    
    • 生产者
    // RabbitMQService接口只有一个空的addQueue方法,这里不贴出来了
    // 本类是生产消息类
    @Service
    @Transactional
    public class RabbitMQServiceImpl implements RabbitMQService {
        // 在properties里加入对应的queue的名字
        @Value("${rabbitmq.queue}")
        private String queue;
        @Autowired
        public RabbitTemplate rabbitTemplate;
    
        @Override
        public void addQueue(Object obj) {
            // 像指定queue推送
            rabbitTemplate.convertAndSend(queue, obj);
        }
    }
    
    // 本类是生产者的MQ配置类
    @Configuration
    public class RabbitMQConfig {
         // 以下@Value里的key都需要放入properties内
        @Value("${rabbitmq.host}")
        private String host;
        @Value("${rabbitmq.port}")
        private String port;
        @Value("${rabbitmq.username}")
        private String username;
        @Value("${rabbitmq.password}")
        private String password;
    
        @Bean
        public ConnectionFactory connectionFactory() {
            CachingConnectionFactory factory = new CachingConnectionFactory();
            factory.setUsername(username);
            factory.setPassword(password);
            factory.setHost(host);
            factory.setPort(Integer.valueOf(port));
            factory.setPublisherConfirms(true);
            return factory;
        }
    
        @Bean
        public MessageConverter messageConverter() {
            return new Jackson2JsonMessageConverter();
        }
        
        @Bean
        @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
        // 必须是prototype类型
        public RabbitTemplate rabbitTemplate() {
            return new RabbitTemplate(connectionFactory());
        }
    }
    
    • 消费者
     //消费者的MQ配置类
    @Configuration
    @EnableRabbit
    public class RabbitMQConfig {
         
        // 以下@Value里的key都需要放入properties内
        @Value("${rabbitmq.host}")
        private String host;
        @Value("${rabbitmq.port}")
        private String port;
        @Value("${rabbitmq.username}")
        private String username;
        @Value("${rabbitmq.password}")
        private String password;
       
        @Bean
        public ConnectionFactory connectionFactory() {
            CachingConnectionFactory factory = new CachingConnectionFactory();
            factory.setUsername(username);
            factory.setPassword(password);
            factory.setHost(host);
            factory.setPort(port);
            factory.setPublisherConfirms(true);
            return factory;
        }
    
        /**
         * 对于消费端,我们可以只创建 SimpleRabbitListenerContainerFactory,
         * 它能够帮我们生成 RabbitListenerContainer,然后我们再使用
         * @RabbitListener 指定接收者收到信息时处理的方法。
         */
        @Bean
        public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            factory.setConnectionFactory(connectionFactory());
            factory.setConcurrentConsumers(3);
            factory.setMaxConcurrentConsumers(10);
            factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
            return factory;
        }
    }
    
    // 消费者处理MQ消息并反馈
    @Component
    public class RabbitMQListener {
    
        @Autowired
        private RabbitMQService rabbitMQService;
    
        /**
         *  写入你的queue的名字
         * @param obj
         * @param deliveryTag
         * @param channel
         */
        @RabbitListener(queues = "YOUR_RABBITMQ_QUEUE")
        public void planTaskListener(@Payload Object obj, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) throws IOException {
            rabbitMQService.handlePlan(obj, deliveryTag, channel);
        }
    }
    
        // service 处理
        @Override
        public void handlePlan(Object obj, long deliveryTag, Channel channel) throws IOException {
                // do something with obj
                // 反馈 通知消费成功
                channel.basicAck(deliveryTag, false);
            } catch (SchedulerException e) {
                // 反馈 通知未消费成功
                channel.basicNack(deliveryTag, false, true);
            }
        }
    

    相关文章

      网友评论

          本文标题:springboot + rabbitMQ

          本文链接:https://www.haomeiwen.com/subject/axlcrftx.html