Rabbitmq
简介
RabbitMQ是由Erlang语言编写的实现了高级消息队列协议(AMQP)的开源消息代理软件(也可称为 面向消息的中间件)。支持Windows、Linux/Unix、MAC OS X操作系统和包括JAVA在内的多种编程语言。
MQ的理解
可以将消息队列理解成一个书柜,provider-小红,consumer-小明,小红把小明要看的书,一本一本陆陆续续地放到书柜上,小明有空就去拿来看。
RabbitMq的重要概念
- Broker :经纪人。提供一种传输服务,维护一条从生产者到消费者的传输线路,保证消息数据能按照指定的方式传输。粗略的可以将图中的RabbitMQ Server当作Broker。
- Exchange :消息交换机。指定消息按照什么规则路由到哪个队列Queue。(可以理解成一个书柜)
- Queue :消息队列。消息的载体,每条消息都会被投送到一个或多个队列中。
- Binding :绑定。作用就是将Exchange和Queue按照某种路由规则绑定起来。
- RoutingKey :路由关键字。Exchange根据RoutingKey进行消息投递。
- Vhost :虚拟主机。一个Broker可以有多个虚拟主机,用作不同用户的权限分离。一个虚拟主机持有一组Exchange、Queue和Binding。
- Producer :消息生产者。主要将消息投递到对应的Exchange上面。一般是独立的程序。
- Consumer :消息消费者。消息的接收者,一般是独立的程序。
- Channel :消息通道,也称信道。在客户端的每个连接里可以建立多个Channel,每个Channel代表一个会话任务。
RabbitMq的消息处理流程
在rabbit mq中,消息在producer中产生,发送到MQ的exchange上,exchange根据配置的路由方式投递到相应的Queue上,Queue又将消息发送给已经在此Queue上注册的consumer。
消息队列的使用过程大概如下:
- 客户端连接到消息队列服务器,打开一个channel。
- 客户端声明一个exchange(转发消息,不做储存),并设置相关属性。
- 客户端声明一个queue,并设置相关属性。
- 客户端使用routing key,在exchange和queue之间建立好Binding关系。
- 生产者客户端投递消息到exchange。
- exchange接收到消息后,就根据消息的RoutingKey和已经设置的binding,进行消息路由(投递),将消息投递到一个或多个队列里。
- 消费者客户端从对应的队列中获取并处理消息。
以上流程的理解,provider-小红,consumer-小明,书柜-Queue,channel-地点,消息-书
小红(provider)将书(msg)放到某个地方(channel)的某个书柜(Queue),并和小明(consumer)规定是哪个第几排(RoutingKey),小红陆陆续续放上去,小明根据和小红约定的具体地点+书柜+第几排去拿书
RabbitMq的l路由类型
Direct(点对点) Exchange
- 名称:直接交换器类型(点对点)
- 默认的预先定义exchange名字:空字符串或者amq.direct
-
作用描述:根据Binding指定的Routing Key,将符合Key的消息发送到Binding的Queue。可以构建点对点消息传输模型。
image.png
Fanout Exchange
- 名称:广播式交换器类型
- 默认的预先定义exchange名字:amq.fanout
-
作用描述:将同一个message发送到所有同该Exchange 绑定的queue。不论RoutingKey是什么,这条消息都会被投递到所有与此Exchange绑定的queue中。
image.png
Topic(订阅) Exchange
- 名称:主题交换器类型
- 默认的预先定义exchange名字:amq.topic
-
作用描述:根据Binding指定的RoutingKey,Exchange对key进行模式匹配后投递到相应的Queue,模式匹配时符号“#”匹配一个或多个词,符号“*”匹配正好一个词,而且单词与单词之间必须要用“.”符号进行分隔。此模式可以用来支持经典的发布/订阅消息传输模型-使用主题名字空间作为消息寻址模式,将消息传递给那些部分或者全部匹配主题模式的queue。
image.png
Headers Exchange
- 名称:标题交换器类型
- 默认的预先定义exchange名字:amq.match和amq.headers
- 作用描述:同direct exchange类似,不同之处是不再使用Routing Key路由,而是使用headers(Message attributes)进行匹配路由到指定Queue。
- Headers类型的exchange使用的比较少,它也是忽略routingKey的一种路由方式。是使用Headers来匹配的。Headers是一个键值对,可以定义成HashTable。发送者在发送的时候定义一些键值对,接收者也可以再绑定时候传入一些键值对,两者匹配的话,则对应的队列就可以收到消息。匹配有两种方式all和any。这两种方式是在接收端必须要用键值"x-mactch"来定义。all代表定义的多个键值对都要满足,而any则代码只要满足一个就可以了。fanout,direct,topic exchange的routingKey都需要要字符串形式的,而headers exchange则没有这个要求,因为键值对的值可以是任何类型。
开发与使用
rabbitMq目前与springboot已经融合的非常好了,springboot可以直接采用自动化配置连接来访问rabbitMq,同时springboot直接支持rabbitMq的集群,不用像以前一样要单独部署一个HAProxy来作为rabbitMq的负载均衡。所以在开发项目中如果需要使用到RabbitMq,那么我们建议全部用springboot框架来开发。
开发配置
RabbitMq有许多配置,大部分配置都可以采用默认的方式,这里不一一叙述,但是在我们日常开发中,为了保证消息投递的高可用,我们会对一些特殊的配置进行设置。
基本配置
基本配置中我们需要在springboot项目中添加rabbitmq的依赖与mq的访问主机。
pom.xml 中添加依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
在application.properties中配置rabbitmq的主机地址、端口、用户名、密码、虚拟主机等信息
spring.rabbitmq.host=192.168.1.45
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
spring.rabbitmq.virtual-host=/gzjkDev
当然,如果是集群部署的话,配置则是这样,多个IP用逗号分开:
spring.rabbitmq.addresses=192.168.1.45:5672,192.168.1.46:5672,192.168.1.47:5672,
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
spring.rabbitmq.virtual-host=/gzjkDev
发送确认模式配置
发送确认模式是开发中比较重要的一个细节,在使用RabbitMQ的时候,我们会遇到一个问题,当消息的发布者在将消息发送出去之后,消息到底有没有正确到达broker代理服务器呢?如果不进行特殊配置的话,默认情况下发布操作是不会返回任何信息给生产者的,也就是默认情况下我们的生产者是不知道消息有没有正确到达broker的,如果在消息到达broker之前已经丢失的话,持久化操作也解决不了这个问题,因为消息根本就没到达代理服务器,你怎么进行持久化,那么这个问题该怎么解决呢?
RabbitMQ为我们提供了两种方式:
- 方式一:通过AMQP事务机制实现,这也是从AMQP协议层面提供的解决方案;
使用事务的方式固然好,但是从发送到broker投递到相应的exchange并持久化消息的时候,整个流程生产者都在等待,这样就大大降低的性能,有人做过测试,rabbitmq单机不使用事务情况下投递1000条消息需要89毫秒,使用事务的话投递1000条消息需要58244毫秒!所以在日常的开发中,我们是绝对不会推荐使用事务的方式投递消息的,这里也不再对事务模式做更多的代码实例,有兴趣的同事可以自己百度一下。
- 通过启动消息确认模式(无性能损失);
发送消息确认,顾名思义就是生产者发送消息到MQ后,之后MQ主动告知生产者发送结果,生产者再对结果做相应的处理。
开启rabbitMq的消息发送确认模式配置:
spring.rabbitmq.publisher-confirms=true
消费确认模式也是比较重要的一项配置,默认情况下,springboot是没有开启消息消费确认模式的,当消费者获取到从MQ推过来的一个消息时,MQ中就会将该消息从相应的队列中移除,那么就会带来一个问题:
消费者通常在获取到MQ的消息时,会针对该消息做后续的一些消费逻辑处理,如果在这个过程中消费者出现异常,中途退出了,那么这条消息就没有被成功的消费,更糟糕的是,这条消息同时也被MQ从队列里删除了,当消费者服务恢复时,却永远都无法再次重新消费这条消息了!
开启消费确认模式配置:
spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode=manual
代码示例
定义消息队列:
@Configuration
public class RabbitMQConfig {
public final static String QUEUE_NAME = "test_queue";
// 创建队列
@Bean
public Queue queue() {
return new Queue(QUEUE_NAME);
}
}
生产者代码:
@RestController
public class ProducerController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendMessage")
public Object sendMessage() {
for (int i = 0; i < 100; i++) {
String value = i + "消息";
System.out.println(value);
rabbitTemplate.convertAndSend(RabbitMQConfig.QUEUE_NAME, value);
}
return "ok";
}
}
生产者发送消息监听确认
@Component
public class RabbitTemplateConfig implements RabbitTemplate.ConfirmCallback{
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init(){
rabbitTemplate.setConfirmCallback(this); //指定 ConfirmCallback
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("消息唯一标识:"+correlationData);
System.out.println("确认结果:"+ack);
System.out.println("失败原因:"+cause);
}
}
消费者代码
@Component
@RabbitListener(queues = RabbitMQConfig.QUEUE_NAME)
public class Consumer {
@RabbitHandler
public void consumeMessage(String messageChannel, Channel channel, Message message) throws IOException {
System.out.println("HelloReceiver收到 : " + messageChannel + "收到时间" + new Date());
try {
//必须发送ack动作告诉RabbitMq你已成功消费消息,这么做的目的在于消费者服务在未消费完服务却宕机,重启时还可以继续消费上一个未消费完成的消息。
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
System.out.println("receiver success");
} catch (Exception e) {
e.printStackTrace();
System.out.println("receiver fail");
}
}
}
开发注意事项
消息幂等
消息幂等即保证生产者发送消息的数量要等于消费者消费消息的数量,这个适用于大多数的消息应用场景,尤其是对于一些与第三方系统对接消息,为了保证消息的幂等,除了配置消息发送确认模式和消费确认模式的同时,我们可以在生产者发送消息时记录日志,消费消息时也保留记录,这样可以在出现消息不匹配时有数可循。
网友评论