简单介绍一下RabbitMQ安装和使用,这里使用docker安装RabbitMQ3.7版本。
docker安装rabbitmq
- 下载rabbitmq镜像,官方rabbitmq有很多版本,这里推荐下载带management的版本,带管理界面
>docker pull rabbitmq:3.7-management
- 运行rabbitmq
查看镜像
>docker images
REPOSITORY TAG IMAGE ID CREATED SIZE
rabbitmq 3.7-management c497955c219a 12 days ago 180MB
redis 5.0.9 5120d23bad51 2 weeks ago 98.3MB
containerize/elastichd latest c2202f76db37 2 years ago 28.1MB
mobz/elasticsearch-head 5 b19a5c98e43b 3 years ago 824MB
可以看到rabbitmq的镜像id是c497955c219a
,我们使用镜像Id安装
> docker run --name rabbitmq-3.7 -d -p 15672:15672 -p 5672:5672 --restart=always c497955c219a
可以看到很快就安装好了
> docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
34d6319d5720 c497955c219a "docker-entrypoint.s…" 21 minutes ago Up 21 minutes 4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, 15671/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp rabbitmq-3.7
-
访问http://主机IP:15672,就可以看到rabbitmq管理界面了,默认用户名和密码都是guest
rabbitmq管理界面
rabbitmq使用
1. 在使用rabbitmq之前,首先得对AMQP协议有所了解。
-
AMQP协议模型
AMQP协议模型 -
AMQP消息流转过程
消息流转过程
Publisher:消息发送者,将消息发送到Exchange并指明Routing Key,以便Message Queue可以正确的收到消息
Consumer:消息接受者,从Message Queue获取消息,一个Consumer可以订阅多个Queue, 来接受Queue中的消息
Server: 一个具体的MQ服务实例
Virtual host: 虚拟主机,一个server下面可以有多个虚拟主机,通常用于隔离不同的项目,一个Virtual host下面通常会有多个Exchange、Message Queue
Exchange:交换器,从Producer接受消息, 根据Bindings中配置的Routing key, 把消息分派到对应的Message Queue中
Routing key:路由键,用于Exchange判断哪些消息需要发送对应的Message Queue
Bindings: 描述了Exchange和Queue之间的关系。Exchange 根据消息内容 (routing key),和Binding配置来决定把消息分派到哪个Queue中
Message Queue: 存储消息, 并把消息传递给最终的 Consumer
2. 创建exchange和queue
首先需要创建一个exchange和queue才能收发消息,直接登录到管理界面进入Exchanges菜单(这里创建一个type为topic的exchange)
image.png
- Name是Exchage的名字
- Type是Exchage的Routingkey的绑定类型(direct,topic,fanout,header)
- Durability:消息是否持久化
- Auto delete:如果设置为yes则当exchange最后一个绑定的队列被删除后,就会自动删除
- Internal:如果设置为yes,是RabbitMQ的内部使用,不提供给外部,自己编写erlang语言做扩展时使用
- Arguments:扩展AMQP的自定义参数
再创建一个queue
创建queue
新建routingkey,绑定exchange和queue的关系
绑定关系
3. 操作rabbitmq
使用springboot操作rabbitmq非常简单,首先加入maven依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置rabbitmq地址
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtual-host: /
connection-timeout: 15000
发送消息,可以直接使用RabbitTemplate发送消息。
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void test0(){
//传入exchange 和 routingkey 就可以投递消息到绑定的队列,消息可以发字符串也可以发序列化对象
rabbitTemplate.convertAndSend("order-exchange","order.update","hello rabbit mq");
}
接收消息也非常简单,添加listener配置。
spring:
rabbitmq:
host: 39.99.219.219
username: guest
password: guest
virtual-host: /
connection-timeout: 15000
listener: #消费端配置
simple:
concurrency: 5 #初始化并发数
max-concurrency: 10 #最大并发数据
auto-startup: true #自动开启监听
prefetch: 1 #每个并发连接同一时间最多处理几个消息,限流设置
acknowledge-mode: manual #签收模式,设置为手动
接收消息只需要在方法上加@RabbitListener和@RabbitHandler注解。
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "order-queue",durable = "true",autoDelete = "false"),
exchange = @Exchange(value = "order-exchange",durable = "true",type = "topic"),
key = "order.update"
)
)
@RabbitHandler
public void onOrderMessage(@Payload String msg, @Headers Map<String,Object> headers, Channel channel) throws Exception{
System.out.println("************消息接收开始***********");
System.out.println("msg: "+msg);
}
4. 消息接收的应答模式ACK和NACK的使用
为了保证消息可以正确被消费MQ提供了ACK和NACK的机制。
- ACK是手动签收消息的标识,如果消息的签收模式设置成为了手工模式,在MQ没有接收到ACK信息时都是Unacked的状态,并且消息还在队列中,这个时候消息不会重试,不会再主动发给消费者
- NACK:将消息重回队列,如果我们发现异常,就可以调用NACK来将消息重回队列,他会重回到队尾重新发给消费者
可以看到上面的例子打印出了队列信息但却无法把这个队列消费掉,当你重新启动的时候又会读到之前未消费完的消息。
未消费完的消息
可以看一下下面的例子,每个消息进来重试三次,第四次才签收完成。
int flag = 1;
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "order-queue",durable = "true",autoDelete = "false"),
exchange = @Exchange(value = "order-exchange",durable = "true",type = "topic"),
key = "order.update"
)
)
@RabbitHandler
public void onListenMessage(@Payload String msg, @Headers Map<String,Object> headers, Channel channel) throws Exception{
System.out.println("************消息接收开始***********");
System.out.println("msg: "+msg);
Long deliverTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
//ACK进行签收,第一个参数是标识,第二个参数是批量接收为fasle
//channel.basicAck(deliverTag,false);
if(flag>3){
//说明执行了3次都没有成功
//消息确认
channel.basicAck(deliverTag,false);
}else {
flag = flag+1;
//前两个参数和上面ACK一样,第三个参数是否重回队列
channel.basicNack(deliverTag, false, true);
}
}
}
************Select消息接收开始***********
mq msg: hello rabbit mq
************Select消息接收开始***********
mq msg: hello rabbit mq
************Select消息接收开始***********
mq msg: hello rabbit mq
************Select消息接收开始***********
mq msg: hello rabbit mq
5. 发送确认和发送失败退回
为了保证消息能一定投递成功,AMQP提供了消息确认和消息失败退回。
- 先加入配置
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtual-host: /
connection-timeout: 15000
#开启发送确认
publisher-confirms: true
#开启发送失败退回
publisher-returns: true
2.发送消息的时候添加一个消息的唯一标识,在消息回调的时候可以通过唯一标识判断消息投递是否成功
@Test
public void test01(){
CorrelationData correlationData = new CorrelationData();
correlationData.setId("ms0001");
rabbitTemplate.convertAndSend("order-exchange","order.select","this is a msg",correlationData);
}
- 创建一个类并实现RabbitTemplate.ConfirmCallback和 RabbitTemplate.ReturnCallback接口,来接收消息投递回调
@Component
public class OrderSenderCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
}
/**
* 每次发送完消息都会调这个方法
* @param correlationData
* @param ack
* @param cause
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("==========消息publish成功的ID: " + (correlationData != null ? correlationData.getId() : "null"));
System.out.println("==========消息是否发布成功: " + ack);
System.out.println("==========失败的异常信息: " + cause);
}
/**
* 每次消息发送失败会调用这个方法
* @param message
* @param replyCode
* @param replyText
* @param exchange
* @param routingKey
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("******replayCode: " + replyCode);
System.out.println("******replayText: " + replyText);
System.out.println("******exchange: " + exchange);
System.out.println("******routingkey: " + routingKey);
}
}
消息队列高可用方案
1. 生产端的可靠性投递
假如用MQ所做的业务不允许消息丢失,那么生产端就要保证消息能够百分之百投递,消费端就要保证消息百分之百被消费。那么该怎么做呢?
- 消费端需要开启手动应答,用ACK和NACK的机制来确保消息的消费应答
- 生产者发送消息后一定要有一个确认应答来确认消息的发送状态
- RabbitMQ本身需要做HA高可用
- 做一个完善的消息补偿机制
- RabbitMQHA高可用方案
镜像模式和普通模式的区别就是,队列的数据都镜像了一份到所有的节点上。这样任何一个节点失效,不会影响整个集群的使用。
- 消息补偿机制
- 一开始要有一个业务DB和MQDB,将业务数据存入DB
- 业务落库后就通过Sender发送消息给Broker,发送消息后就将消息体和消息状态记录到MQDB中(发送中,发送完成),并且会接收到Broker返回的消息投递确认状态(需要代码支持)
- 消息发送方接收Broker返回的消息和网络状态的异常
- 如果消费发送确认失败应该调用业务来将MQDB消息的状态更新为发送失败,如果发送状态成功就将MQDB中的记录更新为成功
- 使用分布式定时Job来对我们的消息进行判断,只要是消息状态不是我们发送成功的都需要进行重新发送
- 重试一定要有次数限制,当达到现在的次数就不再重试通知人工处理
2. 消费端幂等性问题解决
为了确保发送端的投递成功,我们都会进行消息的补偿,有可能消息补偿的过程中会多发送几次消息导致重复,这个时候就需要提前考虑消费端的幂等问题。
-
唯一ID+业务码在数据库中做主键来做去重
- 优点:实现简单方便
- 缺点:会对数据库产生异常压力,并且只能用来做insert的幂等
-
为执行的内容做前置条件,类似于mysql的乐观锁
给数据更新前增加一个前置的条件,需要将拿到的前置条件做为更新的条件之一来做操作,如果在你之前已经更新了这个前置条件,那么你的更新就会不成功。这样同样也会对数据库产生一定的压力 -
利用Redis原子性
第一个进入并拿到锁的线程在锁内部先判断是否已消费,如果没有消费则操作并记录这个Measge_ID已消费,并发过程中的线程如果拿不到锁就直接返回,这个线程能拿到锁,但拿到后要去查询这个Measge_ID是否消费过,如果已经消费了,就中止消费
3.消费端的消息可靠性保障
如果生产端能够保证消息的可靠性那么消费端就一定能收到消息,需要注意的有两点
- 开启消息手动签收,如果消费成功就手动发送ACK响应,表明这个消息消费成功
- 出现异常可以通过NACK将消息重回队尾变成Ready状态然后再次消费
- NACK要有重试次数限制,当超过次数就将这个消息发送到一个人工补偿消息队列或持久化对象中等待人工补偿
网友评论