---实践是检验真理的唯一标准---
yml参数配置
这次我使用的是RabbitTemplate
rabbitmq:
host: 192.168.225.136
port: 5672
username: thinking
password: 123
virtual-host: host1
publisher-returns: true
# 事务模式下这行需要删除
publisher-confirm-type: correlated
template:
# 找不到路由规则的消息 是否保留
mandatory: true
为什么Template不需要定义configuration文件来接收yml文件的参数?
这是个常识问题,我这里做个记录。。。
我都能忘记昨天吃了东西的,好在我喜欢做笔记。。。
springboot中何时加载Template,可以仔细看看自动装配注解:EnableAutoConfiguration
这类Template模板的初始化有个Properties文件,不如:
RabbitProperties
RedisProperties
方法中注解:ConfigurationProperties 指定了默认取得yml格式内容
至于具体的属性可以找set方法
我们的demo都是基于RabbitTemplate来写。。。
初始化数据
通过枚举ExchangeEnum、QueueEnum、BindingEnum动态维护和创建
1.初始化交换机
@Bean("createExchange")
public Object createExchange(RabbitAdmin rabbitAdmin) {
// 遍历交换机枚举
ExchangeEnum.toList().forEach(exchangeEnum -> {
// 根据交换机模式 生成不同的交换机
switch (exchangeEnum.getType()) {
case fanout:
rabbitAdmin.declareExchange(new FanoutExchange(exchangeEnum.getExchangeName(),
exchangeEnum.isDurable(), exchangeEnum.isAutoDelete()));
break;
case topic:
rabbitAdmin.declareExchange(new TopicExchange(exchangeEnum.getExchangeName(),
exchangeEnum.isDurable(), exchangeEnum.isAutoDelete()));
break;
case direct:
rabbitAdmin.declareExchange(new DirectExchange(exchangeEnum.getExchangeName(),
exchangeEnum.isDurable(), exchangeEnum.isAutoDelete()));
break;
}
});
return null;
}
2.初始化队列
@Bean("createQueue")
public Object createQueue(RabbitAdmin rabbitAdmin) {
// 遍历队列枚举 将队列注册到spring bean工厂 让spring实现队列的管理
QueueEnum.toList().forEach(queueEnum -> {
rabbitAdmin.declareQueue(new Queue(queueEnum.getName(),
queueEnum.isDurable(), queueEnum.isExclusive(), queueEnum.isAutoDelete(), queueEnum.getArguments()));
});
return null;
}
3.交换机和队列绑定
@Bean("createBinding")
public Object createBinding(RabbitAdmin rabbitAdmin) {
// 遍历队列枚举 将队列绑定到指定交换机
BindingEnum.toList().forEach(bindingEnum -> {
// 交换机
ExchangeEnum exchangeEnum = bindingEnum.getExchangeEnum();
// queue
QueueEnum queueEnum = bindingEnum.getQueueEnum();
// 绑定
rabbitAdmin.declareBinding(new Binding(
// queue名称
queueEnum.getName(),
Binding.DestinationType.QUEUE,
// exchange名称
exchangeEnum.getExchangeName(),
// queue的routingKey
queueEnum.getRoutingKey(),
// 绑定的参数
bindingEnum.getArguments()));
});
return null;
}
延迟队列
1.定义队列
/**
* 超时队列---不需要定义RabbitListener方法
*/
deal_queue("deal_queue", "deal.queue", true, false, false, dealParams()),
/**
* 超时接收队列
*/
reply_queue("reply_queue", "reply.queue", true, false, false, null),
public static Map<String, Object> dealParams(){
// reply_to 队列
Map<String,Object> map = new HashMap<>();
//设置消息的过期时间 单位毫秒
map.put("x-message-ttl",10000);
//设置附带的死信交换机
map.put("x-dead-letter-exchange","reply_exchange");
//指定重定向的路由建 消息作废之后可以决定需不需要更改他的路由建 如果需要 就在这里指定
map.put("x-dead-letter-routing-key","reply.queue");
return map;
}
2.定义交换机
/**
* 超时交换机
*/
deal_exchange("deal_exchange", ExchangeTypeEnum.topic, true, false),
/**
* 超时接收交换机
*/
reply_exchange("reply_exchange", ExchangeTypeEnum.topic, true, false),
3.交换机和队列绑定
deal_binding(ExchangeEnum.deal_exchange, QueueEnum.deal_queue, null),
reply_binding(ExchangeEnum.reply_exchange, QueueEnum.reply_queue, null)
4.不定义超时队列的@RabbitListener,只定义超时接收队列的@RabbitListener
@RabbitListener(queues = {"reply_queue"})
@RabbitHandler
public void reply_queue(Message message, Channel channel) throws Exception {
System.err.println("消费端-reply: " + new String(message.getBody(), "UTF-8"));
Long deliveryTag = message.getMessageProperties().getDeliveryTag();
channel.basicAck(deliveryTag, false);
}
测试:
/**
* 延迟队列测试
*/
public void deal_queue_test() {
ExchangeEnum exchangeEnum = BindingEnum.deal_binding.getExchangeEnum();
QueueEnum queueEnum = BindingEnum.deal_binding.getQueueEnum();
// 消息
String message = "11111111111111111111111111111111111111";
MessageProperties messageProperties = getMessageProperties();
// 发送
rabbitTemplate.convertSendAndReceive(
exchangeEnum.getExchangeName(),
queueEnum.getRoutingKey(),
new Message(message.getBytes(), messageProperties));
}
异步队列
1.AsyncRabbitTemplate定义
/**
* 异步队列
* @param rabbitTemplate
* @return
*/
@Bean
public AsyncRabbitTemplate asyncRabbitTemplate(RabbitTemplate rabbitTemplate){
AsyncRabbitTemplate asyncRabbitTemplate = new AsyncRabbitTemplate(rabbitTemplate);
asyncRabbitTemplate.setReceiveTimeout(50000);
return asyncRabbitTemplate;
}
2.测试
public void async() {
System.err.println("---------------async--------------start---------");
AsyncRabbitTemplate.RabbitConverterFuture<Object> future = asyncRabbitTemplate.convertSendAndReceive("reply_exchange", "reply.queue", "123123123");
// 配置下面代码时 如果 队列监听中没有返回值时会报错
future.addCallback(new ListenableFutureCallback<Object>() {
@Override
public void onFailure(Throwable ex) {
ex.printStackTrace();
}
@Override
public void onSuccess(Object result) {
System.out.println("回调收到结果=> " + result);
}
});
System.err.println("---------------async--------------end---------");
}
3.监听方法
@RabbitListener(queues = {"async_queue"})
@RabbitHandler
public Object async_queue(Message message, Channel channel) throws Exception {
System.err.println("消费端-async: " + new String(message.getBody(), "UTF-8"));
return "ok";
}
Java api
1.消息回退:
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
long deliveryTag:消息唯一标识,这是RabbitMQ自动生成的,不需要人为管理,只需要从message.getMessageProperties().getDeliveryTag() 就可以获得。
boolean multiple:是否批量退回,不开启就使用false,开启批量退回需要增加自己的业务判断逻辑(比如:攒够几条再批量回退,或者设置等待间隔等等)
boolean requeue:是否退回到消息队列,退回就使用true,就是交给其他消费者处理。
2.拒绝消息
void basicReject(long deliveryTag, boolean requeue) throws IOException;
deliveryTag:发布的每一条消息都会获得一个唯一的deliveryTag,它在channel范围内是唯一的
requeue:表示如何处理这条消息,为true表示重新放入RabbitMQ的发送队列中,为false表示通知RabbitMQ销毁该消息
3.确认ack
void basicAck(long deliveryTag, boolean multiple) throws IOException;
deliveryTag:该消息的index
multiple:是否批量.true:将一次性ack所有小于deliveryTag的消息。
4.创建一个队列
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) throws IOException;
durable:true、false true:在服务器重启时,能够存活
exclusive :是否为当前连接的专用队列,在连接断开后,会自动删除该队列,生产环境中应该很少用到吧。
autodelete:当没有任何消费者使用时,自动删除该队列
5.启动一个消费者,并返回服务端生成的消费者标识
/**
* queue:队列名
* autoAck:true 接收到传递过来的消息后acknowledged(应答服务器),false 接收到消息后不应答服务器
* consumerTag:客户端生成的一个消费者标识
* nolocal:如果服务器不应将在此通道连接上发布的消息传递给此使用者,则为true;请注意RabbitMQ服务器上不支持此标记
* exclusive: 如果是单个消费者,则为true
* arguments:消费的一组参数
* deliverCallback: 当一个消息发送过来后的回调接口
* cancelCallback:当一个消费者取消订阅时的回调接口;取消消费者订阅队列时除了使用{@link Channel#basicCancel}之外的所有方式都会调用该回调方法
* shutdownSignalCallback: 当channel/connection 关闭后回调
*/
channel.basicConsume(QUEUE_NAME, true, ctag, false, false, arguments, deliverCallback, consumerTag -> {}, (consumerTag, sig) -> {});
6.取消消费者订阅
/**
* 取消消费者对队列的订阅关系
* consumerTag:服务器端生成的消费者标识
**/
void basicCancel(String consumerTag)
7.主动拉取队列中的一条消息
/**
* 从消息队列中取出第一条消息;整个方法的执行过程是首先消费队列,然后检索第一条消息,然后再取消订阅
*/
GetResponse response = channel.basicGet(QUEUE_NAME, true);
System.out.println("消费者接收到的消息是:"+new String(response.getBody(), "UTF-8"));
参数介绍
1.队列参数
x-dead-letter-exchange 死信交换机
x-dead-letter-routing-key 死信消息重定向路由键
x-expires 队列在指定毫秒数后被删除
x-ha-policy 创建HA队列
x-ha-nodes HA队列的分布节点
x-max-length 队列的最大消息数
x-message-ttl 毫秒为单位的消息过期时间,队列级别
x-max-priority 最大优先值为255的队列优先排序功能
2.消息参数
content-type 消息体的MIME类型,如application/json
content-encoding 消息的编码类型
message-id 消息的唯一性标识,由应用进行设置
correlation-id 一般用做关联消息的message-id,常用于消息的响应
timestamp 消息的创建时刻,整形,精确到秒
网友评论