环境
需要提前安装好 RabbitMQ(安装过程这里不再展示),并保证 RabbitMQ 可以正常访问,如下图所示:
image.png
一、导入maven依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
二、application.properties添加配置
spring.application.name=springboot-rabbitmq
spring.rabbitmq.host=122.*.*.62
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
#发布确认
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
#采用手动应答
spring.rabbitmq.listener.simple.acknowledge-mode=manual
#是否开启消费者监听的重试
spring.rabbitmq.listener.simple.retry.enabled=true
spring.rabbitmq.virtual-host=/
三、编写SenderConf配置
package com.example.rabbitmq.controller;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource;
/**
 * Spring configuration for the example: declares the queue that the producer
 * publishes to and the consumer listens on.
 */
@Configuration
public class SenderConf {

    /**
     * Declares the {@code queue-test} queue as a Spring bean.
     *
     * @return a {@link Queue} named {@code queue-test}
     */
    @Bean
    public Queue queue() {
        return new Queue("queue-test");
    }
}
四、编写生产者
package com.example.rabbitmq.controller;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.core.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
import java.util.UUID;
/**
 * Producer endpoint: publishes the {@code message} request parameter to RabbitMQ.
 */
@RestController
public class HelloController {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * Sends {@code message} through the injected {@link AmqpTemplate} using
     * {@code queue-test} as the routing key.
     *
     * @param message payload taken from the {@code message} request parameter
     * @return the fixed string {@code "index111"}
     */
    @RequestMapping(value = "/hello", produces = "text/plain;charset=UTF-8")
    public String index(String message) {
        // Log the payload that is actually published, so the producer log can be
        // matched against what the consumer prints.
        System.out.println("Sender : " + message);
        // Publish the message (this is the send step, not a confirmation).
        this.rabbitTemplate.convertAndSend("queue-test", message);
        return "index111";
    }
}
五、编写消费者
package com.example.rabbitmq.controller;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpMethod;
import org.springframework.stereotype.Component;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Date;
/**
 * Consumer for the {@code queue-test} queue that acknowledges deliveries manually.
 */
@Component
public class ReceiveController {

    /**
     * Handles one delivery from {@code queue-test}.
     *
     * <p>The message is processed first and acknowledged only afterwards. If
     * processing fails, a first-time delivery is returned to the queue, while a
     * delivery already flagged as redelivered is rejected without requeueing so
     * that a message which keeps failing is not redelivered endlessly.
     *
     * @param message the received message
     * @param channel the channel the message was delivered on, used for ack/nack/reject
     * @throws IOException if the nack or reject call itself fails
     */
    @RabbitListener(queues = "queue-test")
    public void process(Message message, Channel channel) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            // Process before acknowledging: once a delivery is acked it can no
            // longer be nacked or rejected if the processing fails.
            System.out.println("receive:" + new String(message.getBody(), StandardCharsets.UTF_8));
            // multiple=false acknowledges only this delivery; true would acknowledge
            // every outstanding delivery up to and including this tag.
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            e.printStackTrace();
            // getRedelivered() returns a boxed Boolean; compare null-safely.
            if (Boolean.TRUE.equals(message.getMessageProperties().getRedelivered())) {
                System.out.println("异常--消息已重复处理失败,拒绝再次接收...");
                // requeue=false: drop the message instead of putting it back on the queue.
                channel.basicReject(deliveryTag, false);
            } else {
                System.out.println("异常--消息即将再次返回队列处理...");
                // multiple=false, requeue=true: return only this message to the queue.
                channel.basicNack(deliveryTag, false, true);
            }
        }
    }
}
六、测试
访问
http://127.0.0.1:8080/hello?message=order_id_25
效果如下,消费者即可监听到从rabbitmq队列中收到的message
image.png
网友评论