发送消息
发送消息的API参考:https://www.jianshu.com/p/a2bc82f33e4e
消费消息
RabbitMQ的消费模式分两种∶推(Push)模式和拉(Pull)模式。推模式采用 Basic.Consume 进行消费,而拉模式则是调用 Basic.Get 进行消费。
推模式
在推模式中,可以通过持续订阅的方式来消费消息,使用到的相关类有∶
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
接收消息一般通过实现Consumer 接口或者继承 DefaultConsumer类来实现。当调用与 Consumer 相关的 API方法时,不同的订阅采用不同的消费者标签(consumerTag)来区分彼此,在同一个 Channel中的消费者也需要通过唯一的消费者标签以作区分。
消费消息的核心代码:
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
logger.info("recv message: " + new String(body));
// ack操作
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
// autoAck设置false
channel.basicConsume(QUEUE_NAME, false, consumer);
注意,上面代码中显式地设置 autoAck为 false,然后在接收到消息之后进行显式 ack 操作(channel.basicAck)对于消费者来说这个设置是非常必要的,可以防止消息不必要地丢失。
除了重写handleDelivery方法外,还有
void handleConsumeOk(String consumerTag);
void handleDelivery();
void handleCancelOk(String consumerTag);
void handleCancel(String consumerTag) throws IOException;
void handleShutdownSignal(String consumerTag , ShutdownSignalException sig) ;
void handleRecoverOk(String consumerTag);
上面方法的顺序就是RabbitMQ的调用顺序
拉模式
通过 channel.basicGet 方法可以单条地获取消息,其返回值是 GetRespone。Channel 类的 basicGet 方法没有其他重载方法,只有
GetResponse basicGet(String queue, boolean autoAck) throws IOException;
queue是队列名称,如果设置autoAck是false那么同样需要调用channel.basicAck来确认消息已经接收。
推模式和拉模式的区别
推模式下RabbitMQ 会不断地推送消息给消费者,当然推送消息的个数还是会受到 Basic.Qos的限制
拉模式下消费者主动单条地获取消息
建议
如果只想从队列获得单条消息而不是持续订阅,建议还是使用 Basic.Get 进行消费.但是不能将 Basic.Get 放在一个循环里来代替 Basic.Consume ,这样做会严重影响 RabbitMQ的性能.如果要实现高吞吐量,消费者理应使用 Basic.Consume 方法。
推模式过程
生产者

消费者

拉模式过程

网友评论