Exchange:接收消息,并根据路由键转发消息所绑定的队列!
![](https://img.haomeiwen.com/i4084822/a8f65bcc004b31dc.png)
Name: 交换机名称
Type: 交换机类型 direct、topic、fanout、headers
Durability:是否需要持久化,true为持久化
Auto Delete:当最后一个绑定到Exchange上的队列删除后,自动删除该Exchange
Internal:当前Exchange是否用于RabbitMQ内部使用,默认为false
Arguments:扩展参数,用于扩展AMQP协议自制定化使用
交换机类型说明:
Direct Exchange
所有发送到Direct Exchange的消息被转发到RouteKey中指定的Queue
注意:Direct模式可以使用RabbitMQ自带的Exchange:default Exchange,所以不需要将Exchange进行任何绑定(binding)操作,消息传递时,RouteKey必须完全匹配才会被队列接收,否则改消息会被抛弃。
Topic Exchange
所有发送到Topic Exchange 的消息被转发到所有关心RouteKey中指定Topic的Queue上
Exchange将RouteKey和某Topic进行模糊匹配, 那么此时队列需要绑定一个Topic
注意: 可以使用通配符
符号 “#” 匹配一个或者多个词
符号 “*” 匹配不多不少一个词
例如:“log.#” 能匹配到 “log.info.oa”
"log.*" 只会匹配到 “log.erro”
Fanout Exchange
不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的。
代码
交换机类型 Direct Exchange
生产者
package com.example.rabbitmqapi.exchange.direct;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 生产者
*
* 交换机类型:Direct Exchange
*
* 先启动 消费者
*
*
* 处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。
* 这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “abc”,
* 则只有被标记为“abc”的消息才被转发,不会转发abc.def,也不会转发dog.ghi,只会转发abc。
*
* weiximei on 2019-02-23
*/
public class ProducerDirectServerExchange {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建 连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 设置主机地址
connectionFactory.setHost("192.168.1.118");
// 端口号
connectionFactory.setPort(5672);
// 指定虚拟主机路径, "/" 是rabbitMQ默认的虚拟主机路径
connectionFactory.setVirtualHost("/");
// 设置账户
connectionFactory.setUsername("weiximei");
// 设置密码
connectionFactory.setPassword("weiximei");
// 2.创建 一个连接 connection
Connection connection = connectionFactory.newConnection();
// 3.创建一个管道 channel
Channel channel = connection.createChannel();
// 声明 投放到哪一个交换机, 哪一个队列
String exchangeName = "test_direct_exchange";
String routingKey = "test_direct";
// 发送
String msg = "我往 " + exchangeName + "交换机," + routingKey + "队列,发送一个一条消息!";
channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
}
}
消费者
交换机类型:Direct Exchange
package com.example.rabbitmqapi.exchange.direct;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
*
* 消费者
*
* 交换机类型:Direct Exchange
*
* 处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。
* 这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “abc”,
* 则只有被标记为“abc”的消息才被转发,不会转发abc.def,也不会转发dog.ghi,只会转发abc。
*
* weiximei on 2019-02-24
*/
public class ConsumerDirectServerExchange {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
ConnectionFactory connectionFactory = new ConnectionFactory();
// 设置主机地址
connectionFactory.setHost("192.168.1.118");
// 端口号
connectionFactory.setPort(5672);
// 指定虚拟主机路径, "/" 是rabbitMQ默认的虚拟主机路径
connectionFactory.setVirtualHost("/");
// 设置账户
connectionFactory.setUsername("weiximei");
// 设置密码
connectionFactory.setPassword("weiximei");
// 是否支持重连,true表示支持
connectionFactory.setAutomaticRecoveryEnabled(true);
// 每 3000 毫秒,进行一次重连
connectionFactory.setNetworkRecoveryInterval(3000);
// 2.创建一个连接
Connection connection = connectionFactory.newConnection();
// 3.创建一个 channel 管道
Channel channel = connection.createChannel();
// 4.声明
// 交换机名称
String exchangeName = "test_direct_exchange";
// 交换机类型
String exchangeType = "direct";
// 队列名称
String queueName = "test_direct_queue";
// 路由 key
String routingKey = "test_direct";
// 创建一个交换机 类型为 direct
/**
* exchange: 交换机名称
* type: 消息类型(topic,direct等等)
* durable: 是否持久化,durable设置为true表示持久化,反之是非持久化,持久化的可以将交换器存盘,在服务器重启的时候不会丢失信息.
* autoDelete: 是否自动删除,设置为TRUE则表是自动删除,自动删除的前提是至少有一个队列或者交换器与这交换器绑定,之后所有与这个交换器绑定的队列或者交换器都与此解绑,一般都设置为false
* internal: 是否内置,如果设置 为true,则表示是内置的交换器,客户端程序无法直接发送消息到这个交换器中,只能通过交换器路由到交换器的方式
* arguments: 其它一些结构化参数比如:alternate-exchange
*/
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
// 监听队列,开始消费
// declare 表示是否持久化消息类型
// exclusive 是否为当前连接的专用队列,在连接断开后(close),会自动删除该队列,生产环境中应该很少用到吧。
// 表示是否独占,true也就是说我这个消息只能是这一个channel进行消费
// 场景:比如这个10条消息,让一个人消费,要顺序执行,如果多个人消息不能保证消息是顺序执行的
// autoDelete 表示是否自动删除,true当没有任何消费者使用时(也就是这个队列没有一个消费者后, 也可以说是这个队列的最后一个消费者close后,断开了连接),自动删除该队列
// arguments: 其它一些结构化参数比如:alternate-exchange
channel.queueDeclare(queueName, false, false, false, null);
// 将交换机和队列绑定
channel.queueBind(queueName, exchangeName, routingKey);
// durable 是否持久化消息
QueueingConsumer consumer = new QueueingConsumer(channel);
// 参数:队列名称,是否自动ACK,Consumer
channel.basicConsume(queueName, true, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String string = new String(delivery.getBody());
System.out.println("收到消息:" + string);
}
}
}
生产者
交换机类型 Fanout Exchange
package com.example.rabbitmqapi.exchange.fanout;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
*
* 消息生产者
*
* exchange 类型:fanout
*
*
* 不处理路由键。你只需要简单的将队列绑定到交换机上。
* 一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。
* 很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的。
*
*
* @author weiximei on 2019-03-13
*/
public class Producer4FanoutExchange {
public static void main(String[] args) throws Exception {
// 1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 地址
connectionFactory.setHost("192.168.1.118");
// 端口号
connectionFactory.setPort(5672);
// 虚拟主机路径
connectionFactory.setVirtualHost("/");
// 是否开启断线重连
connectionFactory.setAutomaticRecoveryEnabled(true);
// 每隔 3000 毫秒进行一个断线重连
connectionFactory.setNetworkRecoveryInterval(3000);
// 2.创建连接
Connection connection = connectionFactory.newConnection();
// 3.创建管道
Channel channel = connection.createChannel();
// 4.声明
String exchangeName = "test_fanout_exchange";
// 5.发送消息
for (int i = 0; i < 10; i++ ) {
String msg = "Hello World!";
// 参数:交换机名称,路由key(队列名), 一些属性,消息的数据
// 因为使用的 exchange 是 fanout 所以,路由key(队列名) 可以为空字符串
channel.basicPublish(exchangeName, "", null, msg.getBytes());
}
channel.close();
connection.close();
}
}
消费者
交换机类型 Fanout Exchange
package com.example.rabbitmqapi.exchange.fanout;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
/**
*
* 消息消费者
*
* exchange 类型:fanout
*
*
* 不处理路由键。你只需要简单的将队列绑定到交换机上。
* 一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。
* 很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的。
*
*
* @author weiximei on 2019-03-13
*/
public class Consumer4FanoutExchange {
public static void main(String[] args) throws Exception {
// 1. 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 设置 主机地址
connectionFactory.setHost("192.168.1.118");
// 端口号
connectionFactory.setPort(5672);
// 虚拟机路径
connectionFactory.setVirtualHost("/");
// 断线是否重连
connectionFactory.setAutomaticRecoveryEnabled(true);
// 重连的时间,每隔 3000 毫秒
connectionFactory.setNetworkRecoveryInterval(3000);
// 2.创建连接
Connection connection = connectionFactory.newConnection();
// 3.创建一个管道 channel
Channel channel = connection.createChannel();
// 4.声明
// 交换机名称
String exchangeName = "test_fanout_exchange";
// 交换机类型
String exchangeType = "fanout";
// 队列名称
String queueName = "test_fanout_queue";
// 路由key, 因为exchange的类型是fanout,所以可以不用设置routingKey
String routingKey = "";
/**
* exchange: 交换机名称
* type: 消息类型(topic,direct等等)
* durable: 是否持久化,durable设置为true表示持久化,反之是非持久化,持久化的可以将交换器存盘,在服务器重启的时候不会丢失信息.
* autoDelete: 是否自动删除,设置为TRUE则表是自动删除,自动删除的前提是至少有一个队列或者交换器与这交换器绑定,之后所有与这个交换器绑定的队列或者交换器都与此解绑,一般都设置为false
* internal: 是否内置,如果设置 为true,则表示是内置的交换器,客户端程序无法直接发送消息到这个交换器中,只能通过交换器路由到交换器的方式
* arguments: 其它一些结构化参数比如:alternate-exchange
*/
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
/**
* declare 表示是否持久化消息类型
* exclusive 是否为当前连接的专用队列,在连接断开后(close),会自动删除该队列,生产环境中应该很少用到吧。
* 表示是否独占,true也就是说我这个消息只能是这一个channel进行消费
* 场景:比如这个10条消息,让一个人消费,要顺序执行,如果多个人消息不能保证消息是顺序执行的
* autoDelete 表示是否自动删除,true当没有任何消费者使用时(也就是这个队列没有一个消费者后, 也可以说是这个队列的最后一个消费者close后,断开了连接),自动删除该队列
* arguments 其他的一些参数
*/
channel.queueDeclare(queueName, false, false, false, null);
/**
* 绑定队列
*/
channel.queueBind(queueName, exchangeName, routingKey);
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
/**
* 队列名称, 是否自动ACK, 回调方法
*/
channel.basicConsume(queueName, true, queueingConsumer);
// 循环获取消息
while (true) {
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody(), "UTF-8");
System.out.println("收到消息: " + msg);
}
}
}
生产者
交换机类型为 Topic Exchange
package com.example.rabbitmqapi.exchange.topic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import java.io.IOException;
import java.nio.charset.Charset;
/**
*
* 消费者
*
* Exchange 类型为 topic
*
*
* 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。
* 符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。
* 因此“abc.#”能够匹配到“abc.def.ghi”,但是“abc.*” 只会匹配到“abc.def”。
*
*
* @author weiximei on 2019-03-11
*/
public class Consumer4TopicExchange {
public static void main(String[] args) throws Exception {
// 1.创建 连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 设置主机地址
connectionFactory.setHost("192.168.1.118");
// 端口号
connectionFactory.setPort(5672);
// 指定虚拟主机路径, "/" 是rabbitMQ默认的虚拟主机路径
connectionFactory.setVirtualHost("/");
// 设置账户
connectionFactory.setUsername("weiximei");
// 设置密码
connectionFactory.setPassword("weiximei");
// 是否断线重连
connectionFactory.setAutomaticRecoveryEnabled(true);
// 没隔 3000 毫秒进行一次重连
connectionFactory.setNetworkRecoveryInterval(3000);
// 2. 创建一个Connection
Connection connection = connectionFactory.newConnection();
// 3.创建一个管道
Channel channel = connection.createChannel();
// 4.声明
String exchangeName = "test_topic_exchange";
String exchangeType = "topic";
String queueName = "test_topic_queue";
String routingKey = "user.#";
// 声明交换机
/**
* exchange: 交换机名称
* type: 消息类型(topic,direct等等)
* durable: 是否持久化,durable设置为true表示持久化,反之是非持久化,持久化的可以将交换器存盘,在服务器重启的时候不会丢失信息.
* autoDelete: 是否自动删除,设置为TRUE则表是自动删除,自删除的前提是至少有一个队列或者交换器与这交换器绑定,之后所有与这个交换器绑定的队列或者交换器都与此解绑,一般都设置为false
* internal: 是否内置,如果设置 为true,则表示是内置的交换器,客户端程序无法直接发送消息到这个交换器中,只能通过交换器路由到交换器的方式
* arguments: 其它一些结构化参数比如:alternate-exchange
*/
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
// 声明队列
/**
* declare 表示是否持久化消息类型
* exclusive 是否为当前连接的专用队列,在连接断开后(close),会自动删除该队列,生产环境中应该很少用到吧。
* 表示是否独占,true也就是说我这个消息只能是这一个channel进行消费
* 场景:比如这个10条消息,让一个人消费,要顺序执行,如果多个人消息不能保证消息是顺序执行的
* autoDelete 表示是否自动删除,true当没有任何消费者使用时(也就是这个队列没有一个消费者后, 也可以说是这个队列的最后一个消费者close后,断开了连接),自动删除该队列
* arguments 其他的一些参数
*/
channel.queueDeclare(queueName, false, false, false, null);
/**
* 绑定队列
*/
channel.queueBind(queueName, exchangeName, routingKey);
QueueingConsumer consumer = new QueueingConsumer(channel);
// 参数:队列名称,是否自动ACK,consumer
channel.basicConsume(queueName, true, consumer);
while (true) {
// 参数消息,如果没有消息,这一步将会一直堵塞
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody(), "UTF-8");
System.out.println("收到消息:" + msg);
}
}
}
消费者
交换机类型:Topic Exchange
package com.example.rabbitmqapi.exchange.topic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import java.io.IOException;
import java.nio.charset.Charset;
/**
*
* 消费者
*
* Exchange 类型为 topic
*
*
* 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。
* 符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。
* 因此“abc.#”能够匹配到“abc.def.ghi”,但是“abc.*” 只会匹配到“abc.def”。
*
*
* @author weiximei on 2019-03-11
*/
public class Consumer4TopicExchange {
public static void main(String[] args) throws Exception {
// 1.创建 连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 设置主机地址
connectionFactory.setHost("192.168.1.118");
// 端口号
connectionFactory.setPort(5672);
// 指定虚拟主机路径, "/" 是rabbitMQ默认的虚拟主机路径
connectionFactory.setVirtualHost("/");
// 设置账户
connectionFactory.setUsername("weiximei");
// 设置密码
connectionFactory.setPassword("weiximei");
// 是否断线重连
connectionFactory.setAutomaticRecoveryEnabled(true);
// 没隔 3000 毫秒进行一次重连
connectionFactory.setNetworkRecoveryInterval(3000);
// 2. 创建一个Connection
Connection connection = connectionFactory.newConnection();
// 3.创建一个管道
Channel channel = connection.createChannel();
// 4.声明
String exchangeName = "test_topic_exchange";
String exchangeType = "topic";
String queueName = "test_topic_queue";
String routingKey = "user.#";
// 声明交换机
/**
* exchange: 交换机名称
* type: 消息类型(topic,direct等等)
* durable: 是否持久化,durable设置为true表示持久化,反之是非持久化,持久化的可以将交换器存盘,在服务器重启的时候不会丢失信息.
* autoDelete: 是否自动删除,设置为TRUE则表是自动删除,自删除的前提是至少有一个队列或者交换器与这交换器绑定,之后所有与这个交换器绑定的队列或者交换器都与此解绑,一般都设置为false
* internal: 是否内置,如果设置 为true,则表示是内置的交换器,客户端程序无法直接发送消息到这个交换器中,只能通过交换器路由到交换器的方式
* arguments: 其它一些结构化参数比如:alternate-exchange
*/
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
// 声明队列
/**
* declare 表示是否持久化消息类型
* exclusive 是否为当前连接的专用队列,在连接断开后(close),会自动删除该队列,生产环境中应该很少用到吧。
* 表示是否独占,true也就是说我这个消息只能是这一个channel进行消费
* 场景:比如这个10条消息,让一个人消费,要顺序执行,如果多个人消息不能保证消息是顺序执行的
* autoDelete 表示是否自动删除,true当没有任何消费者使用时(也就是这个队列没有一个消费者后, 也可以说是这个队列的最后一个消费者close后,断开了连接),自动删除该队列
* arguments 其他的一些参数
*/
channel.queueDeclare(queueName, false, false, false, null);
/**
* 绑定队列
*/
channel.queueBind(queueName, exchangeName, routingKey);
QueueingConsumer consumer = new QueueingConsumer(channel);
// 参数:队列名称,是否自动ACK,consumer
channel.basicConsume(queueName, true, consumer);
while (true) {
// 参数消息,如果没有消息,这一步将会一直堵塞
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody(), "UTF-8");
System.out.println("收到消息:" + msg);
}
}
}
网友评论