1 .pom.xml
<!-- Spring boot 集成rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.2.2.RELEASE</version>
</dependency>
2 . 配置
spring:
application:
name: demo-mq
#配置rabbitmq
rabbitmq:
addresses: 152.136.27.48:5672
username: guest
password: guest
#虚拟主机地址
virtual-host: /
#连接超时时间
connection-timeout: 15000
#publisher-confirms: true #开启发送确认
publisher-returns: true #开启发送失败退回
template:
mandatory: true
#消费端配置
listener:
simple:
#消费端
concurrency: 10
#最大消费端数
max-concurrency: 20
#自动签收auto 手动 manual
acknowledge-mode: manual #开启ACK
#限流
prefetch: 50
3 .rabbitmq配置
package com.example.demo.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @program: demo-rabbitmq
* @description
* @author: tina.liu
* @create: 2020-05-16 21:36
**/
@Configuration
public class RabbitConfig {
/**
* 创建广播类型的exchange queue 将二者绑定
*/
//队列名
public static final String FANOUT_QUEUE_NAME = "test_fanout_queue";
public static final String FANOUT_QUEUE_NAME1 = "test_fanout_queue1";
public static final String TEST_FANOUT_EXCHANGE = "testFanoutExchange";
//创建队列
@Bean
public Queue createFanoutQueue() {
return new Queue(FANOUT_QUEUE_NAME);
}
//创建队列
@Bean
public Queue createFanoutQueue1() {
return new Queue(FANOUT_QUEUE_NAME1);
}
//创建广播类型的交换机
@Bean
public FanoutExchange defFanoutExchange() {
return new FanoutExchange(TEST_FANOUT_EXCHANGE);
}
//队列与交换机进行绑定
@Bean
Binding bindingFanout() {
return BindingBuilder.bind(createFanoutQueue()).
to(defFanoutExchange());
}
//队列与交换机进行绑定
@Bean
Binding bindingFanout1() {
return BindingBuilder.bind(createFanoutQueue1()).
to(defFanoutExchange());
}
/**
* 创建直连的queue exchange 绑定
* @return
*/
public static final String DIRECT_QUEUE_NAME = "test_direct_queue"; //queue
public static final String TEST_DIRECT_EXCHANGE = "testDirectExchange"; //exchange
public static final String DIRECT_ROUTINGKEY = "test"; //routingKey
/**
* 创建直连的队列
* @return
*/
@Bean
public Queue createDirectQueue() {
return new Queue(DIRECT_QUEUE_NAME);
}
/**
* 创建直连交换机
* @return
*/
@Bean
DirectExchange directExchange(){
return new DirectExchange(TEST_DIRECT_EXCHANGE);
}
/**
* 将交换机和队列绑定
* @return
*/
@Bean
Binding bindingDirect() {
return BindingBuilder.bind(createDirectQueue()).
to(directExchange()).
with(DIRECT_ROUTINGKEY);
}
/**
* 创建通配符类型的exchange queue 将二者绑定
*/
public static final String TOPIC_QUEUE_NAME = "test_topic_queue"; //queue
public static final String TEST_TOPIC_EXCHANGE = "testTopicExchange"; //exchange
public static final String TOPIC_ROUTINGKEY = "test.*";//routingKey
//创建队列
@Bean
public Queue createTopicQueue() {
return new Queue(TOPIC_QUEUE_NAME);
}
/**
* 创建通配符交换机
* @return
*/
@Bean
TopicExchange defTopicExchange(){
return new TopicExchange(TEST_TOPIC_EXCHANGE);
}
/**
* 将交换机和队列进行绑定
* @return
*/
@Bean
Binding bindingTopic() {
return BindingBuilder.bind(createTopicQueue()).
to(defTopicExchange()).
with(TOPIC_ROUTINGKEY);
}
}
4 . 生产者
package com.example.demo.rabbitmq.rabbitmq;
import com.example.demo.rabbitmq.config.RabbitConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @program: demo-rabbitmq
* @description 消息生成者
* @author: tina.liu
* @create: 2020-05-16 21:42
**/
@Component
@Slf4j
public class MsgProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 通过广播类型的交换机生成消息
* @param massage
*/
public void send2FanoutTestQueue(String massage){
//exchange key msg
rabbitTemplate.convertAndSend(RabbitConfig.TEST_FANOUT_EXCHANGE,"", massage);
log.info("广播类型的交换机绑定消息队列,生成者发送消息成功");
}
/**
* 通过直连类型的交换机生成消息
* @param massage
*/
public void send2DirectTestQueue(String massage){
rabbitTemplate.convertAndSend(RabbitConfig.TEST_DIRECT_EXCHANGE, RabbitConfig.DIRECT_ROUTINGKEY, massage);
log.info("直连类型的交换机绑定消息队列,生成者发送消息成功");
}
/**
* 通过通配符类型的交换机生成消息
* @param massage
*/
public void send2TopicTestAQueue(String massage){
rabbitTemplate.convertAndSend(RabbitConfig.TEST_TOPIC_EXCHANGE, "a.test.aaa", massage);
log.info("通配符类型A的交换机绑定消息队列,生成者发送消息成功");
}
public void send2TopicTestBQueue(String massage){
rabbitTemplate.convertAndSend(RabbitConfig.TEST_TOPIC_EXCHANGE, "test.bbb", massage);
log.info("通配符类型B的交换机绑定消息队列,生成者发送消息成功");
}
}
5 .消费者
package com.example.demo.rabbitmq.rabbitmq;
import com.example.demo.rabbitmq.config.RabbitConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
/**
* @program: demo-rabbitmq
* @description 消息消费者
* @author: tina.liu
* @create: 2020-05-16 21:42
**/
@Component
@Slf4j
public class MsgConsumer {
/**
* 从跟广播类型的交换机绑定的队列中消费消息
* @param massage
*/
@RabbitListener(
bindings =
{
@QueueBinding(value = @Queue(value = RabbitConfig.FANOUT_QUEUE_NAME, durable = "true"),
exchange = @Exchange(value = RabbitConfig.TEST_FANOUT_EXCHANGE, type = "fanout"))
})
@RabbitHandler
public void processFanoutMsg(Message massage) {
String msg = new String(massage.getBody(), StandardCharsets.UTF_8);
log.info("从广播类型的交换机绑定的队列中消费消息success, message : {}" + msg);
}
/**
* 从跟广播类型的交换机绑定的队列中消费消息
* @param massage
*/
@RabbitListener(
bindings =
{
@QueueBinding(value = @Queue(value = RabbitConfig.FANOUT_QUEUE_NAME1, durable = "true"),
exchange = @Exchange(value = RabbitConfig.TEST_FANOUT_EXCHANGE, type = "fanout"))
})
@RabbitHandler
public void processFanout1Msg(Message massage) {
String msg = new String(massage.getBody(), StandardCharsets.UTF_8);
log.info("从广播类型的交换机绑定的队列中消费消息success, message : {}" , msg);
}
/**
* 通过直连类型的交换机绑定的消息队列中消费消息
* @param massage
*/
@RabbitListener(
bindings =
{
@QueueBinding(value = @Queue(value = RabbitConfig.DIRECT_QUEUE_NAME, durable = "true"),
exchange = @Exchange(value = RabbitConfig.TEST_DIRECT_EXCHANGE),
key = RabbitConfig.DIRECT_ROUTINGKEY)
})
@RabbitHandler
public void processDirectMsg(Message massage) {
String msg = new String(massage.getBody(), StandardCharsets.UTF_8);
log.info("从直连类型的交换机绑定的消息队列中消费消息success, message : {}" , msg);
}
/**
* 通过通配符类型的交换机绑定的消息队列中消费消息
* @param massage
*/
@RabbitListener(
bindings =
{
@QueueBinding(value = @Queue(value = RabbitConfig.TOPIC_QUEUE_NAME, durable = "true"),
exchange = @Exchange(value = RabbitConfig.TEST_TOPIC_EXCHANGE, type = "topic"),
key = RabbitConfig.TOPIC_ROUTINGKEY)
})
@RabbitHandler
public void processTopicMsg(Message massage) {
String msg = new String(massage.getBody(), StandardCharsets.UTF_8);
log.info("从通配符类型的交换机绑定的消息队列中消费消息success, message : {}" + msg);
}
}
6 .controller
package com.example.demo.rabbitmq.controller;
import com.example.demo.rabbitmq.config.RabbitConfig;
import com.example.demo.rabbitmq.rabbitmq.MsgProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @program: demo-rabbitmq
* @description
* @author: tina.liu
* @create: 2020-05-16 21:49
**/
@RestController
public class TestController {
//注入消息生成者
@Autowired
private MsgProducer msgProducer;
/**
* 测试广播类型的交换器 消息队列
*/
@GetMapping("/test/fanout")
public String Test1(){
msgProducer.send2FanoutTestQueue("借助广播类型的交换机 队列传递信息");
return "利用广播类型的交换机绑定队列发送消息";
}
/**
* 测试直连类型的交换机 消息队列
*/
@GetMapping("/test/direct")
public String Test2(){
msgProducer.send2DirectTestQueue("借助直连类型的交换机 队列传递信息");
return "利用直连类型的交换机绑定队列发送消息";
}
/**
* 测试通配符交换机 消息队列
*/
@GetMapping(value = "/test/topic/A")
private String test3(){
msgProducer.send2TopicTestAQueue("借助通配符类型的交换机A,队列传递信息");
return "利用通配符类型的交换机绑定队列发送消息A";
}
@GetMapping(value = "/test/topic/B")
private String test4(){
msgProducer.send2TopicTestBQueue("借助通配符类型的交换机B,队列传递信息");
return "利用通配符类型的交换机绑定队列发送消息B";
}
}
网友评论