一、简介
1. AMQP
分布式项目中,模块与模块之间的通信可以使用RPC框架,如Dubbo,但RPC中调用方模块获取到被调用方的结果是同步的,争对一些只需要异步调用的方法,如日志存储、发送消息等,RPC就显得效率低下了,AMQP协议的推出就是用来解决进程之间的异步消息通信
AMQP从设计上来说,AMQP就是一个发布订阅者模式,整体可以看作一个流,核心是中间的管道,即消息队列
有了AMQP,发布者只需要关注发布消息,订阅者只需要关注订阅消息,而流的速度、总承载量等都交由AMQP管理,从而做到异步调用、削峰填谷、服务解耦
2. RabbitMQ
RabbitMQ也是实现了AMQP的一种消息中间件,由Erlang编写,由于Erlang语言对并发的特性,RabbitMQ相对于其他MQ(kafka、RocketMQ等),延迟最低
二、安装
1. Erlang安装
安装编译Erlang所使用的工具:
yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel unixODBC unixODBC-devel
下载解压Erlang源码,erlang安装版本需要和rabbitmq对应,查看网址:https://www.rabbitmq.com/which-erlang.html#compatibility-matrix
wget http://erlang.org/download/otp_src_23.2.tar.gz
tar -xvf otp_src_23.2.tar.gz
编译:
cd otp_src_23.2
mkdir -p /usr/local/erlang
./configure --prefix=/usr/local/erlang --with-ssl --enable-threads --enable-smp-support --enable-kernel-poll --enable-hipe --without-javac
安装:
make install
配置全局环境变量:
vi /etc/profile
export ERLANGROOT=/usr/local/erlang
export PATH=$PATH:$ERLANGROOT/bin
2. RabbitMQ安装
争对Erlang版本为23.2,我们安装RabbitMQ的3.8.35版本,其他版本可以从githbu上查看:https://github.com/rabbitmq/rabbitmq-server/tags
下载并解压:
wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.35/rabbitmq-server-generic-unix-3.8.35.tar.xz
mkdir /usr/local/rabbitmq
tar -xvf rabbitmq-server-generic-unix-3.8.35.tar.xz -C /usr/local/rabbitmq/
生效界面插件:
cd /usr/local/rabbitmq/rabbitmq_server-3.8.35/sbin
./rabbitmq-plugins enable rabbitmq_management
启动RabbitMQ:
./rabbitmq-server -detached
停止RabbitMQ:
./rabbitmqctl stop_app
访问15672端口:
三、账号管理
管理界面可以在本地使用guest
账号(密码与账号相同)登录,但外部访问是不允许的,需要为RabbitMQ添加账号
1. 创建账号
./rabbitmqctl add_user aruba aruba
2. 授予角色
./rabbitmqctl set_user_tags aruba administrator
3. 账号授权
./rabbitmqctl set_permissions -p "/" aruba ".*" ".*" ".*"
"/"
对应默认的虚拟主机
4. 登录
使用新的账号进行登录:
四、RabbitMQ架构
下面是RabbitMQ实现原理的细节部分
针对上面名词的解释:
名词 | 解释 |
---|---|
Exchange 交换机 | 接收发布者消息,并将消息通过路由规则路由到相应队列 |
Binding key | 消息队列和交换器之间的关联,通过路由键Routing-key 绑定 |
Channel 信道 | TCP里面的虚拟链接。发布消息、接收消息、订阅队列,这些动作都是通过信道完成的。由于TCP连接数有上限,且必须通过三次握手,建立连接时性能低下,相比于每个连接都使用TCP,一条TCP连接可以容纳无限的信道 |
Virtual Host | 虚拟主机。表示一批交换器,消息队列和相关对象。默认创建一个为"/" 的虚拟主机 |
我们主要关注RabbitMQ如何发送消息和订阅消费消息,首先RabbitMQ服务中交换机和队列通过Routing-key
进行关联,再由发布者与RabbitMQ服务通过Channel
建立连接后,通过Routing-key
发送消息到交换机,最后由交换机对消息进行路由匹配到相应队列后,由队列进行分发
而订阅消费消息,一个消息只能有一个消费者成功的消费,收到消息后需要发送ack
,告诉RabbitMQ服务成功的消费了该消息,针对多个消费者订阅一个队列的情况,RabbitMQ默认使用轮询的方式发送给不同的消费者
五、RabbitMQ通讯方式
Rabbit提供的通讯方式,可以从官网查看:https://rabbitmq.com/getstarted.html
分别为:
通讯方式 | 描述 |
---|---|
Hello World! | 为了入门操作,使用默认交换机,一个队列被一个消费者订阅 |
Work queues | 使用默认交换机,一个队列可以被多个消费者订阅 |
Publish/Subscribe | 手动创建交换机(FANOUT),一个消息可以路由到多个队列中 |
Routing | 手动创建交换机(DIRECT),按照routing-key 进行路由匹配 |
Topics | 手动创建交换机(TOPIC),routing-key 支持通配符匹配 |
RPC | RPC方式 |
Publisher Confirms | 保证消息可靠性 |
创建一个项目,导入依赖:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.9.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
连接工具类:
public class RBConnectionUtil {
public static final String RABBITMQ_HOST = "192.168.42.4";
public static final int RABBITMQ_PORT = 5672;
public static final String RABBITMQ_USERNAME = "guest";
public static final String RABBITMQ_PASSWORD = "guest";
// 虚拟主机
public static final String RABBITMQ_VIRTUAL_HOST = "/";
/**
* 构建RabbitMQ的连接对象
*
* @return
*/
public static Connection getConnection() throws Exception {
//1. 创建Connection工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置RabbitMQ的连接信息
factory.setHost(RABBITMQ_HOST);
factory.setPort(RABBITMQ_PORT);
factory.setUsername(RABBITMQ_USERNAME);
factory.setPassword(RABBITMQ_PASSWORD);
factory.setVirtualHost(RABBITMQ_VIRTUAL_HOST);
//3. 返回连接对象
Connection connection = factory.newConnection();
return connection;
}
}
1. Hello World!
Hello World!
使用的是默认的交换机
生产者:
public class Publisher {
private static final String QUEUE_NAME = "hello";
@Test
public void publisher() throws Exception {
//1. 获取连接对象
Connection connection = RBConnectionUtil.getConnection();
//2. 创建信道
Channel channel = connection.createChannel();
//3. 构建队列 参数:队列名 是否持久化 是否排外(只允许一个消费者) 长时间未使用是否自动删除 其他参数
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//4. 发送消息
String message = "hello rabbit";
//参数: 交换机(默认为"") routing-Key(默认交换机就是队列名) 消息其他参数 消息
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
}
}
运行后,管理界面可以看到队列中有了一条消息:
消费者:
public class Consumer {
private static final String QUEUE_NAME = "hello";
@Test
public void consumer() throws Exception {
//1. 获取连接对象
Connection connection = RBConnectionUtil.getConnection();
//2. 构建信道
Channel channel = connection.createChannel();
//3. 构建队列 参数:队列名 是否持久化 是否排外(只允许一个消费者) 长时间未使用是否自动删除 其他参数
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//4. 监听消息
DefaultConsumer callback = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者获取到消息:" + new String(body, "UTF-8"));
}
};
channel.basicConsume(QUEUE_NAME, true, callback);
System.in.read();
}
}
运行结果:
2. Work queues
Work queues
也是使用默认交换机,发送消息是一样的,只不过一个队列被两个消费者订阅
2.1 轮询消费
上面提到过,一个消息只能被一个消费者消费,RabbitMQ默认使用轮询方式分发给不同的消费者
生产者:
public class Publisher {
private static final String QUEUE_NAME = "work";
@Test
public void publisher() throws Exception {
//1. 获取连接对象
Connection connection = RBConnectionUtil.getConnection();
//2. 创建信道
Channel channel = connection.createChannel();
//3. 构建队列 参数:队列名 是否持久化 是否排外(只允许一个消费者) 长时间未使用是否自动删除 其他参数
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//4. 发送消息
for (int i = 0; i < 10; i++) {
//参数: 交换机(默认为"") routing-Key(默认交换机就是队列名) 消息其他参数 消息
channel.basicPublish("", QUEUE_NAME, null, String.valueOf(i).getBytes());
}
}
}
消费者:
public class Consumer {
private static final String QUEUE_NAME = "work";
@Test
public void consumer1() throws Exception {
//1. 获取连接对象
Connection connection = RBConnectionUtil.getConnection();
//2. 构建信道
Channel channel = connection.createChannel();
//3. 构建队列 与生产者相同 参数:队列名 是否持久化 是否排外(只允许一个消费者) 长时间未使用是否自动删除 其他参数
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//4. 监听消息
DefaultConsumer callback = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者获取到消息:" + new String(body, "UTF-8"));
}
};
// 参数:队列名 是否自动ack 监听回调
channel.basicConsume(QUEUE_NAME, true, callback);
System.in.read();
}
@Test
public void consumer2() throws Exception {
//1. 获取连接对象
Connection connection = RBConnectionUtil.getConnection();
//2. 构建信道
Channel channel = connection.createChannel();
//3. 构建队列 与生产者相同 参数:队列名 是否持久化 是否排外(只允许一个消费者) 长时间未使用是否自动删除 其他参数
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//4. 监听消息
DefaultConsumer callback = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者获取到消息:" + new String(body, "UTF-8"));
}
};
// 参数:队列名 是否自动ack 监听回调
channel.basicConsume(QUEUE_NAME, true, callback);
System.in.read();
}
}
运行结果:
2.2 饿汉消费
针对上面的方式,每个消费者不管性能如何,都会按照轮询方式进行分发,如果消费者1
消费一个消息需要200ms
,消费者2
消费一个消息需要1000ms
,消费完10个
消息最少需要5s
,那么如何解决这个问题?
手动ack
+消息流控:自动ack
当接收到消息时就会发送ack
给RabbitMQ服务,我们需要在执行完逻辑处理后,手动执行ack
,还需要设置消息流控(一次拿多少条消息),才能实现争抢消息
消费者:
public class Consumer {
private static final String QUEUE_NAME = "work";
@Test
public void consumer1() throws Exception {
//1. 获取连接对象
Connection connection = RBConnectionUtil.getConnection();
//2. 构建信道
Channel channel = connection.createChannel();
//3. 构建队列 与生产者相同 参数:队列名 是否持久化 是否排外(只允许一个消费者) 长时间未使用是否自动删除 其他参数
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//3.1 流控设置
channel.basicQos(1);
//4. 监听消息
DefaultConsumer callback = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费者获取到消息:" + new String(body, "UTF-8"));
//手动ack 参数:deliveryTag 是否批量操作
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
// 参数:队列名 是否自动ack 监听回调
channel.basicConsume(QUEUE_NAME, false, callback);
System.in.read();
}
@Test
public void consumer2() throws Exception {
//1. 获取连接对象
Connection connection = RBConnectionUtil.getConnection();
//2. 构建信道
Channel channel = connection.createChannel();
//3. 构建队列 与生产者相同 参数:队列名 是否持久化 是否排外(只允许一个消费者) 长时间未使用是否自动删除 其他参数
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//3.1 流控设置
channel.basicQos(1);
//4. 监听消息
DefaultConsumer callback = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费者获取到消息:" + new String(body, "UTF-8"));
//手动ack 参数:deliveryTag 是否批量操作
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
// 参数:队列名 是否自动ack 监听回调
channel.basicConsume(QUEUE_NAME, false, callback);
System.in.read();
}
}
运行结果:
3. Publish/Subscribe
通过Work queues
我们知道了一条消息只能被一个消费者消费,而实际开发中,一条消息需要被多个消费者消费的情况很多
Publish/Subscribe
就是为了解决这个问题而产生的,队列中的一条消息只能被一个消费者消费,而不同队列中可以存放相同的消息,Publish/Subscribe
使得将一条消息路由到多个队列,进而被多个消费者订阅消费
Publish/Subscribe
需要手动创建交换机(FANOUT
),并手动绑定多个队列
生产者:
public class Publisher {
public static final String QUEUE_NAME1 = "p1";
public static final String QUEUE_NAME2 = "p2";
public static final String EXCHANGE_NAME = "ps";
@Test
public void publisher() throws Exception {
//1. 获取连接对象
Connection connection = RBConnectionUtil.getConnection();
//2. 创建信道
Channel channel = connection.createChannel();
//3. 构建交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
//4. 构建队列 参数:队列名 是否持久化 是否排外(只允许一个消费者) 长时间未使用是否自动删除 其他参数
channel.queueDeclare(QUEUE_NAME1, false, false, false, null);
channel.queueDeclare(QUEUE_NAME2, false, false, false, null);
//5. 交换机与队列绑定 参数: 队列名 交换机名 Routing-Key(直接绑定为空即可)
channel.queueBind(QUEUE_NAME1, EXCHANGE_NAME, "");
channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "");
//6. 发送消息
//参数: 交换机 routing-Key(直接绑定不需要) 消息其他参数 消息
channel.basicPublish(EXCHANGE_NAME, "", null, "publish/subscribe!".getBytes());
}
}
消费者:
public class Consumer {
@Test
public void consumer1() throws Exception {
//1. 获取连接对象
Connection connection = RBConnectionUtil.getConnection();
//2. 构建信道
Channel channel = connection.createChannel();
//3. 构建队列 与生产者相同 参数:队列名 是否持久化 是否排外(只允许一个消费者) 长时间未使用是否自动删除 其他参数
channel.queueDeclare(Publisher.QUEUE_NAME1, false, false, false, null);
//4. 监听消息
DefaultConsumer callback = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1获取到消息:" + new String(body, "UTF-8"));
}
};
// 参数:队列名 是否自动ack 监听回调
channel.basicConsume(Publisher.QUEUE_NAME1, true, callback);
System.in.read();
}
@Test
public void consumer2() throws Exception {
//1. 获取连接对象
Connection connection = RBConnectionUtil.getConnection();
//2. 构建信道
Channel channel = connection.createChannel();
//3. 构建队列 与生产者相同 参数:队列名 是否持久化 是否排外(只允许一个消费者) 长时间未使用是否自动删除 其他参数
channel.queueDeclare(Publisher.QUEUE_NAME2, false, false, false, null);
//4. 监听消息
DefaultConsumer callback = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2获取到消息:" + new String(body, "UTF-8"));
}
};
// 参数:队列名 是否自动ack 监听回调
channel.basicConsume(Publisher.QUEUE_NAME2, true, callback);
System.in.read();
}
}
运行结果:
4. Routing
Routing
不仅需要手动创建交换机(DIRECT
),还需要在绑定队列时指定routing-key
,消息的发送是根据routing-key
来路由到不同的队列
生产者:
public class Publisher {
public static final String QUEUE_NAME1 = "q1";
public static final String QUEUE_NAME2 = "q2";
public static final String EXCHANGE_NAME = "routing";
@Test
public void publisher() throws Exception {
//1. 获取连接对象
Connection connection = RBConnectionUtil.getConnection();
//2. 创建信道
Channel channel = connection.createChannel();
//3. 构建交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//4. 构建队列 参数:队列名 是否持久化 是否排外(只允许一个消费者) 长时间未使用是否自动删除 其他参数
channel.queueDeclare(QUEUE_NAME1, false, false, false, null);
channel.queueDeclare(QUEUE_NAME2, false, false, false, null);
//5. 交换机与队列绑定 参数: 队列名 交换机名 Routing-Key
channel.queueBind(QUEUE_NAME1, EXCHANGE_NAME, "a");
channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "b");
channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "c");
//6. 发送消息
//参数: 交换机 routing-Key 消息其他参数 消息
channel.basicPublish(EXCHANGE_NAME, "a", null, "hi a".getBytes());
channel.basicPublish(EXCHANGE_NAME, "b", null, "hi b".getBytes());
channel.basicPublish(EXCHANGE_NAME, "c", null, "hi c".getBytes());
}
}
消费者和Publish/Subscribe
的代码相同:
public class Consumer {
@Test
public void consumer1() throws Exception {
//1. 获取连接对象
Connection connection = RBConnectionUtil.getConnection();
//2. 构建信道
Channel channel = connection.createChannel();
//3. 构建队列 与生产者相同 参数:队列名 是否持久化 是否排外(只允许一个消费者) 长时间未使用是否自动删除 其他参数
channel.queueDeclare(Publisher.QUEUE_NAME1, false, false, false, null);
//4. 监听消息
DefaultConsumer callback = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1获取到消息:" + new String(body, "UTF-8"));
}
};
// 参数:队列名 是否自动ack 监听回调
channel.basicConsume(Publisher.QUEUE_NAME1, true, callback);
System.in.read();
}
@Test
public void consumer2() throws Exception {
//1. 获取连接对象
Connection connection = RBConnectionUtil.getConnection();
//2. 构建信道
Channel channel = connection.createChannel();
//3. 构建队列 与生产者相同 参数:队列名 是否持久化 是否排外(只允许一个消费者) 长时间未使用是否自动删除 其他参数
channel.queueDeclare(Publisher.QUEUE_NAME2, false, false, false, null);
//4. 监听消息
DefaultConsumer callback = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2获取到消息:" + new String(body, "UTF-8"));
}
};
// 参数:队列名 是否自动ack 监听回调
channel.basicConsume(Publisher.QUEUE_NAME2, true, callback);
System.in.read();
}
}
运行结果:
5. Topic
Topic
的路由规则可以通过通配符进行匹配
-
*
:表示占位符 -
#
:表示通配符
生产者:
public class Publisher {
public static final String QUEUE_NAME1 = "topic-q1";
public static final String QUEUE_NAME2 = "topic-q2";
public static final String EXCHANGE_NAME = "topic";
@Test
public void publisher() throws Exception {
//1. 获取连接对象
Connection connection = RBConnectionUtil.getConnection();
//2. 创建信道
Channel channel = connection.createChannel();
//3. 构建交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
//4. 构建队列 参数:队列名 是否持久化 是否排外(只允许一个消费者) 长时间未使用是否自动删除 其他参数
channel.queueDeclare(QUEUE_NAME1, false, false, false, null);
channel.queueDeclare(QUEUE_NAME2, false, false, false, null);
//5. 交换机与队列绑定 参数: 队列名 交换机名 Routing-Key
channel.queueBind(QUEUE_NAME1, EXCHANGE_NAME, "*.a.*");
channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "*.b");
channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "c.#");
//6. 发送消息
//参数: 交换机 routing-Key 消息其他参数 消息
channel.basicPublish(EXCHANGE_NAME, "1.a.3", null, "hi a".getBytes());
channel.basicPublish(EXCHANGE_NAME, "2.b", null, "hi b".getBytes());
channel.basicPublish(EXCHANGE_NAME, "c.1.2.3", null, "hi c".getBytes());
}
}
消费者代码相同,运行结果:
6. RPC
RPC
方式就是生产者发送消息时附带一些信息,消费者消费时,又通过另一个队列发送返回信息给生产者
生产者不仅仅要发送消息,还要订阅另一个队列:
public class Publisher {
public static final String QUEUE_PUB = "topic-publisher";
public static final String QUEUE_CON = "topic-consumer";
@Test
public void publisher() throws Exception {
//1. 获取连接对象
Connection connection = RBConnectionUtil.getConnection();
//2. 创建信道
Channel channel = connection.createChannel();
//3. 构建队列 参数:队列名 是否持久化 是否排外(只允许一个消费者) 长时间未使用是否自动删除 其他参数
channel.queueDeclare(QUEUE_PUB, false, false, false, null);
channel.queueDeclare(QUEUE_CON, false, false, false, null);
//4. 监听消费者发送消息
channel.basicConsume(QUEUE_CON, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者发送的消息:" + new String(body, "UTF-8"));
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
//5. 发送消息
// 构建额外的信息
AMQP.BasicProperties props = new AMQP.BasicProperties().builder()
.replyTo(QUEUE_CON)
.correlationId(UUID.randomUUID().toString())
.build();
//参数: 交换机 routing-Key 消息其他参数 消息
channel.basicPublish("", QUEUE_PUB, props, "hi consumer".getBytes());
System.in.read();
}
}
消费者除了订阅消息外,还需要做相应的返回消息处理:
public class Consumer {
@Test
public void consumer() throws Exception {
//1. 获取连接对象
Connection connection = RBConnectionUtil.getConnection();
//2. 构建信道
Channel channel = connection.createChannel();
//3. 构建队列 与生产者相同 参数:队列名 是否持久化 是否排外(只允许一个消费者) 长时间未使用是否自动删除 其他参数
channel.queueDeclare(Publisher.QUEUE_PUB, false, false, false, null);
channel.queueDeclare(Publisher.QUEUE_CON, false, false, false, null);
//4. 监听消息
DefaultConsumer callback = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者获取到消息:" + new String(body, "UTF-8"));
//5. 开始返回消息
String respQueueName = properties.getReplyTo();
AMQP.BasicProperties props = new AMQP.BasicProperties().builder()
.correlationId(properties.getCorrelationId())
.build();
channel.basicPublish("", respQueueName, props, "hi publisher".getBytes());
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
// 参数:队列名 是否自动ack 监听回调
channel.basicConsume(Publisher.QUEUE_PUB, false, callback);
System.in.read();
}
}
运行结果:
网友评论