示例代码地址:
https://github.com/sushizhendeqiang/springboot-rabbitmq-demo
一、添加依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
二、配置文件添加配置
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtual-host: /
connection-timeout: 10000
listener:
simple:
# 手动应答
acknowledge-mode: manual
auto-startup: true
# 不重回队列
default-requeue-rejected: false
concurrency: 5
max-concurrency: 20
# 每次只处理一个信息
prefetch: 1
retry:
enabled: false
三、生产者、消费者
生产者
package com.rabbitmq.demo.demo;
import com.rabbitmq.demo.entity.User;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;
import java.nio.charset.Charset;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
/**
* @Author: sush4
* @Description:
* @Date: 2020/7/18
*/
@Component
public class RabbitProducer {
@Autowired
private AmqpTemplate amqpTemplate;
/**
* 发送消息
*/
@SendTo
public void sendMessage() {
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
//发送简单消息
IntStream.rangeClosed(1, 5).forEach(num -> {
String body = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " : " + num;
MessageProperties properties = new MessageProperties();
//消息内容的编码格式
properties.setContentEncoding("UTF-8");
//Delivery mode: 是否持久化,1 - Non-persistent,2 - Persistent
properties.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
Message message = new Message(body.getBytes(Charset.forName(properties.getContentEncoding())), properties);
amqpTemplate.convertAndSend("rabbit-springboot-exchange", "rabbitmq-demo-routingkey", message);
});
// 发送java bean 消息
// 实体要序列化 否则 会发送失败
IntStream.rangeClosed(1, 5).forEach(num -> {
//这里的builder是lombok快速构建实体的一个方法
User user = User.builder().userId(num).username("zhangsan:" + num).password("666666").build();
//convertAndSend方法参数说明:
//参数1:exchange 交换机名称
//参数2:routingKey 绑定关系,通过绑定关系,将exchage交换机绑定到queue队列
amqpTemplate.convertAndSend("rabbit-springboot-exchange", "rabbitmq-demo-routingkey-bean", user);
});
}).start();
}
}
生产者其实没什么好说的,一个convertAndSend完事~
要注意的是:如果想直接发送的实体的话,实体要序列化(Serializable)
消费者
package com.rabbitmq.demo.demo;
import com.rabbitmq.client.Channel;
import com.rabbitmq.demo.entity.User;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
/**
* @Author: sush4
* @Description:
* @Date: 2020/7/18
*/
@Component
@Slf4j
public class RabbitReceiver {
@RabbitHandler
@RabbitListener(queues = "rabbitmq-demo")
public void receiveMessage(Message message, Channel channel) throws UnsupportedEncodingException {
String encoding = message.getMessageProperties().getContentEncoding();
log.info("接收到string消息:[{}]", new String(message.getBody(), "UTF-8"));
try {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (IOException e) {
log.error(e.getMessage(), e);
}
}
@RabbitHandler
@RabbitListener(queues = "rabbitmq-demo-bean")
public void receiveMessage(User user, Message message, Channel channel) {
log.info("接收到bean消息:[{}]", user);
try {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (IOException e) {
log.error(e.getMessage(), e);
}
}
}
消费者关键是两个注释:
@RabbitListener:监听队列中的消息
@RabbitHandler:和@RabbitListener配合使用,@RabbitListener监听到消息,交由@RabbitHandler标注的方法处理
这里我贴出来的示例可能不能明确感受到这两个注解的作用,可以看下图:
具体运行哪个方法,要看message的参数类型
图片来源:https://www.jianshu.com/p/911d987b5f11
另外,关于@RabbitListener的使用,我这里只指明了queue队列,如果在rabbitmq中不对该队列进行创建配置的话,是不会监听到信息的(队列都没有,监听了毛线);
这里还有另外一种写法:
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "rabbitmq-demo", durable = "true"),
exchange = @Exchange(name = "rabbit-springboot-exchange", durable = "true", type = "topic"),
key = "rabbitmq-demo-routingkey"
))
这么写的话,不存在的情况下,会自动创建~
Queue、Exchange、routingKey就不用赘述了,分别是队列、交换机、绑定关系;
durable这个字段表明是否持久化。
这篇文章只介绍了rabbitmq的简单使用~
有理解不正确的地方,望指正!
网友评论