注意点
- 消息的唯一id
- 生产者发送消息之前需要先创建路由key,否则消息无法发送
- 消费者手动签收时,依赖消息通道Channel
- 通常情况下生产者与消费者是在不同的web app
- 消费者的@RabbitListener 会自动帮助我们创建队列、路由key及绑定关系
rabbitmq常用地址
#管理后台
http://192.168.0.19:15672
最佳测试方式是启动2个服务
如8081端口发布消息(测试用例)
8080接收消息
image.png
pom.xml导入依赖
spring-boot-starter-test是为了后面写测试类用,
spring-boot-starter-amqp才是真正的使用rabbitmq的依赖
<!--添加rabbitmq -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>1.5.2.RELEASE</version>
</dependency>
<!--添加rabbitmq 测试用例用 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!--单元测试系列:Mock工具之Mockito实战 https://www.cnblogs.com/zishi/p/6780719.html -->
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>2.7.19</version>
<scope>test</scope>
</dependency>
application.properties配置文件当中引入RabbitMQ基本的配置信息
############################################################
#
# RabbitMQ 配置
#
############################################################
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
#虚拟主机如virtual-task
spring.rabbitmq.virtual-host=/
#链接超时时间15秒
spring.rabbitmq.connection-timeout=15000s
配置jackson部分--非必须的
############################################################
#
#jackson 部分
#如data部分的时间格式化
# 不允许传空
############################################################
spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
spring.jackson.time-zone=GMT+8
spring.jackson.default-property-inclusion=NON_NULL
创建RabbitConfig
使用@Value注解获取application.properties配置信息
/**
Broker:它提供一种传输服务,它的角色就是维护一条从生产者到消费者的路线,保证数据能按照指定的方式进行传输,
Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
Queue:消息的载体,每个消息都会被投到一个或多个队列。
Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来.
Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
vhost:虚拟主机,一个broker里可以有多个vhost,用作不同用户的权限分离。
Producer:消息生产者,就是投递消息的程序.
Consumer:消息消费者,就是接受消息的程序.
Channel:消息通道,在客户端的每个连接里,可建立多个channel.
*/
@Configuration
public class RabbitConfig {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Value("${spring.rabbitmq.host}") //在application.properties中配置
private String host;
@Value("${spring.rabbitmq.port}")
private int port;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
@Value("${spring.rabbitmq.virtual-host}")
private String virtualHost;
public static final String EXCHANGE_TASK = "my-mq-exchange_task";
public static final String EXCHANGE_B = "my-mq-exchange_B";
public static final String EXCHANGE_C = "my-mq-exchange_C";
public static final String QUEUE_A = "QUEUE_A";
public static final String QUEUE_B = "QUEUE_B";
public static final String QUEUE_C = "QUEUE_C";
public static final String ROUTINGKEY_A = "spring-boot-routingKey_A";
public static final String ROUTINGKEY_B = "spring-boot-routingKey_B";
public static final String ROUTINGKEY_C = "spring-boot-routingKey_C";
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host,port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost("/");
connectionFactory.setPublisherConfirms(true);
return connectionFactory;
}
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
//必须是prototype类型
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
return template;
}
/**
* 针对消费者配置
* 1. 设置交换机类型
* 2. 将队列绑定到交换机
FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念
HeadersExchange :通过添加属性key-value匹配
DirectExchange:按照routingkey分发到指定队列
TopicExchange:多关键字匹配
*/
@Bean
public DirectExchange defaultExchange() {
return new DirectExchange(EXCHANGE_TASK);
}
/**
* 获取队列A
* @return
*/
@Bean
public Queue queueA() {
return new Queue(QUEUE_A, true); //队列持久
}
@Bean
public Binding binding() {
return BindingBuilder.bind(queueA()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY_A);
}
}
创建消息的生产者MsgProducer
/**
* 消息的生产者
*/
@Component
public class MsgProducer implements RabbitTemplate.ConfirmCallback {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
//由于rabbitTemplate的scope属性设置为ConfigurableBeanFactory.SCOPE_PROTOTYPE,所以不能自动注入
private RabbitTemplate rabbitTemplate;
/**
* 构造方法注入rabbitTemplate
*/
@Autowired
public MsgProducer(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
rabbitTemplate.setConfirmCallback(this); //rabbitTemplate如果为单例的话,那回调就是最后设置的内容
}
public void sendMsg(String content) {
//消息ID
CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString()); //
//把消息放入ROUTINGKEY_A对应的队列当中去,对应的是队列A
logger.info("rabbitMq 生产者消息 内容content = {} CorrelationData= {}", content,correlationId);
rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_TASK, RabbitConfig.ROUTINGKEY_A, content, correlationId);
}
/**
* 回调
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
logger.info(" 回调id:" + correlationData);
if (ack) {
logger.info("消息成功消费");
} else {
logger.info("消息消费失败:" + cause);
}
}
}
创建测试类RabbitMQTest
@SpringBootTest(classes= WechatTaskApplication.class)
@RunWith(SpringJUnit4ClassRunner.class)
public class RabbitMQTest {
@Autowired
private MsgProducer msgProducer;
@Test
public void testRabbit_Producer() {
String mq = "{\n" +
" \"task_id\": 1462,\n" +
" \"wechat_id\": \"wxid_on8oksh88zo22\"\n" +
"}";
msgProducer.sendMsg(mq);
}
}
运行testRabbit_Producer测试方法
运行测试类RabbitMQTestd的testRabbit_Producer
image.png
rabbitmq--消费端配置
############################################################
#
# Springboot RabbitMQ 基本配置 192.168.0.19
#192.168.2.34 ubuntu的ip被自动变为192.168.0.19
############################################################
spring.rabbitmq.host=192.168.0.19
spring.rabbitmq.port=5672
spring.rabbitmq.username=czg-admin
spring.rabbitmq.password=czg_pass
#虚拟主机如virtual-task
spring.rabbitmq.virtual-host=/
#链接超时时间15秒
spring.rabbitmq.connection-timeout=15000s
############################################################
#
# Springboot RabbitMQ 消费者配置(依赖RabbitMQ的基本配置)
#
############################################################
#最大并发数
spring.rabbitmq.listener.simple.concurrency=5
#最大并发支持
spring.rabbitmq.listener.simple.max-concurrency=10
#签收模式--可以手工或自动
spring.rabbitmq.listener.simple.acknowledge-mode=auto
#spring.rabbitmq.listener.simple.acknowledge-mode=manual
#限流(一条条过来,每一个线程即并发数一次消费一条)---在大流量时控制一次消费消息个数
spring.rabbitmq.listener.simple.prefetch=1
############################################################
添加消息的消费者MsgReceiver
**
* 消费方
* 默认情况下消息消费者是自动 ack (确认)消息的,如果要手动 ack(确认)则需要修改确认模式为 manual
*/
@Component
@RabbitListener(queues = RabbitConfig.QUEUE_A)
public class MsgReceiver {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
/* //方式1=不处理消费确认
@RabbitHandler
public void process(String content) {
logger.info("接收处理队列A当中的消息: " + content);
}
*/
/**
* 方式2
* Channel
* @param message
* @param channel com.rabbitmq.client.Channel https://blog.csdn.net/asdfsadfasdfsa/article/details/79671097
* @param tag 好像都是1,2之类 deliveryTag(唯一标识 ID):当一个消费者向 RabbitMQ 注册后,会建立起一个 Channel ,RabbitMQ 会用 basic.deliver 方法向消费者推送消息,这个方法携带了一个 delivery tag, 它代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,delivery tag 的范围仅限于 Channel
*
* RabbitMQ(4)SpringBoot+RabbitMQ发送确认和消费手动确认机制
* https://www.jianshu.com/p/fae8fca98522
*/
@RabbitHandler
public void processMessage2(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
logger.info("接收处理队列A当中的message = {},消息tag ={}" + message,tag);
try {
// 模拟执行任务
Thread.sleep(1000);
channel.basicAck(tag,false); // 确认收到消息(需要在application.properties消费消息时指定非自动确认),消息将被队列移除,false只确认当前consumer一个消息收到,true确认所有consumer获得的消息。
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
再运行刚才的测试方法
image.png关于rabbitmq的文章收录
rabbitmq消息队列的简单入门-csdn
Spring Boot整合RabbitMQ详细教程-csdn
springboot学习笔记-6 springboot整合RabbitMQ
RabbitMQ:消息发送确认 与 消息接收确认(ACK)
RabbitMQ(4)SpringBoot+RabbitMQ发送确认和消费手动确认机制
RabbitMQ消息中间件极速入门与实战--慕课
网友评论