RabbitMQ作为一款能实现高性能存储分发消息的分布式中间件,具有异步通信、服务解耦、接口限流、消息分发和业务延迟处理等功能,在实际生产环境中具有很广泛的应用。
为了能在项目中使用RabbitMQ,需要在本地安装RabbitMQ并能进行简单的使用。可参考改教程安装RabbitMQ:安装教程
一、总的代码目录结构如下:
rabbitmq使用代码目录结构.png- entity包实体类order,作为消息载体;
- ackmodel包是写消息确认消费机制,自动确认消费和手工确认消费;
- delay包是写延迟队列,在业务延迟处理时可用到,如订单30分钟内未付款自动取消订单相关业务;
- exchangemodel包是写不同类型交换机下的消息模型,常见的三种消息模型是:direct-直接传输、fanout-广播、topic-主题消息模型;
- springevent包是spring事件驱动模型的demo,要了解消息队列,先了解一下spring的事件驱动会更好;
- 数据库是用mysql,项目启动需要连接本地数据库,新建一个exercisegroup数据库;
- appilication.yaml配置rabbitmq地址,pom文件引入amqp starter即可。
二、spring事件驱动
(1)spring事件驱动模型由三部分构成,生产者、事件(消息)、消费者,即生成者采用异步的方式把事件(消息)发送给消费者,消费者监听到事件(消息)再进一步处理。
spring事件驱动.jpeg
(2)代码目录结构如下:
springevent.png
(3)示例代码
- OrderEvent类,订单事件,继承ApplicationEvent,:
public class OrderEvent extends ApplicationEvent {
public OrderEvent(Order source) {
super(source);
}
}
- OrderPublisher类,生产者,异步发送事件:
@Component
public class OrderPublisher {
@Autowired
private ApplicationEventPublisher applicationEventPublisher;
public void sendMsg() {
Order order = new Order();
order.setOrdernum("123456");
OrderEvent orderEvent = new OrderEvent(order);
//发送消息
applicationEventPublisher.publishEvent(orderEvent);
}
}
- OrderConsumer类,消费者,监听生产者发送过来的事件
@Component//加入Spring的IOC容器
@EnableAsync//允许异步执行
@Slf4j
public class OrderConsumer implements ApplicationListener<OrderEvent> {
@Override
@Async
public void onApplicationEvent(OrderEvent event) {
log.info("监听到订单,订单号:{}", ((Order) event.getSource()).getOrdernum());
}
}
(4)执行MessageQueueApplicationTests的test方法即可看结果,RabbitMQ本质也是异步通信,消息在生产者端进行发送,在消费者端进行监听,对监听到的消息进一步处理,其功能更加强大。
test-springevent.png
三、RabbitMQ一些专有名词
Producer/Publisher生产者,投递消息的一方。
Consumer消费者,接收消息的一方。
Message消息:实际的数据,如demo中的order订单消息载体。
Queue队列:是RabbitMQ的内部对象,用于存储消息,最终将消息传输到消费者。
Exchange交换机:在RabbitMQ中,生产者发送消息到交换机,由交换机将消息路由到一个或者多个队列中
RoutingKey路由键:生产者将消息发给交换器的时候,一般会指定一个RoutingKey,用来指定这个消息的路由规则。
Binding绑定:RabbitMQ中通过绑定将交换器与队列关联起来,在绑定的时候一般会指定一个绑定键(BindingKey),这样RabbitMQ就知道如何正确地将消息路由到队列。
四、RabbitMQ使用
(1)简单的demo
消息队列流程图.jpeg 简单的demo.png
(2)示例代码
- RabbitmqConfig配置类,SimpleRabbitListenerContainerFactory Bean是消息监听容器,服务于监听者;RabbitTemplate是RabbitMQ发送消息的操作组件RabbitTemplate,此外配置类还有三个Bean,一个是队列basicQueue用于存储消息最终消息会被消费者监听到,basicExchange是交换机,生产者发送消息到交换机根据路由规则发送到相应的队列basicQueue上,basicBinding是负责绑定交换机basicExchange和队列basicQueue,根据路由规则绑定起来。创建队列、交换机的名词以及路由规则我都放到常量类RabbitMqConstants里面。
@Slf4j
@Configuration
public class RabbitmqConfig {
@Autowired
private CachingConnectionFactory connectionFactory;
//自动装配消息监听器所在的容器工厂配置类实例
@Autowired
private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;
/**
* 下面为单一消费者实例的配置
*/
@Bean(name = "singleListenerContainer")
public SimpleRabbitListenerContainerFactory listenerContainer(){
//定义消息监听器所在的容器工厂
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
//设置容器工厂所用的实例
factory.setConnectionFactory(connectionFactory);
//设置消息在传输中的格式,在这里采用JSON的格式进行传输
factory.setMessageConverter(new Jackson2JsonMessageConverter());
//设置并发消费者实例的初始数量。在这里为1个
factory.setConcurrentConsumers(1);
//设置并发消费者实例的最大数量。在这里为1个
factory.setMaxConcurrentConsumers(1);
//设置并发消费者实例中每个实例拉取的消息数量-在这里为1个
factory.setPrefetchCount(1);
return factory;
}
//自定义配置RabbitMQ发送消息的操作组件RabbitTemplate
@Bean
public RabbitTemplate rabbitTemplate(){
//设置“发送消息后进行确认”
connectionFactory.setPublisherConfirms(true);
//设置“发送消息后返回确认信息”
connectionFactory.setPublisherReturns(true);
//构造发送消息组件实例对象
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMandatory(true);
//发送消息后,如果发送成功,则输出“消息发送成功”的反馈信息
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> log.info("消息发送成功:correlationData({}),ack({}),cause({})", correlationData,ack,cause));
//发送消息后,如果发送失败,则输出“消息发送失败-消息丢失”的反馈信息
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}",exchange,routingKey,replyCode,replyText,message));
//定义消息传输的格式为JSON字符串格式
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
//最终返回RabbitMQ的操作组件实例RabbitTemplate
return rabbitTemplate;
}
//创建队列
@Bean
public Queue basicQueue(){
return new Queue(RabbitMqConstants.BASIC_QUEUE,true);
}
//创建交换机:在这里以DirectExchange为例
@Bean
public DirectExchange basicExchange(){
return new DirectExchange(RabbitMqConstants.BASIC_EXCHANGE,true,false);
}
//创建绑定
@Bean
public Binding basicBinding(){
return BindingBuilder.bind(basicQueue()).to(basicExchange()).with(RabbitMqConstants.BASICE_ROUTING_KEY);
}
}
- RabbitMqConstants常量类,存放创建队列、交换机的名词以及路由规则。
@Data
public class RabbitMqConstants {
//队列名词
public static final String BASIC_QUEUE = "mq.basic.info.queue";
//交换机名词
public static final String BASIC_EXCHANGE = "mq.basic.info.exchange";
//路由规则,实际为字符串
public static final String BASICE_ROUTING_KEY = "mq.basic.info.routing.key";
}
- BasicPublisher 类,生产者,异步发送消息
@Component
@Slf4j
public class BasicPublisher {
//定义RabbitMQ消息操作组件RabbitTemplate
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 发送消息
* @param message 待发送的消息
*/
public void sendMsg(Order message){
try {
//指定消息模型中的交换机
rabbitTemplate.setExchange(RabbitMqConstants.BASIC_EXCHANGE);
//指定消息模型中的路由
rabbitTemplate.setRoutingKey(RabbitMqConstants.BASICE_ROUTING_KEY);
//转化并发送消息
rabbitTemplate.convertAndSend(message);
log.info("rabbitmq demo-生产者-发送消息:{} ", JSONUtil.toJsonStr(message));
} catch (Exception e) {
log.error("rabbitmq demo-生产者-发送消息发生异常:{} ", message, e.fillInStackTrace());
}
}
}
- BasicConsumer 类,消费者,监听到消息时对消息进行处理,需要为消费者设置监听的队列mq.basic.info.queue以及监听容器singleListenerContainer。
@Component
@Slf4j
public class BasicConsumer {
/**
* 监听并接收消费队列中的消息-在这里采用单一容器工厂实例即可
*/
@RabbitListener(queues = RabbitMqConstants.BASIC_QUEUE, containerFactory = "singleListenerContainer")
public void consumeMsg(Order message) {
try {
log.info("rabbitmq demo-消费者-监听消费到消息:{} ", JSONUtil.toJsonStr(message));
} catch (Exception e) {
log.error("rabbitmq demo-消费者-发生异常:", e.fillInStackTrace());
}
}
}
(3)安装好erlang语言以及rabbitmq之后,项目启动,访问http://127.0.0.1:15672,输入默认账号密码,可以看到:
消息队列.png 交换机.png 交换机绑定关系.png
(4)运行test方法:
@Test
public void testBasicPublish() {
Order order = new Order();
order.setOrdernum("123456");
basicPublisher.sendMsg(order);
}
生产者发送消息.png
消费者监听消息.png
下一篇:springboot rabbitmq不同交换机类型实战
下一篇:springboot rabbitmq高可用消息确认消费实战
参考资料:
《分布式中间件实战》
《rabbitmq实战指南》
网友评论