一、rabbitmq简介
基于amqp协议的,既然是协议,那么就定义了数据交换的格式,不是和jms一样是api规范,amqp是一个协议,所以可以跨平台,amqp有exchange和bind角色,这个跟jms不一样。jms有两种模式,点对点的queue和发布订阅的topic,amqp的模式主要在于exchange的类型,exchange有多种类型,主要使用的有,direct、topic、fanout exchange。 还有一个不同是,jms可以发送多种消息格式,amqp只能发送二进制。
Rabbitmq是Erlang语言开发的,所以性能方面比较好,消息发布到消费响应时间比较低,但是因为是erlang语言,所以看不懂源码,更不能定制化。
rabbitmq的管理界面功能比较丰富,可以看到很多状态
rabbitmq的大概流程:生产者发送消息,消息不会直接到达queue,而是发送给exchange,生产者发送了消息内容之外,还指定了routingKey,然后exchange根据这个routingkey,找到binding到这个exchange的队列,然后发送给这个队列。
这样的好处是:假设本来B服务监听A服务的消息,未来C服务也想监听A服务的消息,那么C服务可以重新创建一个queue,然后把这个queue绑定到A服务消息发送的那个exchange中,就能收到消息了。(这个跟activemq的topic机制没有什么不同)
但是,引入了exchange的机制的另一个好处是,如果A服务发送给C服务的消息,C服务处理不过来,产生大量积压,则可以再开启消费者,从queue里消费消息,而这个机制,是activemq的topic实现不了的。
二、exchage和queue的定义
RabbitMQ不允许使用不同的参数重新定义一个队列,所以已经存在的队列,我们无法修改其属性, 也就是说,exchange和queue一旦创建好了,就不能修改了,除非删掉重来。
比如,一定定义了一个非持久化的,要想改成持久化的,只能删掉重来。(可以通过管理界面删除)
channel.exchangeDeclare("testExchange", "direct", false);
autoDelete:true、false.true:当已经没有消费者时,服务器是否可以删除该Exchange。
Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete,
Map<String, Object> arguments) throws IOException;
exclusive :是否为当前连接的专用队列,在连接断开后,会自动删除该队列,生产环境中应该很少用到吧。
autodelete:当没有任何消费者使用时,自动删除该队列。
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
Map<String, Object> arguments) throws IOException;
三、消息确认、持久化、消息拒绝
rabbitmq的消息确认、消息持久化的思想和activemq大体是一样的,只不过rabbitmq的持久化,有:
exchange的持久化:不持久化,则重启服务,exchange就消失了。
channel.exchangeDeclare("testExchange", "direct", true);
队列持久化:
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
消息的持久化:
channel.basicPublish("testExchange", "routingkey1", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
消息确认和activemq一样:
// 每次从rabbitmq拉几条消息,也可以设置其他更多,反正如果没有应答,消费者重启后,还会发送
// 但是,如果一下拉取太多了,如果有多条消费者都从这个队列消费消息,平衡这个队列的压力的时候,某一个消费者拉去太多,会导致压力不均衡
channel.basicQos(1);
// 配置手动应答
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//应答需要指定Delivery Tag, 用来标识信道中投递的消息。RabbitMQ 推送消息
// 给 Consumer 时,会附带一个 Delivery Tag,以便 Consumer 可以在消息确认
// 时告诉 RabbitMQ 到底是哪条消息被确认了。
//RabbitMQ 保证在每个信道中,每条消息的 Delivery Tag 从 1 开始递增。
//第二个参数:multiple 取值为 false 时,表示通知 RabbitMQ 当前消息被确认;
// 如果为 true,则额外将比第一个参数指定的 delivery tag 小的消息一并确认,也
// 就是批量确认之间所有的消息。
//对同一消息的重复确认,或者对不存在的消息的确认,会产生 IO 异常,导致信道关闭。
channel.basicAck(envelope.getDeliveryTag(), false);
消息拒绝:
// deliveryTag:该消息的index
// requeue:被拒绝的是否重新入队列
// channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
四、消息发布:
消息发布,为了保证消息发布到rabbitmq成功以否,rabbitmq提供了回调确认机制,保证数据不丢失,当然事务也可以做到,但是性能不好,不常用。
// routingKey:路由键,#匹配0个或多个单词,*匹配一个单词,在topic exchange做消息转发用
// mandatory:true:如果exchange根据自身类型和消息routeKey无法找到一个
// 符合条件的queue,那么会调用basic.return方法将消息返还给生产者。false:
// 出现上述情形broker会直接将消息扔掉
// immediate:true:如果exchange在将消息route到queue(s)时发现对应的queue
// 上没有消费者,那么这条消息不会放入队列中。当与消息routeKey关联的所有
// queue(一个或多个)都没有消费者时,该消息会通过basic.return方法返还给生产者。
//BasicProperties :需要注意的是BasicProperties.deliveryMode,0:不持久化
// 1:持久化 这里指的是消息的持久化,配合
// exchange(durable=true),queue(durable)可以实现,即使服务器宕机,消息仍然 保留
// 单来说:mandatory标志告诉服务器至少将该消息route到一个队列中,否则将
// 消息返还给生产者;immediate标志告诉服务器如果该消息关联的queue上有消
// 费者,则马上将消息投递给它,如果所有queue都没有消费者,直接把消息返
// 还给生产者,不用将消息入队列等待消费者了。
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)
throws IOException;
四、exchange类型
type:主要有direct、fanout、topic三种,还有一些不常用的。
direct:直接找到routingkey对应的queue,生产者发送指定的routingkey要和queue绑定到exchange上的的routingkey完全相同,不能有通配符。
topic:和direct的区别是,routingkey可以有通配符。'*'表示匹配一个单词, '#'则表示匹配没有或者多个单词
fanout: 不需要routingkey,直接把消息发送到所有绑定到exchange的queue中。
rabbitmq,queue绑定到bind之后,就不能改了。除非删掉,重新来过。
channel.exchangeDeclare("testExchange", "direct", false);
五、消费者线程
这部分,很容易导致,消费者的队列堵塞,导致消息消费不到。
可以指定消费者线程数。默认是单线程的。
每个消费者线程,轮询查看是否有消息,默认每1秒轮询一次。有消息,则调用我们定义的消费者。
关于消费者线程:
假设,有两个消费者,然后消费者线程定义为5,则总共有10个消费者线程。
服务一启动的时候,消费者的一个线程,就会一下拉取最大250个消息,所以一下把rabbitmq服务器的消息拉取完了。(其实不是拉取,是有一个线程往这个队列里循环放最大250个消息)
所以,这个消费者线程堵塞了,其他消息者线程也拿不到消息,所以也就没有并发执行。
而,服务启动后,生产者再发送一条消息,则会随机取一个线程执行,也就达到并发的效果。
spring-boot-starter-amqp部分源码分析
还有一种方式,手动把消息在消费端转到jdk的线程池。
spring:
rabbitmq:
listener:
simple:
concurrency: 1 #最小消息监听线程数
max-concurrency: 1 #最大消息监听线程数
acknowledge-mode: manual # 应答机制
retry:
####开启消费者重试
enabled: true
####最大重试次数(默认无数次)
max-attempts: 5
六、消费者重试
默认,重试3次,间隔为1秒。
这个错误,只有在,重试3次之后才会抛出来。
如果自动应答,则抛出错误后消息就丢失了。
如果有应答机制,则抛出错误后,消息就不再往这个消费者发了,除非另起一个一个消费者,后者消费者重启。
@RabbitListener(queues = "hello.queue1", containerFactory = "customContainerFactory")
public void processMessage1(String msg) {
System.out.println("thread name:" + Thread.currentThread().getName());
System.out.println(Thread.currentThread().getName() + " 接收到来自hello.queue1队列的消息:" + msg);
throw new RuntimeException("错我");
// return msg.toUpperCase();
// return;
}
七、pull还是push模式
rabbitmq一般我们使用的是推模式,只有调用basicGet才会使用拉模式。
八、同步消息
rabbitmq,还可以同步消息,得到消费者的响应
rabbitTemplate.convertSendAndReceive
九、rabbitmq的流控机制
当发布者发布消息远大于消费者时,或者,rabbitmq本身的资源、性能处理不过来这些消息的时候,如果没有任何控制,则rabbitmq就会被压死。所以rabbitmq有流控机制。当达到流控阈值的时候,会堵塞发布者发布,直到消费者消费了一些消息,资源降下来了为止。
先看三个参数:
otal_memory_available_override_value:rabbitmq可以使用的内存值。默认,rabbitmq会自动推算出这个值,但是如果部署在docke上,则可能推算错误,推算到物理机的内存。
vm_memory_high_watermark:触发流控的阈值比例。默认时0.4。为什么时0.4这么小,是因为erlang语言垃圾回收最坏会占用已用内存的2倍。
vm_memory_high_watermark.relative = 0.6 相对
vm_memory_high_watermark.absolute = 2GB 绝对
当达到这个阈值之后,堵塞发布,等到,rabbitmq把内存中的数据,一部分持久化到硬盘,或者消费者消费了一部分,才放开堵塞。
vm_memory_high_watermark_paging_ratio:触发内存持久化到磁盘的阈值。
默认时0.5。比如,可用内存1g,内存阈值0.4,内存持久化磁盘0.5,则 1 * 0.4 * 0.5 = 0.2就会持久化磁盘。
除了宏观上流控之外,各个进程还有流控控制。
rabbitmq的各个进程之间还有credit flow流控的控制。消息会在各个进程之间流转,各个进程相互独立,如果某一个进程压力太大,将会导致整个rabbitmq压力大。所以进程会有credit flow控制,比如,进程来了一个消息,则credit - 1,如果credit为0了,则堵塞。
参考:https://mathspanda.github.io/2018/06/24/rabbitmq-flow-control/
防止消费者消息堵塞:
消息堵在消费者堵塞队列中,得不到执行。
解答办法:一定要basicAck或basicNack应答。basicQos不要设置太大,也就是堵塞队列,不要太长。
网友评论