Hello RabbitMQ
First, create a new virtual host in the RabbitMQ management UI, as shown below:
![](https://img.haomeiwen.com/i12178012/6e33bb5f15196a45.png)
Then add the dependency to the project:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.3</version>
</dependency>
Then write the test code. The port defaults to 5672.
@Test
public void mqTestSend() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
factory.setVirtualHost("dane");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String queueName="queue1";
channel.queueDeclare(queueName,false,false,false,null);
String msg="Hello MQ";
channel.basicPublish("",queueName,null,msg.getBytes());
System.out.println("send:"+msg);
channel.close();
connection.close();
}
We can then see the message in the management UI:
![](https://img.haomeiwen.com/i12178012/a9da9025e658e210.png)
Next, test receiving:
@Test
public void mqTestReceiver() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
factory.setVirtualHost("dane");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String queueName="queue1";
channel.queueDeclare(queueName,false,false,false,null);
System.out.println(" Waiting for messages.");
channel.basicConsume(queueName, true, new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
String message = new String(delivery.getBody(), "utf-8");
System.out.println(" [x] Received '" + message + "'");
}
}, new CancelCallback() {
@Override
public void handle(String s) throws IOException {
}
});
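// basicConsume returns immediately; wait briefly so the callback can run before closing
Thread.sleep(1000);
channel.close();
connection.close();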
}
The output is:
Waiting for messages.
[x] Received 'Hello MQ'
Meanwhile, in the RabbitMQ management UI:
![](https://img.haomeiwen.com/i12178012/c50eae2a3778b3b7.png)
The Hello World example above introduces a few basic concepts. Start with this diagram:
![](https://img.haomeiwen.com/i12178012/723c82b312687f75.png)
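Almost every MQ works this way: a consumer subscribes to a queue, a producer creates messages and publishes them to the queue, and the broker then delivers each message to a listening consumer.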
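RabbitMQ Work Queues
By default, RabbitMQ dispatches messages to consumers in round-robin order. For example, with 100 messages and two consumers, each consumer receives 50: consumer 1 gets messages 0, 2, 4, ... and consumer 2 gets messages 1, 3, 5, ...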
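The round-robin rule can be sketched in plain Java without a broker. This is a toy model only: the class name `RoundRobinSketch` and the method `dispatch` are made up for illustration and are not part of the RabbitMQ client.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of round-robin dispatch; it does not use the RabbitMQ client.
class RoundRobinSketch {

    // Assigns message indexes 0..messageCount-1 to the consumers in turn.
    static List<List<Integer>> dispatch(int messageCount, int consumerCount) {
        List<List<Integer>> received = new ArrayList<>();
        for (int c = 0; c < consumerCount; c++) {
            received.add(new ArrayList<>());
        }
        for (int i = 0; i < messageCount; i++) {
            // message i goes to consumer (i mod consumerCount)
            received.get(i % consumerCount).add(i);
        }
        return received;
    }

    public static void main(String[] args) {
        List<List<Integer>> received = dispatch(100, 2);
        System.out.println("consumer 1 got " + received.get(0).size() + " messages");
        System.out.println("consumer 2 got " + received.get(1).size() + " messages");
    }
}
```

Note that the assignment ignores how long each consumer needs per message, which is exactly the limitation the rest of this section works around.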
Consumer 1 is the efficient one; it needs 10 ms to process each message:
public class Consumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
factory.setVirtualHost("dane");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String queueName="queue1";
channel.queueDeclare(queueName,false,false,false,null);
System.out.println("Consumer1 Waiting for messages.");
channel.basicConsume(queueName, true, new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
String message = new String(delivery.getBody(), "utf-8");
System.out.println(" [x] Received '" + message + "'");
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, new CancelCallback() {
@Override
public void handle(String s) throws IOException {
}
});
}
}
Consumer 2 is the inefficient one; it needs 1000 ms to process each message:
public class Consumer2 {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
factory.setVirtualHost("dane");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String queueName="queue1";
channel.queueDeclare(queueName,false,false,false,null);
System.out.println("Consumer2 Waiting for messages.");
channel.basicConsume(queueName, true, new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
String message = new String(delivery.getBody(), "utf-8");
System.out.println(" [x] Received '" + message + "'");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, new CancelCallback() {
@Override
public void handle(String s) throws IOException {
}
});
}
}
The producer publishes 100 messages:
public class Producer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
factory.setVirtualHost("dane");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String queueName="queue1";
channel.queueDeclare(queueName,false,false,false,null);
for (int i = 0; i < 100; i++) {
String msg="Hello MQ"+i;
channel.basicPublish("",queueName,null,msg.getBytes());
System.out.println("send:"+msg);
}
channel.close();
connection.close();
}
}
Now look at the test results:
![](https://img.haomeiwen.com/i12178012/8b2cfaec4e42ddfc.png)
![](https://img.haomeiwen.com/i12178012/0da424439d505ec9.png)
As shown, consumer 1 and consumer 2 each handled 50 messages. This is RabbitMQ's default behavior:
![](https://img.haomeiwen.com/i12178012/75a5053db1bd744e.png)
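In real projects, however, the faster machine should do more of the work. The first step is to switch message acknowledgement to manual mode.
RabbitMQ has two acknowledgement modes for consumed messages: automatic and manual.
Automatic mode: as soon as a message is delivered to the consumer, the server considers it successfully consumed.
Manual mode: after the consumer has processed the message, it must report back to the server; only then does the server consider the message successfully consumed.
The second argument of channel.basicConsume selects the mode: true means automatic, false means manual. In manual mode, acknowledge a processed message with channel.basicAck: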
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
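To make the bookkeeping behind manual mode concrete, here is a minimal toy model in plain Java. It only illustrates the idea that a delivered message stays "unacknowledged" until the consumer acks it and is put back in the queue if the consumer disappears first. The class `ManualAckSketch` and all of its methods are hypothetical names for this sketch, not RabbitMQ APIs.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of manual acknowledgement; it does not use the RabbitMQ client.
class ManualAckSketch {
    private final Deque<String> ready = new ArrayDeque<>();          // waiting for delivery
    private final Map<Long, String> unacked = new LinkedHashMap<>(); // delivered, not yet acked
    private long nextTag = 1;

    void publish(String message) {
        ready.addLast(message);
    }

    // Delivers the next message and returns its delivery tag, or -1 if none is ready.
    long deliver() {
        String message = ready.pollFirst();
        if (message == null) {
            return -1;
        }
        long tag = nextTag++;
        unacked.put(tag, message);
        return tag;
    }

    // The consumer confirms that it finished processing the message.
    void ack(long deliveryTag) {
        unacked.remove(deliveryTag);
    }

    // The consumer went away: everything it had not acked returns to the front of the queue.
    void consumerLost() {
        List<String> pending = new ArrayList<>(unacked.values());
        for (int i = pending.size() - 1; i >= 0; i--) {
            ready.addFirst(pending.get(i));
        }
        unacked.clear();
    }

    int readyCount() {
        return ready.size();
    }

    int unackedCount() {
        return unacked.size();
    }

    public static void main(String[] args) {
        ManualAckSketch queue = new ManualAckSketch();
        queue.publish("Hello MQ0");
        queue.publish("Hello MQ1");
        long first = queue.deliver();
        queue.deliver();          // second message delivered but never acked
        queue.ack(first);         // first message is done
        queue.consumerLost();     // second message becomes ready again
        System.out.println("ready=" + queue.readyCount() + ", unacked=" + queue.unackedCount());
    }
}
```

In automatic mode the broker would instead forget each message at the moment of delivery, so a consumer that crashes mid-processing loses it.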
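With manual mode enabled, the consumer also needs to call the following API. Its argument is the maximum number of unacknowledged messages the server will deliver to the consumer at one time.
// the server delivers at most one unacknowledged message to the consumer at a time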
channel.basicQos(1);
Consumer 1:
......
channel.basicQos(1);
channel.basicConsume(queueName, false, new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
String message = new String(delivery.getBody(), "utf-8");
System.out.println(" [x] Received '" + message + "'");
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
}, new CancelCallback() {
@Override
public void handle(String s) throws IOException {
}
});
Consumer 2:
channel.basicQos(1);
channel.basicConsume(queueName, false, new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
String message = new String(delivery.getBody(), "utf-8");
System.out.println(" [x] Received '" + message + "'");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
}, new CancelCallback() {
@Override
public void handle(String s) throws IOException {
}
});
The test shows that consumer 1 was assigned many more tasks; the faster worker gets more work:
![](https://img.haomeiwen.com/i12178012/7f134a33d327968f.png)
![](https://img.haomeiwen.com/i12178012/5c521390f73ad6c1.png)
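Why a prefetch of 1 shifts the load toward the fast consumer can be sketched with a small simulation in virtual time. This is an idealized model under stated assumptions: all messages are already queued, each consumer holds at most one unacknowledged message, and the next message goes to whichever consumer acks first. `FairDispatchSketch` and `simulate` are hypothetical names, and real runs will differ somewhat because of network and scheduling delays.

```java
// Toy simulation of prefetch=1 dispatch using virtual time; no RabbitMQ client involved.
class FairDispatchSketch {

    // Returns how many messages each consumer handles.
    // workMillis[i] is the assumed processing time of consumer i per message.
    static int[] simulate(int messageCount, long[] workMillis) {
        int[] handled = new int[workMillis.length];
        long[] freeAt = new long[workMillis.length]; // virtual time of each consumer's next ack
        for (int m = 0; m < messageCount; m++) {
            // pick the consumer that becomes free first (lowest index wins ties)
            int next = 0;
            for (int c = 1; c < freeAt.length; c++) {
                if (freeAt[c] < freeAt[next]) {
                    next = c;
                }
            }
            freeAt[next] += workMillis[next];
            handled[next]++;
        }
        return handled;
    }

    public static void main(String[] args) {
        int[] handled = simulate(100, new long[] {10, 1000});
        System.out.println("fast consumer: " + handled[0] + ", slow consumer: " + handled[1]);
    }
}
```

With the 10 ms and 1000 ms workers from this section, the model hands 99 of the 100 messages to the fast consumer, because the slow one is still busy with its first message while the fast one drains the queue.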
RabbitMQ Publish/Subscribe
Some concepts are essential here. Start with the model:
![](https://img.haomeiwen.com/i12178012/2c6d2100e989579e.png)
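To explain the model above:
- There is one producer and multiple consumers;
- Each consumer has its own queue;
- The producer does not send messages directly to a queue but to an exchange;
- Every queue must be bound to the exchange;
- A message sent by the producer passes through the exchange to the queues, so a single message can be consumed by multiple consumers;
- The exchange type determines which of the bound queues receive a message;
- If a message is sent to an exchange that has no queue bound to it, the message is lost.
Exchange types:
- FANOUT: every bound queue receives the message;
- DIRECT: introduces the routing key (routingKey); among the queues bound to the exchange, only those whose binding key matches the message's routing key receive it;
- TOPIC: works on the same principle as DIRECT, except that the binding key may contain wildcards, so one message can reach multiple queues.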
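The FANOUT and DIRECT rules above can be sketched as a toy router in plain Java. It keeps one binding per queue for simplicity and is only a model of the routing decision; `ExchangeRoutingSketch`, `bind`, and `route` are hypothetical names, not RabbitMQ client calls.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of FANOUT and DIRECT routing; it does not use the RabbitMQ client.
class ExchangeRoutingSketch {
    enum Type { FANOUT, DIRECT }

    private final Type type;
    // queue name -> binding key, in binding order (one binding per queue in this sketch)
    private final Map<String, String> bindings = new LinkedHashMap<>();

    ExchangeRoutingSketch(Type type) {
        this.type = type;
    }

    void bind(String queueName, String bindingKey) {
        bindings.put(queueName, bindingKey);
    }

    // Returns the names of the queues that would receive a message with this routing key.
    List<String> route(String routingKey) {
        List<String> targets = new ArrayList<>();
        for (Map.Entry<String, String> binding : bindings.entrySet()) {
            // FANOUT ignores the key; DIRECT requires an exact match
            if (type == Type.FANOUT || binding.getValue().equals(routingKey)) {
                targets.add(binding.getKey());
            }
        }
        return targets;
    }

    public static void main(String[] args) {
        ExchangeRoutingSketch direct = new ExchangeRoutingSketch(Type.DIRECT);
        direct.bind("queue1", "r1");
        direct.bind("queue2", "");
        System.out.println("r1 -> " + direct.route("r1"));
        System.out.println("r2 -> " + direct.route("r2")); // empty list: the message is dropped
    }
}
```

An empty result from `route` corresponds to the last bullet of the model: with no matching binding, the message has nowhere to go.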
FANOUT
Producer:
public class Producer {
public static final String EXCHANGE_NAME = "exchange";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
factory.setVirtualHost("dane");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.FANOUT);
String msg="Hello MQ";
channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes());
System.out.println("send:"+msg);
channel.close();
connection.close();
}
}
Consumer 1:
public class Consumer1 {
public static final String EXCHANGE_NAME = "exchange";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
factory.setVirtualHost("dane");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String queueName="queue1";
channel.queueDeclare(queueName,false,false,false,null);
channel.queueBind(queueName,EXCHANGE_NAME,"");
System.out.println("Consumer1 Waiting for messages.");
channel.basicQos(1);
channel.basicConsume(queueName, false, new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
String message = new String(delivery.getBody(), "utf-8");
System.out.println(" [x] Received '" + message + "'");
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
}, new CancelCallback() {
@Override
public void handle(String s) throws IOException {
}
});
}
}
Consumer 2:
public class Consumer2 {
public static final String EXCHANGE_NAME = "exchange";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
factory.setVirtualHost("dane");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String queueName="queue2";
channel.queueDeclare(queueName,false,false,false,null);
channel.queueBind(queueName,EXCHANGE_NAME,"");
System.out.println("Consumer2 Waiting for messages.");
channel.basicQos(10);
channel.basicConsume(queueName, false, new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
String message = new String(delivery.getBody(), "utf-8");
System.out.println(" [x] Received '" + message + "'");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
}, new CancelCallback() {
@Override
public void handle(String s) throws IOException {
}
});
}
}
Note that consumer 1 and consumer 2 use different queue names. The producer declares the exchange:
channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.FANOUT);
Each consumer binds its queue to that exchange:
channel.queueBind(queueName,EXCHANGE_NAME,"");
As shown, both consumer 1 and consumer 2 received the message:
![](https://img.haomeiwen.com/i12178012/c1d1aa00a87e506f.png)
![](https://img.haomeiwen.com/i12178012/eb1a982e8412ce1f.png)
DIRECT
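One pitfall first: RabbitMQ does not let you change the parameters of an existing exchange (redeclaring it with a different type fails), so delete the exchange and let it be recreated as DIRECT.
The producer declares the exchange as DIRECT and publishes with the routing key r1: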
channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT);
String msg="Hello MQ";
channel.basicPublish(EXCHANGE_NAME,"r1",null,msg.getBytes());
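Consumer 1 binds its queue to the exchange with the key r1; consumer 2 binds with an empty key:
channel.queueBind(queueName,EXCHANGE_NAME,"r1"); // consumer 1
channel.queueBind(queueName,EXCHANGE_NAME,""); // consumer 2
The result is as follows: consumer 1 receives the message and consumer 2 does not: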
![](https://img.haomeiwen.com/i12178012/0f4bf1954b398b7d.png)
![](https://img.haomeiwen.com/i12178012/0478d7acdefd83e8.png)
TOPIC
The producer declares the exchange as TOPIC and publishes with the routing key Apple.1:
channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.TOPIC);
String msg="Hello MQ";
channel.basicPublish(EXCHANGE_NAME,"Apple.1",null,msg.getBytes());
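Consumer 1 binds its queue with the key Apple.# and consumer 2 with the key Banner.#. Here # is a wildcard that matches zero or more dot-separated words:
channel.queueBind(queueName,EXCHANGE_NAME,"Apple.#"); // consumer 1
channel.queueBind(queueName,EXCHANGE_NAME,"Banner.#"); // consumer 2
Again, consumer 1 receives the message and consumer 2 does not.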
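The wildcard matching can be sketched in plain Java as follows. This is a minimal illustration of topic-style matching, where `#` matches zero or more words and `*` (the other topic wildcard) matches exactly one word. It is not the broker's actual implementation, and `TopicMatchSketch` is a hypothetical name.

```java
// Toy matcher for topic binding keys; it does not use the RabbitMQ client.
class TopicMatchSketch {

    // Returns true if the routing key matches the binding key.
    static boolean matches(String bindingKey, String routingKey) {
        return matches(bindingKey.split("\\.", -1), 0, routingKey.split("\\.", -1), 0);
    }

    private static boolean matches(String[] pattern, int p, String[] words, int w) {
        if (p == pattern.length) {
            return w == words.length; // pattern used up: match only if no words remain
        }
        if (pattern[p].equals("#")) {
            // '#' may absorb zero or more words; try every possible amount
            for (int skip = w; skip <= words.length; skip++) {
                if (matches(pattern, p + 1, words, skip)) {
                    return true;
                }
            }
            return false;
        }
        if (w == words.length) {
            return false; // pattern still expects a word but none is left
        }
        if (pattern[p].equals("*") || pattern[p].equals(words[w])) {
            return matches(pattern, p + 1, words, w + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("Apple.#", "Apple.1"));  // consumer 1's binding
        System.out.println(matches("Banner.#", "Apple.1")); // consumer 2's binding
    }
}
```

For the keys used in this section, the first call returns true and the second returns false, which is why only consumer 1 receives the message.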
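RabbitMQ Remote Procedure Call (RPC)
When some business logic must wait for the server to finish processing a message before moving on to the next step, a remote call is needed. The approach shown here is of course not the only way to implement RPC.
First, the flow: the client acts as a producer that sends the request and also as a consumer that receives the processed result; the server acts as a consumer that receives the request and also as a producer that sends back the result.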
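Before the RabbitMQ version, the same flow can be sketched in-process with plain Java queues. This is a toy model under stated assumptions: a `BlockingQueue` stands in for each RabbitMQ queue, and the `Message` class with its `correlationId` and `replyTo` fields is a hypothetical stand-in for the message properties of the same names.

```java
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Toy in-process model of the request/reply flow; it does not use the RabbitMQ client.
class RpcFlowSketch {

    // A message carrying the two properties the RPC pattern relies on.
    static final class Message {
        final String body;
        final String correlationId;
        final BlockingQueue<Message> replyTo;

        Message(String body, String correlationId, BlockingQueue<Message> replyTo) {
            this.body = body;
            this.correlationId = correlationId;
            this.replyTo = replyTo;
        }
    }

    static int fib(int n) {
        return n < 2 ? n : fib(n - 1) + fib(n - 2);
    }

    // "Server": consumes requests and publishes each result to the request's replyTo queue.
    static Thread startServer(BlockingQueue<Message> requestQueue) {
        Thread server = new Thread(() -> {
            try {
                while (true) {
                    Message request = requestQueue.take();
                    String result = String.valueOf(fib(Integer.parseInt(request.body)));
                    request.replyTo.put(new Message(result, request.correlationId, null));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stop serving
            }
        });
        server.setDaemon(true);
        server.start();
        return server;
    }

    // "Client": publishes a request, then blocks until the reply with the same correlation id arrives.
    static String call(BlockingQueue<Message> requestQueue, String body) {
        String corrId = UUID.randomUUID().toString();
        BlockingQueue<Message> replyQueue = new ArrayBlockingQueue<>(1);
        try {
            requestQueue.put(new Message(body, corrId, replyQueue));
            while (true) {
                Message reply = replyQueue.take();
                if (corrId.equals(reply.correlationId)) {
                    return reply.body;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the reply", e);
        }
    }

    public static void main(String[] args) {
        BlockingQueue<Message> requestQueue = new LinkedBlockingQueue<>();
        Thread server = startServer(requestQueue);
        System.out.println("fib(10) = " + call(requestQueue, "10"));
        server.interrupt();
    }
}
```

The correlation id lets the client discard any reply that does not belong to its own request, and the reply queue tells the server where to send the answer.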
Client:
public class RPCClient implements AutoCloseable {
private Connection connection;
private Channel channel;
private String requestQueueName = "queue";
public RPCClient() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
factory.setVirtualHost("dane");
connection = factory.newConnection();
channel = connection.createChannel();
}
public String call(String message) throws IOException,InterruptedException {
final String corrId = UUID.randomUUID().toString();
String replyQueueName = channel.queueDeclare().getQueue();
AMQP.BasicProperties props = new AMQP.BasicProperties
.Builder()
.correlationId(corrId)
.replyTo(replyQueueName)
.build();
channel.basicPublish("", requestQueueName, props, message.getBytes("utf-8"));
final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
String ctag = channel.basicConsume(replyQueueName, true, new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
System.out.println(s);
if(delivery.getProperties().getCorrelationId().equals(corrId)){
response.offer(new String(delivery.getBody(),"utf-8"));
}
}
}, new CancelCallback() {
@Override
public void handle(String s) throws IOException {
System.out.println(s);
}
});
String result = response.take();
channel.basicCancel(ctag);
return result;
}
@Override
public void close() throws Exception {
connection.close();
}
}
Server:
public class RPCServer {
private static String requestQueueName = "queue";
private static int fib(int n) {
if (n == 0) {
return 0;
}
if (n == 1) {
return 1;
}
return fib(n - 1) + fib(n - 2);
}
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
factory.setVirtualHost("dane");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(requestQueueName, false, false, false, null);
channel.queuePurge(requestQueueName);
channel.basicQos(1);
System.out.println(" [x] Awaiting RPC requests");
Object monitor = new Object();
channel.basicConsume(requestQueueName, false, new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
AMQP.BasicProperties replyProps = new AMQP.BasicProperties
.Builder()
.correlationId(delivery.getProperties().getCorrelationId())
.build();
String response = "";
try {
String message = new String(delivery.getBody(), "UTF-8");
int n = Integer.parseInt(message);
System.out.println(" [.] fib(" + message + ")");
response += fib(n);
} catch (RuntimeException e) {
System.out.println(" [.] " + e.toString());
} finally {
System.out.println(delivery.getProperties().getReplyTo());
channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8"));
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
// RabbitMq consumer worker thread notifies the RPC server owner thread
synchronized (monitor) {
monitor.notify();
}
}
}
}, new CancelCallback() {
@Override
public void handle(String s) throws IOException {
System.out.println(s);
}
});
while (true) {
synchronized (monitor) {
try {
monitor.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
}
Test:
public class RPCMain {
public static void main(String[] args) throws Exception{
RPCClient rpcClient = new RPCClient();
System.out.println(" [x] Requesting fib(30)");
String response = rpcClient.call("30");
System.out.println(" [.] Got '" + response + "'");
rpcClient.close();
}
}