1. 主要类型
交换器主要有4种类型:fanout、direct、topic、headers
下面我们挨个来看每一个类型
2. fanout
把所有发送该路由的消息,都路由到所有绑定的队列中。
下面我们模拟了3个队列
public class FanoutTest {
//定义一些常量
private static final String EXCHANGE_NAME = "exchange_basic_fanout"; //这里是发送消息的路由
private static final String ROUTING_KEY_1 = "routingkey_basic_1"; //分别设置了3个不同的routingkey
private static final String ROUTING_KEY_2 = "routingkey_basic_2";
private static final String ROUTING_KEY_3 = "routingkey_basic_3";
private static final String QUEUE_NAME_1 = "queue_basic_1";//这里我们将创建3个不同的队列
private static final String QUEUE_NAME_2 = "queue_basic_2";
private static final String QUEUE_NAME_3 = "queue_basic_3"
private static final int PORT = 5672;
//发送消息
public static void send() throws Exception {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置工厂相关属性
factory.setHost("localhost");
factory.setPort(PORT);
factory.setUsername("root");
factory.setPassword("root");
// 创建连接
Connection conn = factory.newConnection();
//创建3个不同的队列,使用不同的routing_key来绑定,
createChannel(conn,QUEUE_NAME_1,ROUTING_KEY_1);
createChannel(conn,QUEUE_NAME_2,ROUTING_KEY_2);
createChannel(conn,QUEUE_NAME_3,ROUTING_KEY_3);
//连接关闭
conn.close();
}
//通过参数创建channel
public static void createChannel(Connection conn, String queueName,String routingKey) throws Exception {
// 创建信道,一个连接上可以创建多个通过
Channel channel1 = conn.createChannel();
// 创建一个fanout类型,持久化、非自动删除的交换器
channel1.exchangeDeclare(EXCHANGE_NAME, "fanout", true, false, null);
// 创建一个持久化、非排他的、非自动删除的队列
channel1.queueDeclare(queueName, true, false, false, null);
// 将交换器与队列通过路由器绑定
channel1.queueBind(queueName, EXCHANGE_NAME, routingKey);
// 发送一条消息
String message2 = "你好!";
channel1.basicPublish(EXCHANGE_NAME, routingKey+"_4", MessageProperties.PERSISTENT_TEXT_PLAIN,
message2.getBytes());
// 通道关闭
channel1.close();
}
//通过队列名,接收消息
public static void receive(String queueName) throws IOException, TimeoutException, InterruptedException {
//接收的方式稍有不同,这里我们可以指定多个地址,每个地址主要包括IP,和端口号
Address[] addresses = new Address[] { new Address("localhost", PORT) };
ConnectionFactory factory = new ConnectionFactory();
//这里只要指定连接的用户名和密码即可
factory.setUsername("root");
factory.setPassword("root");
Connection conn = factory.newConnection(addresses);
//这里创建一个接收消息的通道
final Channel channel = conn.createChannel();
//这里我们实现了一个匿名的接口
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
//这里我们打印一下接收消息的内容
System.out.println("recv,收到消息:" + new String(body) + ",consumerTag:" + consumerTag);
//对通道应答,表示已成功接收
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
//启动消息接收过程
channel.basicConsume(queueName, consumer);
TimeUnit.SECONDS.sleep(5);
channel.close(); //通道关闭
conn.close(); //连接关闭
}
public static void main(String[] args) throws Exception {
//创建了3个队列,并分别发送了消息
send();
//3个队列,分别接收消息
receive(QUEUE_NAME_1);
receive(QUEUE_NAME_2);
receive(QUEUE_NAME_3);
}
}
下面是我们最终得到的输出结果
//recv,收到消息:你好!,consumerTag:amq.ctag-e3sO2vQPearf8fD5TKKXgQ
//recv,收到消息:你好!,consumerTag:amq.ctag-e3sO2vQPearf8fD5TKKXgQ
//recv,收到消息:你好!,consumerTag:amq.ctag-e3sO2vQPearf8fD5TKKXgQ
//recv,收到消息:你好!,consumerTag:amq.ctag-PSzOOVxdI0jMFJZxdR6LHA
//recv,收到消息:你好!,consumerTag:amq.ctag-PSzOOVxdI0jMFJZxdR6LHA
//recv,收到消息:你好!,consumerTag:amq.ctag-PSzOOVxdI0jMFJZxdR6LHA
//recv,收到消息:你好!,consumerTag:amq.ctag-R3fJrsvLuq6SAIF0u3XB3w
//recv,收到消息:你好!,consumerTag:amq.ctag-R3fJrsvLuq6SAIF0u3XB3w
//recv,收到消息:你好!,consumerTag:amq.ctag-R3fJrsvLuq6SAIF0u3XB3w
通过结果我们可以看出,每个队列都收到了3个消息,也就是完全忽略了routing_key信息,
使用时,我们要格外关注一下交换器的类型,不能只关注它的routing_key信息,不然可能会有重复接收消息的情况发生
未完待...
网友评论