ReturnListener
mandatory
This flag tells the server how to react if the message cannot be routed to a queue.if this flag is set,the server will return an unroutable message with a return method.if this flag is zero,the server silently drops the message
如果mandatory有设置,则当消息不能路由到队列中去的时候,会触发return method。如果mandatory没有设置,则当消息不能路由到队列的时候,server会删除该消息。
immediate
This flag tell the server how to react if the message cannot be routed to a queue consumer immediately.if the flag is set ,the server will return an undeliverable message with a return method.if this flag is zero,the server will queue the message ,but with no guarantee that it will ever be consumed.
如果有设置immediate,则当消息路由到队列的时候,没有消费者的时候,会触发return method。如果immediate标识是0,则服务器就会将消息加入队列,但是不能保证这个消息能被消费。
注意
immediate属性在Rabbitmq3.x的时候,被废弃了。
因为这个关键字违背了生产者和消费者之间解耦的特性,因为生产者不关心消息是否被消费者消费掉,所以这个字段被drop掉了。
RabbitMQ java client 相关API
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
public class Send {
public static void main(String[] args) throws Exception{
ConnectionFactory connectionFactory = new ConnectionFactory();
//或者
connectionFactory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode,
String replyText,
String exchange,
String routingKey,
AMQP.BasicProperties properties,
byte[] body)
throws IOException {
System.out.println("=========handleReturn===method============");
System.out.println("replyText:"+replyText);
System.out.println("exchange:"+exchange);
System.out.println("routingKey:"+routingKey);
System.out.println("message:"+new String(body));
}
});
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().deliveryMode(2).
contentEncoding("UTF-8").build();
//第三个参数是设置的mandatory
channel.basicPublish("zhihao.direct.exchange","log.aaa",true,properties,"注册验证码".getBytes());
TimeUnit.SECONDS.sleep(10);
channel.close();
connection.close();
}
}
zhihao.direct.exchange的路由信息
因为路由不到正确的队列所以触发了ReturnListener方法的回调,控制台上打印:
replyText:NO_ROUTE
exchange:zhihao.direct.exchange
routingKey:log.aaa
message:注册验证码
spring-amqp的相关api
配置:
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MQConfig {
@Bean
public ConnectionFactory connectionFactory(){
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
factory.setPublisherReturns(true);
return factory;
}
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
return rabbitAdmin;
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMandatory(true);
//rabbitTemplate.setMandatoryExpression(new SpelExpressionParser().parseExpression("(1+2) > 3")); //表达式的值为false
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback(){
@Override
public void returnedMessage(Message message,
int replyCode,
String replyText,
String exchange,
String routingKey){
System.out.println("============returnedMessage==method=========");
System.out.println("replyCode: "+replyCode);
System.out.println("replyText: "+replyText);
System.out.println("exchange: "+exchange);
System.out.println("routingKey: "+routingKey);
}
});
return rabbitTemplate;
}
}
发送消息
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;
import java.util.concurrent.TimeUnit;
@ComponentScan
public class Application {
public static void main(String[] args) throws Exception{
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
System.out.println(rabbitTemplate);
MessageProperties messageProperties = new MessageProperties();
messageProperties.getHeaders().put("desc","消息发送");
messageProperties.getHeaders().put("token","234sdfsdf3r342dsfd1232");
//路由不到指定的队列
rabbitTemplate.convertAndSend("zhihao.direct.exchange","log.aaa","hello welcome");
TimeUnit.SECONDS.sleep(10);
context.close();
}
}
zhihao.direct.exchange的路由信息
rabbitTemplate.setReturnCallback的方法会被执行,控制台打印:
============returnedMessage==method=========
replyCode: 312
replyText: NO_ROUTE
exchange: zhihao.direct.exchange
routingKey: log.aaa
总结
- 设置factory.setPublisherReturns(true);
- rabbitTemplate.setMandatory(true);或rabbitTemplate.setMandatoryExpression(new SpelExpressionParser().parseExpression("(1+2) > 2")); //表达式的值为true
才会触发returnCallback回调方法的执行。
网友评论