本文讲述springboot与rabbitMQ的整合
- 如果你还没接触过springboot或MQ,那么请看一下我的两篇前置教程,搭建基础框架与MQ的服务
- springboot https://www.jianshu.com/p/8ade437792cc
- rabbitMQ https://www.jianshu.com/p/60c358235705
- 流程 - 角色分为生产者->MQ->消费者->MQ
主要步骤为- 生产者发送需要处理的消息给MQ
- MQ推送消息给消费者
- 消费者使用消息,然后反馈给MQ
- pom.xml文件里添加MQ的依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- 生产者
// RabbitMQService接口只有一个空的addQueue方法,这里不贴出来了
// 本类是生产消息类
@Service
@Transactional
public class RabbitMQServiceImpl implements RabbitMQService {
// 在properties里加入对应的queue的名字
@Value("${rabbitmq.queue}")
private String queue;
@Autowired
public RabbitTemplate rabbitTemplate;
@Override
public void addQueue(Object obj) {
// 像指定queue推送
rabbitTemplate.convertAndSend(queue, obj);
}
}
// 本类是生产者的MQ配置类
@Configuration
public class RabbitMQConfig {
// 以下@Value里的key都需要放入properties内
@Value("${rabbitmq.host}")
private String host;
@Value("${rabbitmq.port}")
private String port;
@Value("${rabbitmq.username}")
private String username;
@Value("${rabbitmq.password}")
private String password;
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setUsername(username);
factory.setPassword(password);
factory.setHost(host);
factory.setPort(Integer.valueOf(port));
factory.setPublisherConfirms(true);
return factory;
}
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
// 必须是prototype类型
public RabbitTemplate rabbitTemplate() {
return new RabbitTemplate(connectionFactory());
}
}
- 消费者
//消费者的MQ配置类
@Configuration
@EnableRabbit
public class RabbitMQConfig {
// 以下@Value里的key都需要放入properties内
@Value("${rabbitmq.host}")
private String host;
@Value("${rabbitmq.port}")
private String port;
@Value("${rabbitmq.username}")
private String username;
@Value("${rabbitmq.password}")
private String password;
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setUsername(username);
factory.setPassword(password);
factory.setHost(host);
factory.setPort(port);
factory.setPublisherConfirms(true);
return factory;
}
/**
* 对于消费端,我们可以只创建 SimpleRabbitListenerContainerFactory,
* 它能够帮我们生成 RabbitListenerContainer,然后我们再使用
* @RabbitListener 指定接收者收到信息时处理的方法。
*/
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setConcurrentConsumers(3);
factory.setMaxConcurrentConsumers(10);
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return factory;
}
}
// 消费者处理MQ消息并反馈
@Component
public class RabbitMQListener {
@Autowired
private RabbitMQService rabbitMQService;
/**
* 写入你的queue的名字
* @param obj
* @param deliveryTag
* @param channel
*/
@RabbitListener(queues = "YOUR_RABBITMQ_QUEUE")
public void planTaskListener(@Payload Object obj, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) throws IOException {
rabbitMQService.handlePlan(obj, deliveryTag, channel);
}
}
// service 处理
@Override
public void handlePlan(Object obj, long deliveryTag, Channel channel) throws IOException {
// do something with obj
// 反馈 通知消费成功
channel.basicAck(deliveryTag, false);
} catch (SchedulerException e) {
// 反馈 通知未消费成功
channel.basicNack(deliveryTag, false, true);
}
}
网友评论