原理见:https://www.jianshu.com/p/79ca08116d57
安装RabbitMQ略过
1.Springboot引入相关依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.application.properties配置
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
###消费者额外配置
spring.rabbitmq.listener.concurrency=2 //最小消息监听线程数
spring.rabbitmq.listener.max-concurrency=2 //最大消息监听线程数
3.RabbitMQ配置类
@Configuration
public class RabbitConfig {
/**
* 声明队列
* @return
*/
@Bean
public Queue queue() {
return new Queue("hello.queue1");
}
/**
* 声明交互器
* @return
*/
@Bean
public TopicExchange topicExchange(){
return new TopicExchange("topicExchange");
}
/**
* 绑定
* @return
*/
@Bean
public Binding binding(){
return BindingBuilder.bind(queue()).to(topicExchange()).with("key");
}
}
4.消息生产者
@Component
@Slf4j
public class Producer implements RabbitTemplate.ConfirmCallback, ReturnCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(this);
}
@Override
public void handle(Return aReturn) {
log.info("消息{},发送失败",aReturn.getBody());
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String s) {
if(ack){
log.info("消息发送成功:{}",correlationData);
}else {
log.info("消息发送失败:{}",s);
}
}
public void send(String message){
CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
log.info("发送消息:{}",message);
String resp=rabbitTemplate.convertSendAndReceive("topicExchange","key",message,correlationId).toString();
log.info("消费者响应:{},处理完成",resp);
}
要点:
1.注入RabbitTemplate
2.实现RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback接口(非必须)。
ConfirmCallback接口用于实现消息发送到RabbitMQ交换器后接收ack回调。ReturnCallback接口用于实现消息发送到RabbitMQ交换器,但无相应队列与交换器绑定时的回调。
3.实现消息发送方法。调用rabbitTemplate相应的方法即可。rabbitTemplate常用发送方法有:
rabbitTemplate.send(message); //发消息,参数类型为org.springframework.amqp.core.Message
rabbitTemplate.convertAndSend(object); //转换并发送消息。 将参数对象转换为org.springframework.amqp.core.Message后发送
rabbitTemplate.convertSendAndReceive(message) //转换并发送消息,且等待消息者返回响应消息。
5.消息消费者
@Componentpublic class Customer {
@RabbitListener(queues = "hello.queue1")
public String processMessage1(String msg) {
System.out.println(Thread.currentThread().getName() + " 接收到来自hello.queue1队列的消息:" + msg); return msg.toUpperCase(); }
}
要点:
1.监听器参数类型与消息实际类型匹配。在生产者中发送的消息实际类型是String,所以这里监听器参数类型也是String。
2.如果监听器需要有响应返回给生产者,直接在监听方法中return即可。
3.queues必须与生产者中配置的队列名一样。
6.测试
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitMqApplicationTests {
@Autowired
Producer producer;
@Test
public void contextLoads() throws InterruptedException {
for (int i = 0; i < 100; i++) {
producer.send("发送第"+i+"条消息");
Thread.sleep(2000);
}
}
}
输出结果如下:
255BBCFB-5950-4124-BF1D-D7DDDF9EDC70.png
网友评论