正文
1.使用交换器和队列
1.定义交换器
Exchange.DeclareOk Channel.exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException;
各个参数的含义:
exchange:交换器的名字
type:交换器的类型
durable:是否定义一个持久化的交换器
autoDelete:在交换器长时间未被使用的时候,服务器是否自动删除
internal:是否定义一个内部的交换器,如果交换器是内部的,那么它就不能被客户端直接发布
arguments:交换器的其他属性(构造参数)
返回值:声明确认方法,用于指示已成功声明了交换器
2.定义队列
Queue.DeclareOk Channel.queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;
各个参数的含义:
queue:队列的名字
durable:是否定义一个持久化的队列
exclusive:是否定义一个排他的队列(仅限于此连接)
autoDelete:是否定义一个自动删除的队列
arguments:队列的其他属性(构造参数)
返回值:声明确认方法,用于指示已成功声明了队列
3.绑定队列(绑定队列和交换器)
Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;
各个参数的含义:
queue:要绑定的队列名称
exchange:交换器名称
routingKey:路由键
arguments:其他属性(绑定参数)
返回值:声明确认方法,用于指示已成功绑定
注意要点:生产者和消费者都能够使用queueDeclare来声明一个队列,但是如果消费者在同一个信道上订阅了另一个队列,就无法再声明队列了,必须要先取消订阅,然后将信道置为“传输”模式,之后才能声明队列
4.清空队列
Queue.PurgeOk queuePurge(String queue) throws IOException;
queue:队列的名字
返回值:清除成功执行
5.发送消息
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)throws IOException;
各个参数的含义:
exchange:交换器的名字
routingKey:路由键
mandatory:
当mandatory标志位设置为true时,如果exchange根据自身类型和消息routingKey无法找到一个合适的queue存储消息,那么broker会调用basic.return方法将消息返还给生产者;当mandatory设置为false时,出现上述情况broker会直接将消息丢弃;
immediate:
imandatory标志告诉服务器至少将该消息route到一个队列中,否则将消息返还给生产者;immediate标志告诉服务器如果该消息关联的queue上有消费者,则马上将消息投递给它,如果所有queue都没有消费者,直接把消息返还给生产者,不用将消息入队列等待消费者了。
props:消息的基本属性集
其包含14个属性成员,分别有contentType、 contentEncoding、headers(Map<String,Object>)>、deliveryMode、priority、 correlationld、 replyTo、expiration、messageld、timestamp、type、userId、 appld、 clusterld
body:消息体
6.消费消息
推模式: String basicConsume(String queue, boolean autoAck, Map<String, Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException;
各个参数的含义:
queue:消息队列的名称
autoAck:为了确保消息不会丢失,RabbitMQ支持消息应答。消费者发送一个消息应答,告诉RabbitMQ这个消息已经接收并且处理完毕了。RabbitMQ就可以删除它了。
arguments:设置消费者的其他参数
deliverCallback:(消息传递时回调)设置消费者的回调参数。用来处理RabbitMQ推送过来的消息比如DefaultConsumer,使用时需要客户端重写其中的方法
cancelCallback:消费者被取消时回调
shutdownSignalCallback:当通道/连接关闭时回调
拉模式:
GetResponse basicGet(String queue, boolean autoAck) throws IOException;
注意:Basic.Consume将信道(Channel)置为接收模式,直到取消队列的订阅为止。在接收模式期间,RabbitMQ会不断地推送消息给消费者,当然推送消息的个数还是会受到Basic.Qos 的限制。如果只想从队列获得单条消息而不是持续订阅,建议还是使用Basic.Get进行消费。但是不能将Basic.Get放在一个循环里来代替Basic.Consume,这样做会严重影响RabbitMQ 的性能。如果要实现高吞吐量,消费者理应使用Basic. Consume方法
7.拒绝消息
void basicReject(long deliveryTag, boolean requeue) throws IOException;
各个参数的含义:
deliveryTag:可以看作消息的编号,它是一个64位的长整型值,最大值是 9223372036854775807,如果requeue参数设置为true,则RabbitMQ会重新将这条消息存入队列,以便可以发送给下一个订阅的消费者;如果requeue参数设置为false,则RabbitMQ 立即会把消息从队列中移除,而不会把它发送给新的消费者。
8.批量拒绝消息
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
throws IOException;
各个参数的含义:
multiple参数 设置为false则表示拒绝编号为deliveryTag的这一条消息,这时候basicNack和 basicReject方法一样;multiple参数设置为true则表示拒绝deliveryTag编号之前所有未被当前消费者确认的消息。
注意:channel.basicReject 或者 channel.basicNack 中的requeue 设置为 false,可以启用“死信队列”的功能。死信队列可以通过检测被拒绝或者未送达的消息来追踪问题。
9.重新发送消息
Basic.RecoverOk basicRecover(boolean requeue) throws IOException;
这个channel.basicRecover方法用来请求RabbitMQ重新发送还未被确认的消息。如果requeue参数设置为true,则未被确认的消息会被重新加入到队列中,这样对于同一条消息来说,可能会被分配给与之前不同的消费者。如果requeue参数设置为false,那么同一条消息会被分配给与之前相同的消费者。
网友评论