- work queues 工作队列 轮询分发
-
为什么会出现工作队列
- simple队列 是一一对应的,实际开发中,生产者发送消息是毫不费力的,而消费者一般是与业务结合的,消费者接收到消息后就需要处理,可能需要花费时间。这时候队列就会挤压着很多消息。
-
生产者:
public class Send {
private static final String QUEUE_NAME = "test_work_queue";
public static void main(String[] args) throws IOException, InterruptedException {
// 获取链接
Connection connections = ConnectionUtils.getConnections();
// 获取channel
Channel channel = connections.createChannel();
//
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
for (int i = 0; i < 50; i++) {
String msg = "workQueue" + i;
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
System.out.println("发送的消息" + msg);
Thread.sleep(i + 20);
}
channel.close();
connections.close();
}
}
- 消费者1
public class Recv1 {
private static final String QUEUE_NAME = "test_work_queue";
public static void main(String[] args) throws Exception {
// 获取链接
Connection connections = ConnectionUtils.getConnections();
// 获取通道
Channel channel = connections.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 定义一个消费者
Consumer consumer = new DefaultConsumer(channel) {
// 一旦有消息到达,就触发这个方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String msg = new String(body, "utf-8");
System.out.println("消费者1" + msg);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("[1] done");
}
}
};
boolean autoAck = true;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}
- 消费者2
public class Recv2 {
private static final String QUEUE_NAME = "test_work_queue";
public static void main(String[] args) throws Exception {
// 获取链接
Connection connections = ConnectionUtils.getConnections();
// 获取通道
Channel channel = connections.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 定义一个消费者
Consumer consumer = new DefaultConsumer(channel) {
// 一旦有消息到达,就触发这个方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String msg = new String(body, "utf-8");
System.out.println("消费者2" + msg);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("[2] done");
}
}
};
boolean autoAck = true;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}
现象:消费者1和消费者2处理的数据消息是一样的,
消费者1都是偶数
消费者2都是奇数
这种方式叫 轮询分发(round-robin)结果是 不管谁忙或者谁闲,任务消息都是一边一个轮询发
公平分发
- 生产者:在消费者返回确认消息之前,只分发一个消息
/**
* 每个消费者发送确认消息之前,消息队列不发送下一个消息,一次只处理一个消息
*/
int prefecthCount=1;
channel.basicQos(prefecthCount);
- 消费者
- 确认每次只收到一个消息
- 每次处理完消息要返回确认信息
- 自动应答 关闭
现象:消费者1处理得比消费者2多(能者多劳,1模拟暂停时间是1000,2模拟暂停时间是2000)
-
消息应答 与 消息持久化
boolean autoAck = false;//自动应答=false
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
- boolean autoAck = true;
- 自动确认模式,一旦rabbitmq将消息分发给消费者,就会从内存中删除。这种情况,如果杀死正在执行任务的消费者,则会丢失正在处理的消息。
- boolean autoAck = false;
- 手动模式,如果有一个消费者挂掉,就会交付给其他消费者。rabbitMQ支出消息应答,消费者处理完消息后,给abbitmq发送确认消息,rabbitmq收到后就会删除内存中的消息。
- 消息应答默认是打开的==>false
- boolean autoAck = true;
-
消息的持久化
- boolean autoAck = false确保即使消费者死亡,任务也不会丢失。但是如果RabbitMQ服务器停止,我们的任务仍然会丢失。
boolean durable=false;//持久化
channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
- 我们将程序中的 boolean durable=false 直接改成true是不可以的,因为test_work_queue队列已经定义成未持久化的队列,rabbitmq不允许重新定义已经存在的队列。
- boolean autoAck = false确保即使消费者死亡,任务也不会丢失。但是如果RabbitMQ服务器停止,我们的任务仍然会丢失。
网友评论