1 消息持久化
1.1 exchange持久化
/*
* 声明消息队列,且为可持久化的
*
* EXCHANGE_NAME: 交换机名称(name)
* direct: 交换机类型(type)
* true: 是否持久化(durable)
* false: 是否自动删除(autoDelete)
* null: 其他参数(arguments)
*/
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);
1.2 queue持久化
/*
* 声明消息队列,且为可持久化的
*
* QUEUE_NAME: 队列名称(name)
* true: 是否持久化(durable)
* false: 是否为排他性队列(exclusive)
1: 只对首次声明它的连接(Connection)可见
1.1 首次声明: 因为另外一个连接无法声明一个同样的排他性队列
1.2 只区别连接(Connection)而不是通道(Channel),从同一个连接创建的不同的通道可以同时访问某一个排他性的队列
2: 会在其连接断开的时候自动删除
2.1 无论队列是否被声明成持久性的(Durable =true),只要调用连接的Close方法或者客户端程序退出,RabbitMQ都会删除这个队列
2.2 注意这里是连接断开的时候,而不是通道断开。这个其实前一点保持一致,只区别连接而非通道。
* false: 是否自动删除(autoDelete)
* null: 其他参数(arguments)
*/
channel.queueDeclare(QUEUE_NAME, true, false, false,null);
1.3 消息持久化
// 消息持久化
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
// 1:不持久化 2:持久化
builder.deliveryMode(2);
AMQP.BasicProperties properties = builder.build();
/*
* EXCHANGE_NAME: 交换机名称
* "" : 路由key
* properties: 基本属性
* message: 消息体
*/
channel.basicPublish(EXCHANGE_NAME, "", properties, message.getBytes());
2 消费者ACK确认机制
2.1 自动确认
// 自动确认
channel.basicConsume(QUEUE_NAME, true, consumer);
只要消息从队列中获取,无论消费者获取到消息后是否成功消息,都认为是消息已经成功消费。
2.2 手动确认
// 手动确认
channel.basicConsume(QUEUE_NAME, false, consumer);
// 向服务端确认消息已被消费
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
3 生产者AMQP事务机制(生产环境禁止使用)
// 开启事务
try {
String message = "I am simple_queue!";
// 开启事务
channel.txSelect();
// 往队列中发出一条消息,使用rabbitmq默认交换机
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
// 提交事务
channel.txCommit();
} catch (Exception e) {
e.printStackTrace();
// 事务回滚
channel.txRollback();
}
4 生产者确认
try {
// 生产者通过调用channel.confirmSelect方法(即Confirm.Select命令)将信道设置为confirm模式
channel.confirmSelect();
// 消息被投递到所有匹配的队列之后,RabbitMQ就会发送一个确认(Basic.Ack)给生产者(包含消息的唯一ID)
channel.basicPublish("exchange","routingkey",MessageProperties.PERSISTENT_TEXT_PLAIN,"hello".getBytes());
//
if (!channel.waitForConfirms()){
System.out.println("message failed!");
// do something
}
}catch (Exception e){
e.printStackTrace();
}
网友评论