上一篇:初识RabbitMQ(maven版本):https://www.jianshu.com/p/568765c0b94b
1.导入依赖
![](https://img.haomeiwen.com/i20816836/74c81509bed28eba.png)
pom
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
2. 配置配置文件
spring:
application:
name: rabbitmq-boot
rabbitmq:
host: 47.105.198.54
port: 5672
virtual-host: /test-1
username: test
password: 123456
SpringBoot中使用RabbitTemplate
简化对RabbitMQ操作,使用时候直接在项目中注入即可使用。
3、使用
hello world模型使用
![](https://img.haomeiwen.com/i20816836/6d749774583fe627.png)
1、开发生产者
@Autowired
RabbitTemplate rabbitTemplate;
@Test
void testHello(){
/**
* 参数一:队列名称
* 参数二:消息内容
*/
rabbitTemplate.convertAndSend("hello-boot","hello boot");
}
注:单独运行生产者并不会产生队列,队列是由消费者创建的
2、开发消费者
@Component
public class HelloCustomer {
// 通过注解创建 hello-boot 队列
@RabbitListener(queuesToDeclare = {@Queue(value = "hello-boot",durable = "false",autoDelete = "false")})
public void consumptionStr(String message){
System.out.println("消费消息:" + message);
}
}
然后运行生产者
@RabbitListener注解指定目标方法来作为消费消息的方法,通过注解参数指定所监听的队列或者Binding
@Queue注解可以设置队列属性
work模型使用
![](https://img.haomeiwen.com/i20816836/ec8044e0c890b9e7.png)
1、开发生产者
@Test
void testWork(){
/**
* 参数一:队列名称
* 参数二:消息内容
*/
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("work-boot","work boot");
}
}
2、开发消费者
@Component
public class WorkCustomer {
// 通过注解创建 work-boot 队列
@RabbitListener(queuesToDeclare = @Queue("work-boot"))
public void work1(String message){
System.out.println("消费消息-1:" + message);
}
@RabbitListener(queuesToDeclare = @Queue("work-boot"))
public void work2(String message){
System.out.println("消费消息-2:" + message);
}
}
结果:
消费消息-1:work boot
消费消息-1:work boot
消费消息-2:work boot
消费消息-2:work boot
消费消息-2:work boot
消费消息-2:work boot
消费消息-2:work boot
消费消息-1:work boot
消费消息-1:work boot
消费消息-1:work boot
说明:默认在Spring AMQP中实现Work这种方式就是公平调度,如果需要实现能者多劳需要额外配置
fanout模型使用
![](https://img.haomeiwen.com/i20816836/6fb2df7c84b2db11.png)
1、开发生产者
@Test
void testfanout(){
/**
* 参数一:交换机名称
* 参数二:路由名称(在fanout模式下,该参数无作用)
* 参数三:消息内容
*
*/
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("fanout-boot-exchange","","fanout boot");
}
}
2、开发消费者
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue, //创建临时队列
exchange = @Exchange(value = "fanout-boot-exchange",type = "fanout") //绑定交换机
)
})
public void fanoutStr(String message){
System.out.println("消费消息-1:" + message);
}
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue, //创建临时队列
exchange = @Exchange(value = "fanout-boot-exchange",type = "fanout") //绑定交换机
)
})
public void fanoutStr2(String message){
System.out.println("消费消息-2:" + message);
}
结果:
消费消息-2:fanout boot
消费消息-1:fanout boot
@QueueBinding注解用来绑定队列和交换机
@Exchange注解用来声明交换机
Route模型使用
![](https://img.haomeiwen.com/i20816836/c84bd7405dec892d.png)
1、开发生产者
@Test
void testDirect(){
/**
* 参数一:交换机名称
* 参数二:路由名称
* 参数三:消息内容
*
*/
rabbitTemplate.convertAndSend("direct-boot-exchange","error","direct boot error");
}
2、开发消费者
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class directCustomer {
@RabbitListener(bindings = {
@QueueBinding(value = @Queue, //创建临时队列
exchange = @Exchange(value = "direct-boot-exchange",type = "direct"), //绑定交换机名称和类型,默认类型就为direct
key = {"info"}) //路由key
})
public void directStr(String message){
System.out.println("消费消息-1:" + message);
}
@RabbitListener(bindings = {
@QueueBinding(value = @Queue,
exchange = @Exchange(value = "direct-boot-exchange",type = "direct"),
key={"info","error"})
})
public void directStr2(String message){
System.out.println("消费消息-2:" + message);
}
}
发送路由key为error
消费消息-2:direct boot error
发送路由key为info
消费消息-1:direct boot info
消费消息-2:direct boot info
Topic模型使用
![](https://img.haomeiwen.com/i20816836/6b66838d7aed4f3e.png)
1、开发生产者
@Test
void testTopic(){
/**
* 参数一:交换机名称
* 参数二:路由名称
* 参数三:消息内容
*
*/
rabbitTemplate.convertAndSend("topic-boot-exchange","user.role.query","topic boot user.role.query");
}
2、开发消费者
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class TopicCustomer {
@RabbitListener(bindings = {
@QueueBinding(value = @Queue,
exchange = @Exchange(value = "topic-boot-exchange",type = "topic"),
key ={"user.*"})
})
public void topicStr(String message){
System.out.println("消费消息-1:" + message);
}
@RabbitListener(bindings = {
@QueueBinding(value = @Queue,
exchange = @Exchange(value = "topic-boot-exchange",type = "topic"),
key ={"user.#"})
})
public void topicStr2(String message){
System.out.println("消费消息-2:" + message);
}
}
发送路由key为user.role.query
消费消息-2:topic boot user.role.query
发送路由key为user.query
消费消息-2:topic boot user.query
消费消息-1:topic boot user.query
上面将五种消息模型都简单的使用一遍,但我们生产者每次发送的都是Stirng类型的数据,如果我们要发送对象呢?通过查看rabbitTemplate#convertAndSend
的接口定义,我们知道发送的消息可以是Object类型,那么是不是意味着任何对象都可以推送给mq呢?开始尝试一下
加入依赖
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.58</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.0</version>
</dependency>
创建实体
@Data
public class PayDTO {
private Long payId;
private String payBank;
private String money;
private String payTime;
}
开发生产者
@Component
@Slf4j
public class RabbitReceiver {
public void send(PayDTO pay){
log.info("支付发送消息 RabbitSender.send() payDTO = {}", JSON.toJSONString(pay));
//发送hello world模型的方式消息
rabbitTemplate.convertAndSend("hello-boot",pay);
}
}
调用
@GetMapping("/sendPay")
public String sendPay(){
PayDTO payDTO = new PayDTO();
payDTO.setMoney("8888888");
payDTO.setPayBank("***银行");
payDTO.setPayId(123541880889922L);
payDTO.setPayTime("2029-11-18 14:15:48 654");
rabbitSender.send(payDTO);
return "success";
}
调用之后我们可以看到控制台出现报错信息:http://localhost:8080/sendPay
java.lang.IllegalArgumentException: SimpleMessageConverter only supports String,
byte[] and Serializable payloads, received: com.gongj.apppay.dto.PayDTO
![](https://img.haomeiwen.com/i20816836/0e01dfb1e316912b.png)
为什么会出现这个问题呢?从堆栈分析,我们知道RabbitTemplate默认是利用SimpleMessageConverter来实现封装Message逻辑的,核心代码为
org.springframework.amqp.support.converter.SimpleMessageConverter.createMessage
protected Message createMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
byte[] bytes = null;
if (object instanceof byte[]) {
bytes = (byte[])((byte[])object);
messageProperties.setContentType("application/octet-stream");
} else if (object instanceof String) {
try {
bytes = ((String)object).getBytes(this.defaultCharset);
} catch (UnsupportedEncodingException var6) {
throw new MessageConversionException("failed to convert to Message content", var6);
}
messageProperties.setContentType("text/plain");
messageProperties.setContentEncoding(this.defaultCharset);
} else if (object instanceof Serializable) {
try {
bytes = SerializationUtils.serialize(object);
} catch (IllegalArgumentException var5) {
throw new MessageConversionException("failed to convert to serialized Message content", var5);
}
messageProperties.setContentType("application/x-java-serialized-object");
}
if (bytes != null) {
messageProperties.setContentLength((long)bytes.length);
return new Message(bytes, messageProperties);
} else {
throw new IllegalArgumentException(this.getClass().getSimpleName() + " only supports String, byte[] and Serializable payloads, received: " + object.getClass().getName());
}
}
上面逻辑很明确的指出了,只接受byte数组,String字符串,可序列化对象。
可序列化对象
我们这就演示用可序列化对象实现传递对象,对象需要实现Serializable接口。Stirng和byte[]就不演示了。
@Data
public class PayDTO implements Serializable {
private Long payId;
private String payBank;
private String money;
private String payTime;
}
然后再次调用接口,http://localhost:8080/sendPay,可以看到生产者已经发送消息成功了
![](https://img.haomeiwen.com/i20816836/4c1a0bc135596e15.png)
想要看到我上面的那种效果,需要这条消息没有被消费掉。
开发消费者
@Component
@Slf4j
public class RabbitReceiver {
@RabbitListener(queuesToDeclare = {@Queue(value = "hello-boot",durable = "false",autoDelete = "false")})
public void infoConsumption(PayDTO message,@Headers Map<String,Object> headers){
log.info("RabbitReceiver.infoConsumption() data = {}", JSON.toJSONString(message));
for(Map.Entry<String, Object> map :headers.entrySet()){
log.info("headers ===> " + map.getKey() + "=" + map.getValue());
}
}
![](https://img.haomeiwen.com/i20816836/5f43eeb1c71949d4.png)
使用上述这种方式发送的对象必须遵循两个条件
- bean必须实现Serializable 接口
- 生产者消费者的bean包名、类名、属性名必须一致
我们已经知道RabbitTemplate默认是利用SimpleMessageConverter来实现封装Message逻辑,它只接受byte数组,String字符串,可序列化对象。那我们会想有没有其他的MessageConverter来友好的支持任何类型的对象
自定义MessageConverter
自定义一个json序列化方式的MessageConverter来解决上面的问题(FastJson方式)
生产者端配置
import com.alibaba.fastjson.JSON;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.AbstractMessageConverter;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
@Bean
public RabbitTemplate fastjsonRabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(new AbstractMessageConverter() {
@Override
protected Message createMessage(Object object, MessageProperties messageProperties) {
messageProperties.setContentType("application/json");
return new Message(JSON.toJSONBytes(object), messageProperties);
}
@Override
public Object fromMessage(Message message) throws MessageConversionException {
return JSON.parse(message.getBody());
}
});
return rabbitTemplate;
}
}
PayDTO就可以不需要实现Serializable 接口了
mq内接收到的推送消息如下
![](https://img.haomeiwen.com/i20816836/3d42a5df690181f1.png)
消费者
@RabbitListener(queuesToDeclare = {@Queue(value = "hello-boot",durable = "false",autoDelete = "false")})
public void infoConsumption(@Payload String data, @Headers Map<String,Object> headers) throws IOException {
log.info("RabbitReceiver.infoConsumption() data = {}", data);
for(Map.Entry<String, Object> map :headers.entrySet()){
log.info("headers ===> " + map.getKey() + "=" + map.getValue());
}
}
![](https://img.haomeiwen.com/i20816836/57f531117920198f.png)
ps:修改消费者的接收方式
@RabbitListener(queuesToDeclare = {@Queue(value = "hello-boot",durable = "false",autoDelete = "false")})
public void infoConsumption(@Payload JSONObject data, @Headers Map<String,Object> headers) throws IOException {
log.info("RabbitReceiver.infoConsumption() data = {}", data.toJSONString());
PayDTO pay = data.toJavaObject(PayDTO.class);
for(Map.Entry<String, Object> map :headers.entrySet()){
log.info("headers ===> " + map.getKey() + "=" + map.getValue());
}
}
在消费时出现转换异常
Caused by: org.springframework.messaging.converter.MessageConversionException
: Cannot convert from [[B] to [com.alibaba.fastjson.JSONObject] for
GenericMessage [payload=byte[101], headers={amqp_receivedDeliveryMode=PERSISTENT,
amqp_receivedRoutingKey=hello-boot, amqp_deliveryTag=1,
amqp_consumerQueue=hello-boot, amqp_redelivered=false,
id=85b64ff1-de50-2544-c6f8-e2ce899837cf,
amqp_consumerTag=amq.ctag-yjs0gj4B7WaHPT43CpwuSQ,
amqp_lastInBatch=false, contentType=application/json, timestamp=1603377654068}]
在消费端增加配置
import com.alibaba.fastjson.JSON;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.support.converter.AbstractMessageConverter;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.context.annotation.Bean;
@Bean
public RabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new AbstractMessageConverter() {
@Override
protected Message createMessage(Object object, MessageProperties messageProperties) {
messageProperties.setContentType("application/json");
return new Message(JSON.toJSONBytes(object), messageProperties);
}
@Override
public Object fromMessage(Message message) throws MessageConversionException {
return JSON.parse(message.getBody());
}
});
return factory;
}
![](https://img.haomeiwen.com/i20816836/d6c9b7e0f594129f.png)
Jackson2JsonMessageConverter
上面虽然实现了Json格式的消息转换,但是比较简陋;而且这么基础通用的功能,按照Spring全家桶的一贯作风,肯定是有现成可用的,没错,这就是Jackson2JsonMessageConverter
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
//定义RabbitTemplate
@Bean
public RabbitTemplate jacksonRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
return rabbitTemplate;
}
下面是通过Jackson序列化消息后的内容,与我们自定义的有一些不同,多了headers和content_encoding
![](https://img.haomeiwen.com/i20816836/d14deec84186b828.png)
开发消费者
@RabbitListener(queuesToDeclare = {@Queue(value = "hello-boot",durable = "false",autoDelete = "false")})
public void infoConsumption(@Payload String data, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
log.info("RabbitReceiver.infoConsumption() data = {}", data.toString());
ObjectMapper objectMapper = new ObjectMapper();
PayDTO payDTO = objectMapper.readValue(data, PayDTO.class);
log.info("getPayBank = {}====",payDTO.getPayBank());
}
调用接口,http://localhost:8080/sendPay
完整代码:https://gitee.com/gongjienianq/rabbitmq
网友评论