public class ProducerWorkQueues {
public static void main(String[] args) throws IOException, TimeoutException {
// 1 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2 设置参数
factory.setHost("localhost");
factory.setPort(5672);
factory.setVirtualHost("/itcast");
factory.setUsername("heima");
factory.setPassword("heima");
// 3 创建连接 Connection
Connection connection = factory.newConnection();
// 4 创建 Channel
Channel channel = connection.createChannel();
// 5 创建队列 Queue
channel.queueDeclare(
"work_queues", // 队列名称
true, // 是否持久化, 当 mq 重启之后还在
false, // 是否独占,只有有一个消费者监听这个队列,当 Connection 关闭时,是否删除队列
false, // 是否自动删除,当没有 Consumer 时,自动删除
null // 参数
); // 没有就创建队列,有就不会创建
// 6 发送消息
for (int i = 1; i <= 10; i++) {
String body = i + " hello rabbitmq ~~~";
channel.basicPublish(
"", // 交换机名称,简单模式下交换机会使用默认的 ""
"work_queues", // 路由名称
null, // 配置信息
body.getBytes() // 发送消息
);
}
// 7 释放资源
channel.close();
connection.close();
}
}
public class ConsumerWorkQueues1 {
public static void main(String[] args) throws IOException, TimeoutException {
// 1 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2 设置参数
factory.setHost("localhost");
factory.setPort(5672);
factory.setVirtualHost("/itcast");
factory.setUsername("heima");
factory.setPassword("heima");
// 3 创建连接 Connection
Connection connection = factory.newConnection();
// 4 创建 Channel
Channel channel = connection.createChannel();
// 5 创建队列 Queue
channel.queueDeclare(
"work_queues", // 队列名称
true, // 是否持久化, 当 mq 重启之后还在
false, // 是否独占,只有有一个消费者监听这个队列,当 Connection 关闭时,是否删除队列
false, // 是否自动删除,当没有 Consumer 时,自动删除
null // 参数
); // 没有就创建队列,有就不会创建
// 6 接受消息
Consumer consumer = new DefaultConsumer(channel) {
// 收到消息后,会自动执行该方法
@Override
public void handleDelivery(
String consumerTag, // 标识
Envelope envelope, // 获取一些信息,交换机,路由 key
AMQP.BasicProperties properties, // 配置信息
byte[] body // 数据
) {
System.out.println("body: " + new String(body));
}
};
channel.basicConsume(
"work_queues", // 队列名称
true, // 是否自动确认
consumer // 回调对象
);
}
}
public class ConsumerWorkQueues2 {
public static void main(String[] args) throws IOException, TimeoutException {
// 1 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2 设置参数
factory.setHost("localhost");
factory.setPort(5672);
factory.setVirtualHost("/itcast");
factory.setUsername("heima");
factory.setPassword("heima");
// 3 创建连接 Connection
Connection connection = factory.newConnection();
// 4 创建 Channel
Channel channel = connection.createChannel();
// 5 创建队列 Queue
channel.queueDeclare(
"work_queues", // 队列名称
true, // 是否持久化, 当 mq 重启之后还在
false, // 是否独占,只有有一个消费者监听这个队列,当 Connection 关闭时,是否删除队列
false, // 是否自动删除,当没有 Consumer 时,自动删除
null // 参数
); // 没有就创建队列,有就不会创建
// 6 接受消息
Consumer consumer = new DefaultConsumer(channel) {
// 收到消息后,会自动执行该方法
@Override
public void handleDelivery(
String consumerTag, // 标识
Envelope envelope, // 获取一些信息,交换机,路由 key
AMQP.BasicProperties properties, // 配置信息
byte[] body // 数据
) {
System.out.println("body: " + new String(body));
}
};
channel.basicConsume(
"work_queues", // 队列名称
true, // 是否自动确认
consumer // 回调对象
);
}
}
image.png
image.png
public class ProducerPubSub {
public static void main(String[] args) throws IOException, TimeoutException {
// 1 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2 设置参数
factory.setHost("localhost");
factory.setPort(5672);
factory.setVirtualHost("/itcast");
factory.setUsername("heima");
factory.setPassword("heima");
// 3 创建连接 Connection
Connection connection = factory.newConnection();
// 4 创建 Channel
Channel channel = connection.createChannel();
// 5 创建交换机
String exchangeName = "text_fanout";
channel.exchangeDeclare(
exchangeName, // 交换机名称
BuiltinExchangeType.FANOUT, // 交换机类型(direct 定向,fanout 广播(发送消息到每一个绑定队列),topic 通配符方式,headers 参数匹配)
true, // 是否持久化
false, // 自动删除
false, // 内部使用, 比如 plugin 可能用的
null // 参数
);
// 6 创建队列
String queue1Name = "text_fanout_queue1";
String queue2Name = "text_fanout_queue2";
channel.queueDeclare(queue1Name, true, false, false, null);
channel.queueDeclare(queue2Name, true, false, false, null);
// 7 绑定队列和交换机
channel.queueBind(
queue1Name, // 队列名称
exchangeName, // 交换机名称
"" // 路由,如果交换机类型是 fanout, routingKey 设置为 ""
);
channel.queueBind(
queue2Name, // 队列名称
exchangeName, // 交换机名称
"" // 路由,如果交换机类型是 fanout, routingKey 设置为 ""
);
// 8 发送消息
String body = "hello rabbitmq ~~~";
channel.basicPublish(
exchangeName,
"", // 路由名称
null, // 配置信息
body.getBytes() // 发送消息
);
// 9 释放资源
channel.close();
connection.close();
}
}
public class ConsumerPubSub1 {
public static void main(String[] args) throws IOException, TimeoutException {
// 1 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2 设置参数
factory.setHost("localhost");
factory.setPort(5672);
factory.setVirtualHost("/itcast");
factory.setUsername("heima");
factory.setPassword("heima");
// 3 创建连接 Connection
Connection connection = factory.newConnection();
// 4 创建 Channel
Channel channel = connection.createChannel();
// 5 创建队列 Queue
String queue1Name = "text_fanout_queue1";
// 6 接受消息
Consumer consumer = new DefaultConsumer(channel) {
// 收到消息后,会自动执行该方法
@Override
public void handleDelivery(
String consumerTag, // 标识
Envelope envelope, // 获取一些信息,交换机,路由 key
AMQP.BasicProperties properties, // 配置信息
byte[] body // 数据
) {
System.out.println("body: " + new String(body));
System.out.println("将消息打印到控制台");
}
};
channel.basicConsume(
queue1Name, // 队列名称
true, // 是否自动确认
consumer // 回调对象
);
}
}
public class ConsumerPubSub2 {
public static void main(String[] args) throws IOException, TimeoutException {
// 1 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2 设置参数
factory.setHost("localhost");
factory.setPort(5672);
factory.setVirtualHost("/itcast");
factory.setUsername("heima");
factory.setPassword("heima");
// 3 创建连接 Connection
Connection connection = factory.newConnection();
// 4 创建 Channel
Channel channel = connection.createChannel();
// 5 创建队列 Queue
String queue2Name = "text_fanout_queue2";
// 6 接受消息
Consumer consumer = new DefaultConsumer(channel) {
// 收到消息后,会自动执行该方法
@Override
public void handleDelivery(
String consumerTag, // 标识
Envelope envelope, // 获取一些信息,交换机,路由 key
AMQP.BasicProperties properties, // 配置信息
byte[] body // 数据
) {
System.out.println("body: " + new String(body));
System.out.println("将消息插入到数据库");
}
};
channel.basicConsume(
queue2Name, // 队列名称
true, // 是否自动确认
consumer // 回调对象
);
}
}
image.png
public class ProducerRouting {
public static void main(String[] args) throws IOException, TimeoutException {
// 1 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2 设置参数
factory.setHost("localhost");
factory.setPort(5672);
factory.setVirtualHost("/itcast");
factory.setUsername("heima");
factory.setPassword("heima");
// 3 创建连接 Connection
Connection connection = factory.newConnection();
// 4 创建 Channel
Channel channel = connection.createChannel();
// 5 创建交换机
String exchangeName = "text_direct";
channel.exchangeDeclare(
exchangeName, // 交换机名称
BuiltinExchangeType.DIRECT, // 交换机类型(direct 定向,fanout 广播(发送消息到每一个绑定队列),topic 通配符方式,headers 参数匹配)
true, // 是否持久化
false, // 自动删除
false, // 内部使用, 比如 plugin 可能用的
null // 参数
);
// 6 创建队列
String queue1Name = "text_direct_queue1";
String queue2Name = "text_direct_queue2";
channel.queueDeclare(queue1Name, true, false, false, null);
channel.queueDeclare(queue2Name, true, false, false, null);
// 7 绑定队列和交换机
// 队列1绑定 error
channel.queueBind(
queue1Name, // 队列名称
exchangeName, // 交换机名称
"error" // 路由,如果交换机类型是 fanout, routingKey 设置为 ""
);
// 队列二绑定 info error warning
channel.queueBind(queue2Name, exchangeName, "info");
channel.queueBind(queue2Name, exchangeName, "error");
channel.queueBind(queue2Name, exchangeName, "warning");
// 8 发送消息
String body = "hello rabbitmq ~~~";
channel.basicPublish(exchangeName, "error", null, body.getBytes());
// 9 释放资源
channel.close();
connection.close();
}
}
public class ConsumerRouting1 {
public static void main(String[] args) throws IOException, TimeoutException {
// 1 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2 设置参数
factory.setHost("localhost");
factory.setPort(5672);
factory.setVirtualHost("/itcast");
factory.setUsername("heima");
factory.setPassword("heima");
// 3 创建连接 Connection
Connection connection = factory.newConnection();
// 4 创建 Channel
Channel channel = connection.createChannel();
// 5 创建队列 Queue
String queue1Name = "text_direct_queue1";
// 6 接受消息
Consumer consumer = new DefaultConsumer(channel) {
// 收到消息后,会自动执行该方法
@Override
public void handleDelivery(
String consumerTag, // 标识
Envelope envelope, // 获取一些信息,交换机,路由 key
AMQP.BasicProperties properties, // 配置信息
byte[] body // 数据
) {
System.out.println("body: " + new String(body));
System.out.println("将消息插入数据库");
}
};
channel.basicConsume(
queue1Name, // 队列名称
true, // 是否自动确认
consumer // 回调对象
);
}
}
public class ConsumerRouting2 {
public static void main(String[] args) throws IOException, TimeoutException {
// 1 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2 设置参数
factory.setHost("localhost");
factory.setPort(5672);
factory.setVirtualHost("/itcast");
factory.setUsername("heima");
factory.setPassword("heima");
// 3 创建连接 Connection
Connection connection = factory.newConnection();
// 4 创建 Channel
Channel channel = connection.createChannel();
// 5 创建队列 Queue
String queue2Name = "text_direct_queue2";
// 6 接受消息
Consumer consumer = new DefaultConsumer(channel) {
// 收到消息后,会自动执行该方法
@Override
public void handleDelivery(
String consumerTag, // 标识
Envelope envelope, // 获取一些信息,交换机,路由 key
AMQP.BasicProperties properties, // 配置信息
byte[] body // 数据
) {
System.out.println("body: " + new String(body));
System.out.println("将消息打印到控制台");
}
};
channel.basicConsume(
queue2Name, // 队列名称
true, // 是否自动确认
consumer // 回调对象
);
}
}
image.png
image.png
public class ProducerTopics {
public static void main(String[] args) throws IOException, TimeoutException {
// 1 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2 设置参数
factory.setHost("localhost");
factory.setPort(5672);
factory.setVirtualHost("/itcast");
factory.setUsername("heima");
factory.setPassword("heima");
// 3 创建连接 Connection
Connection connection = factory.newConnection();
// 4 创建 Channel
Channel channel = connection.createChannel();
// 5 创建交换机
String exchangeName = "text_topic";
channel.exchangeDeclare(
exchangeName, // 交换机名称
BuiltinExchangeType.TOPIC, // 交换机类型(direct 定向,fanout 广播(发送消息到每一个绑定队列),topic 通配符方式,headers 参数匹配)
true, // 是否持久化
false, // 自动删除
false, // 内部使用, 比如 plugin 可能用的
null // 参数
);
// 6 创建队列
String queue1Name = "text_topic_queue1";
String queue2Name = "text_topic_queue2";
channel.queueDeclare(queue1Name, true, false, false, null);
channel.queueDeclare(queue2Name, true, false, false, null);
// 7 绑定队列和交换机
// routing Key 系统的名称.日志的级别
// error 存数据库, order 存数据库
channel.queueBind(queue1Name, exchangeName, "#.error");
channel.queueBind(queue1Name, exchangeName, "order.*");
channel.queueBind(queue2Name, exchangeName, "*.*");
// 8 发送消息
String body = "hello rabbitmq ~~~";
channel.basicPublish(exchangeName, "order.info", null, body.getBytes());
// 9 释放资源
channel.close();
connection.close();
}
}
public class ConsumerTopic1 {
public static void main(String[] args) throws IOException, TimeoutException {
// 1 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2 设置参数
factory.setHost("localhost");
factory.setPort(5672);
factory.setVirtualHost("/itcast");
factory.setUsername("heima");
factory.setPassword("heima");
// 3 创建连接 Connection
Connection connection = factory.newConnection();
// 4 创建 Channel
Channel channel = connection.createChannel();
// 5 创建队列 Queue
String queue1Name = "text_topic_queue1";
// 6 接受消息
Consumer consumer = new DefaultConsumer(channel) {
// 收到消息后,会自动执行该方法
@Override
public void handleDelivery(
String consumerTag, // 标识
Envelope envelope, // 获取一些信息,交换机,路由 key
AMQP.BasicProperties properties, // 配置信息
byte[] body // 数据
) {
System.out.println("body: " + new String(body));
System.out.println("将消息插入数据库");
}
};
channel.basicConsume(
queue1Name, // 队列名称
true, // 是否自动确认
consumer // 回调对象
);
}
}
public class ConsumerTopic2 {
public static void main(String[] args) throws IOException, TimeoutException {
// 1 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2 设置参数
factory.setHost("localhost");
factory.setPort(5672);
factory.setVirtualHost("/itcast");
factory.setUsername("heima");
factory.setPassword("heima");
// 3 创建连接 Connection
Connection connection = factory.newConnection();
// 4 创建 Channel
Channel channel = connection.createChannel();
// 5 创建队列 Queue
String queue2Name = "text_topic_queue2";
// 6 接受消息
Consumer consumer = new DefaultConsumer(channel) {
// 收到消息后,会自动执行该方法
@Override
public void handleDelivery(
String consumerTag, // 标识
Envelope envelope, // 获取一些信息,交换机,路由 key
AMQP.BasicProperties properties, // 配置信息
byte[] body // 数据
) {
System.out.println("body: " + new String(body));
System.out.println("将消息输出控制台");
}
};
channel.basicConsume(
queue2Name, // 队列名称
true, // 是否自动确认
consumer // 回调对象
);
}
}
image.png
image.png
网友评论