优先级处理
提示:消费者的消费速度大于生产者的生产速度,且Broker无消息积压时,对发送消息设置优先级没有卵用
- 设置queue的最大优先级(代码以SpringBoot下为例)
@Bean
public Queue queue1(){
//设置最大优先级 为 【10】
Map<String,Object> map = new HashMap<>();
map.put("x-max-priority",10);
return new Queue("queue1",true,true,false,map);
}
- 设置消息的优先级
this.rabbitTemplate.convertAndSend("fanoutExchange1","queue1",context,(message) -> {
message.getMessageProperties().setPriority(5);
return message;
});
RPC实现
image个人感觉实际用处不大 ,纯属了解即可
- 当客户端启动时,创建一个匿名的回调队列
callbackQueueName = channel.queueDeclare().getQueue();
BasicProperties props = new BasicProperties
.Builder()
.replyTo(callbackQueueName)
.build();
channel.basicPublish("", "rpc_queue", props, message.getBytes());
- 客户端的RPC请求设置两个属性:
replyTo
用来告诉RPC服务端回复请求时的目的队列,即回调队列;CorrelationId
用来标记请求 - 请求被放入
rpc_queue
队列中 - RPC服务端监听
rpc_queue
队列的请求,当请求来到,服务端进行处理并把带有结果的消息发送给replyTo回调队列 - 客户端监听回调队列,当有消息时,先检查
CorrelationId
属性,如果和请求一致,即可
this.rabbitTemplate.convertAndSend(" ","rpc_queue",context,(message) -> {
//设置方法
message.getMessageProperties().setCorrelationId("test");
return message;
});
网友评论