交换机属性
- Name:交换机名称
- Type:交换机类型direct、topic、 fanout、 headers
- Durability:是否需要持久化,true为持久化
- Auto Delete:当最后一个绑定到Exchange.上的队列删除后,自动删除该Exchange
- Internal:当前Exchange是否用于RabbitMQ内部使用, 默认为False
- Arguments:扩展参数,用于扩展AMQP协议自制定化使用
Direct Exchange
简介
所有发送到Direct Exchange的消息被转发到RouteKey中指定的Queue
注意: Direct模式可以使用RabbitMQ自带的Exchange: default Exchange,所以不需要将Exchange进行任何绑定(binding)操作。消息传 递时,RouteKey必须完全匹配才会被队列接收,消费者才能读取到消息,否则该消息会被抛弃
结构图
image.png示例
package com.dodou.liwh.amqp.boot.direct;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class DirectConfig {
@Bean
DirectExchange directExchange() {
DirectExchange direct_ex = new DirectExchange("direct.ex");
return direct_ex;
}
@Bean
Queue directQueue() {
Queue direct_que = new Queue("direct.que");
return direct_que;
}
/*队列绑定交换机:Boot自动绑定
1)每一个队列的名字不能相同
2)并且转入的参数名要匹配*/
@Bean
Binding binding(DirectExchange directExchange,Queue directQueue) {
Binding binding = BindingBuilder.bind(queue).to(directExchange).with("direct.rout.key");
return binding;
}
}
package com.dodou.liwh.amqp.boot.direct;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class DirectSender {
@Autowired
private AmqpTemplate amqpTemplate;
private String RoutingKey = "direct.rout.key";
private String EX_NAME = "direct.ex";
private String msg = "交换机类型是:direct,用以支持RoutingKey";
//发送消息到Exchange
public void Send() {
amqpTemplate.convertAndSend(EX_NAME, RoutingKey, msg);
System.out.println(msg);
}
}
package com.dodou.liwh.amqp.boot.direct;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "direct.que")
public class DirectReceiver {
@RabbitHandler
public void consume(String msg) {
System.out.println("Receiver消费:队列是:direct" + ":消息:" + msg);
}
}
Topic Exchange
简介
所有发送到Topic Exchange的消息被转发到,模糊匹配RouteKey的Queue
如果Exchange没有发现能够与RouteKey匹配的Queue,则会抛弃此消息。
匹配规则
- “ # ” 表示0个或若干个关键字。
- “ * ” 表示一个关键字。
-
“ . “ 用于分割关键字,如topic.warn.timeout
image.png
示例
package com.dodou.liwh.amqp.boot.topic;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class TopicConfig {
@Bean
public TopicExchange topicExchange() {
// 参数1 name :交换机名称
// 参数2 durable :是否持久化
// 参数3 autoDelete :当所有消费客户端连接断开后,是否自动删除队列
TopicExchange exchange = new TopicExchange("topic.ex", false, false);
return exchange;
}
@Bean
public Queue topicQueue() {
// 参数1 name :队列名
// 参数2 durable :是否持久化
// 参数3 exclusive :仅创建者可以使用的私有队列,断开后自动删除
// 参数4 autoDelete : 当所有消费客户端连接断开后,是否自动删除队列
Queue queue = new Queue("topic.que",false,false,false);
return queue;
}
@Bean
public Binding binding(TopicExchange topicExchange, Queue topicQueue) {
//绑定消费规则
Binding binding = BindingBuilder.bind(topicQueue).to(topicExchange).with("topic.#");
return binding;
}
}
package com.quanwugou.mall.mq;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.math.BigDecimal;
@Component
public class TopicSender {
@Autowired
private AmqpTemplate amqpTemplate;
private String TOPIC_EX = "topic.ex";
//生产者发送RoutingKey
private String RoutingKey = "topic.hello.fast";
private String msg = "交换机类型是:topic,模糊匹配RoutingKey";
public void send() throws Exception {
Hehe hehe = new Hehe();
hehe.setI(1);
Hei hei = new Hei();
hei.setNum(BigDecimal.ONE);
hehe.setO(hei);
//SimpleMessageConverter only supports String, byte[] and Serializable payloads : 实体类有没有序列化? 默认的转换器是SimpleMessageConverter,它适用于String、Serializable实例和字节数组。
//仅推荐JSONString、Serializable实例
amqpTemplate.convertAndSend(TOPIC_EX, RoutingKey, hehe);
System.out.println(msg);
}
}
package com.quanwugou.mall.mq;
import cn.hutool.core.util.ObjectUtil;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
public class TopicReceiver {
/*msg : 消费的消息
*channel : 当前操作通道
*@Header : 可以获取到所有的头部信息
* */
@RabbitListener(queues = "topic.que")
@RabbitHandler
public void rec(Message msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
try {
channel.basicQos(1);
Hehe hehe = (Hehe) ObjectUtil.deserialize(msg.getBody());
System.out.println("receiver: " + hehe.toString());
channel.basicAck(tag, false);
} catch (Exception e) {
// TODO 消费失败,那么我们可以进行容错处理,比如转移当前消息进入其它队列
// channel.basicNack 与 channel.basicReject 的区别在于basicNack可以拒绝多条消息
// 而basicReject一次只能拒绝一条消息
// tag:消息标识
// false:是否批量.true:将一次性拒绝所有小于deliveryTag的消息
// true:重新排队
channel.basicNack(tag, false, true);//重排并不是放在最后
}
}
}
Fanout Exchange
简介
- 不关心路由键,只需要将队列绑定到交换机上即可,因此转发消息是最快的
- 发送到交换机的消息都会被转发到与该交换机绑定的所有队列上
结构图
image.png示例
放在2-1 RabbitTemplate实现
网友评论