创建Queue队列(QUEUE_ORDER)
创建Order队列 创建Order队列后创建Exchange(EXCHANGE_ORDER)
创建order交换机 创建order交换机之后交互性类型介绍
交互性类型
- direct: 词典类型
- fanout:类似广播方式
- headers:
- topic:表示定义类型(主题)
队列详情介绍
image.png手动绑定(只有绑定后才可以发送消息)
可以在exchange交换机界面绑定 也可以在queue队列界面绑定
order.wxid_on8oksh88zo22
模糊匹配单个词(可以支持一个点)
order.
模拟匹配多个词。可以支持多个点如(order.wxid_on8oksh88zo22.abc123)
order.#
手动绑定绑定后可以看绑定关系-路由规则
绑定后
queue中看到的绑定关系
queue中看到的绑定关系消息的生产者MqMsgProducer
/**
* 消息的生产者
*/
@Component
public class MqMsgProducer implements RabbitTemplate.ConfirmCallback {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
//由于rabbitTemplate的scope属性设置为ConfigurableBeanFactory.SCOPE_PROTOTYPE,所以不能自动注入
private RabbitTemplate rabbitTemplate;
/**
* 构造方法注入rabbitTemplate
*/
@Autowired
public MqMsgProducer(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
rabbitTemplate.setConfirmCallback(this); //rabbitTemplate如果为单例的话,那回调就是最后设置的内容
}
public void sendMsg(String content) {
//消息ID
CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString()); //
//把消息放入ROUTINGKEY_A对应的队列当中去,对应的是队列A
logger.info("rabbitMq 生产者消息 内容content = {} CorrelationData= {}", content,correlationId);
rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_TASK, RabbitMqConfig.ROUTINGKEY_TASK, content, correlationId);
}
/**
* 根据消息对象发送消息
* @param order
*/
public void sendOrder(Order order) {
//消息ID
CorrelationData correlationId = new CorrelationData(order.getId()); //
//把消息放入ROUTINGKEY_A对应的队列当中去,对应的是队列A
logger.info("rabbitMq 生产者消息 内容Order = {} CorrelationData= {}", order,correlationId);
rabbitTemplate.convertAndSend(
RabbitMqConfig.EXCHANGE_ORDER
, RabbitMqConfig.ROUTINGKEY_ORDER //路由key
, order //消息体内容 --rabbbitmq会自动帮我们做系列化
, correlationId);
}
/**
* 回调--应该是发送确认--callback是异步的--确认消息是否到达 Broker 服务器,也就是只确认是否正确到达 Exchange 中
* RabbitMQ(四)消息确认(发送确认,接收确认)
* https://blog.csdn.net/qq315737546/article/details/54176560
* RabbitMQ:消息发送确认 与 消息接收确认(ACK)
* https://www.jianshu.com/p/2c5eebfd0e95
* @param correlationData 消息唯一标志
* @param ack 如果消息没有到exchange,则confirm回调,ack=false 如果消息到达exchange,则confirm回调,ack=true
* @param cause 失败原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
logger.info(" 回调id:" + correlationData +"cause = {}",cause);
if (ack) {
logger.info("收到回调,成功发送到broker");
} else {
logger.info("收到回调,失败发送到broker:" + cause);
}
}
}
运行单元测试sendOrderTest
@Test
public void sendOrderTest() throws InterruptedException {
Order order = new Order();
order.setId("20190706_000001");
order.setName("牵手订单00001");
order.setMessgeId(System.currentTimeMillis()+"$"+ UUID.randomUUID().toString()); //保障是唯一的
mqMsgProducer.sendOrder(order);
}
image.png
通过控制台查看
看到消息,点击进去看看
看到消息
get message 获取消息--是base64位编码
image.png
把base64的消息内容通过解密
image.png删除绑定
image.png删除队列&清空队列
image.png创建消费者MqMsgReceiverOrder
@RabbitListener注解可以自动帮我们创建队列、路由、
@Component
public class MqMsgReceiverOrder {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
/**
*
* @param order 其中@Payload表示消息体的类似是Order
* @param headers
* @param channel 消费者手动签收时,依赖消息通道Channel
* @throws Exception
*/
//@RabbitListener注解可以自动帮我们创建队列、路由、
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = RabbitMqConfig.QUEUE_TASK,durable = "true") //第二2参数是是否持久化
,exchange = @Exchange(name = RabbitMqConfig.EXCHANGE_ORDER,durable = "true",type = "topic")
,key = RabbitMqConfig.ROUTINGKEY_ORDER
)
)
@RabbitHandler
public void onOrderMessage(@Payload Order order,
@Headers Map<String,Object> headers,
Channel channel) throws Exception{
logger.info("收到消费消息,模拟业务操作。订单id={}",order.getId());
headers.get(AmqpHeaders.DELIVERY_TAG);
//处理手动必须
Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
channel.basicAck(deliveryTag,false);//第2个参数是否支持批量接收
channel.basicAck(deliveryTag,false); //第二2参数时手动签收
}
}
image.png
网友评论