rabbitMQ基础应用 java demo地址
消息状态:
- ready:消费者还能从队列中获取到消息的条数
- unacked: 消费者还没有返回确认的消息条数
- total: 队列中消息总数 total=ready+unacked
simple queue
- 默认exchange,队列名作为routingKey
work queue
- 多个消费者消费同一个队列
- 轮询分发Round-robin dispatching
- 1.消息队列中的消息/新产生的消息,轮询现有消费者,平均分配到每个消费者
- 2.如果确定其中某个消费者无法确认消息(ack),没确认的消息会变为ready,再次轮询分配到现有消费者
- 3.ready数量为0表示所有消息已指定消费者,无法再被其他消费者消费。即使有空闲的消费者,而某个消费者还有大量消息没有消费完也不会分配到空闲的消费者
- 4.total = ready + unacked
- 5.当autoAck = true时,消费者接收到消息就进行了ack,并不代表消费者完成了消费回调
- 6.当autoAck = false时,需要在消费回调中主动ack消息,一般在消费完成之后进行ack,表示消息消费完成
- 公平分发Fair dispatch
- 1.设置每次发送给消费者的消息数量qos=N,此时autoAck只能为false,也就是必须手动确认消息
- 2.每次消费 ready - N, unacked + N
- 4.N条消息全部确认,才会继续发送给该消费者下N条消息
- 5.ready不为0时,就不会出现空闲的消费者,解决了消费能力差的消费端消息堆积的问题
publish/subscribe
- fanout类型的exchange,会将所有发送到该exchange的消息发送到所有绑定的队列,跟routingKey无关
- 其中某一队列遵从work队列的轮询分发和公平分发
routing
- direct类型的exchange,队列绑定到交换机时需指定routingKey
topic
- 匹配字符
- * 星号匹配一个词
- # 井号匹配0个或多个词
- routingKey 可以通过通配符进行路由
rpc
- 生产者在发送消息的配置属性中,配置好correlation_id和reply_to队列
- 消费者接收到消息,处理好消息再将处理结果发送到reply_to队列,结果同时配置好correlation_id
- 生产者发送消息后同步等待reply_to队列,拿到消息匹配结果同时配置好correlation_id成功后,取出处理结果
消息确认机制与消息持久化
- 消费端消息应答
- 自动确认模式
boolean autoAck = true;
一旦rabbitmq把消息分发给消费者,就会从broker的内存中删除。
这种情况下如果杀死正在执行的消费者,就会丢失正在处理的消息。 - 消息应答模式
boolean autoAck = false;
如果有一个消费者挂掉,就会交付给其他消费者,rabbitmq支持消息应答,消费者发送一个消息应答告诉rabbitmq这个消息我已经处理完成,可以删除,然后rabbitmq就删除内存中的消息 - 消息应答模式默认开启,即
boolean autoAck = false;
- 自动确认模式
- 生产者消息确认
- 事务机制
- client发送Tx.Select 开启事务
- broker发送Tx.Select-Ok(之后publish)
- client发送Tx.Commit 提交事务
- broker发送Tx.Commit-Ok
- 性能很差
- confirm模式
- publisher confirm模式并不是默认打开的,需要调用confirm.select方法将channel设置成confirm模式。当开启了confirm模式之后,只有当一条消息被所有的mirrors接受之后,publisher才会收到这条消息的confirm,也就是一个basic.ack方法。
- waitForConfirms 同步confirm,单个或批量确认消息
- addConfirmListener 异步confirm
- 性能好
- 事务机制
- 消息持久化
-
boolean durable = true;
开启持久化 -
boolean durable = false;
关闭持久化 - rabbitmq 不允许重新定义(不同参数)已存在的队列
-
部分api
- exchangeDeclare
- exchange,交换器名称
- type,交换器类型,如fanout,direct,topic
- durable,是否持久化,持久化可以将交换器存盘,在服务器重启后不会丢失相关信息
- autoDelete,是否自动删除。自动删除的前提是至少有一个队列或交换器与这个交换器绑定,之后所有与这个交换器绑定的队列或交换器都与此解绑,并不是与此连接的客户端都断开
- internal,是否是内置的,内置的交换器客户端无法直接发送消息到这个交换器中,只能通过交换器路由到交换器的方式
- argument 其他一些结构化参数
- queueDeclare
- queue,队列名称
- durable,是否持久化
- exclusive,是否排他,如果一个队列声明为排他,该队列仅对首声声明它的连接可见,并在连接断开后自动删除
- autoDelete,是否自动删除。自动删除前提是:至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才自动删除
- arguments,其他参数
- x-message-ttl 队列中消息过期时间
- x-max-priority 1.队列开启优先级,2.队列最大优先级
- x-expires 队列的过期时间,当队列未使用(没有任何消费者、没有被重新声明、过期时间段内未调用过Basic.Get命令)时,会被删除。服务器重启后,持久化的队列过期时间会重新计算,x-expires单位为毫秒,不能设置为0
- basicPublish
- exchange 交换器
- routingKey 路由键
- mandatory 告诉服务器至少将消息路由到一个队列中,否则将消息返回生产者(basic.return),为false则丢弃
- immediate 是否立即投递,告诉服务器,如果该消息关联的队列上有消费者,立即投递,没有消费者,则返回消息给生产者,不用将消息存入队列等待
- props
- expiration 单条消息过期时间(若x-message-ttl也设置,取最短的ttl。由队列属性设置的过期时间,消息到期后,会直接从队列中抹去;单独设置设置的,在投递前进行判断)
- priority 优先级
- basicConsume
- queue,队列名称
- autoAck,是否自动确认,建议为false,不自动确认
- callback,消费者的回调函数
网友评论