当多个消费者同时监听一个队列时,默认消费者会轮询消费消息
一、轮询消费
1、Producer
/**
* 消息生产者
*/
public class Producer {
// 队列名称
private static final String QUEUE_NAME = "hello_world";
public static void main(String[] args) throws IOException, TimeoutException {
//创建一个连接工程
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("172.16.176.100");
factory.setUsername("raven");
factory.setPassword("raven");
// 获取一个连接
Connection connection = factory.newConnection();
// 创建一个Channel
Channel channel = connection.createChannel();
/**
* 创建一个队里
*
* 队列名称
* 队里里面的消息是否持久化
* 该队列是否只供一个消费者进行消费,是否进行共享,true 可以多个消费者共享
* 是否自动删除,最后一个消费者端开连接以后,该队列是否自动删除 true 自动删除
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
//准备消息
String msg = scanner.next();
/**
* 发布消息
*
* 发送到那个交换机
* 路由key
* 其他参数
* 发送消息的消息体
*/
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
System.out.println("发送消息完成:" + msg);
}
}
}
2、Consumer
/**
* 接收消息
*/
public class Consumer {
// 队列名称
private static final String QUEUE_NAME = "hello_world";
public static void main(String[] args) throws IOException, TimeoutException {
//创建一个连接工程
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("172.16.176.100");
factory.setUsername("raven");
factory.setPassword("raven");
// 获取一个连接
Connection connection = factory.newConnection();
// 创建一个Channel
Channel channel = connection.createChannel();
System.out.println("C等待接收消息。。。。。");
// 接收消息回调
DeliverCallback deliverCallback = (consumerTag, message) -> {
String msg = new String(message.getBody());
System.out.println(msg);
};
// 取消消息回调
CancelCallback cancelCallback = (String consumerTag) -> {
System.out.println("消息消费被中断。。。");
};
/**
* 消费消息
*
* 消息队列
* 消费成功之后是否要自动应答
* 消费成功/失败回调
*/
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
}
}
3、Consumer1
/**
* 接收消息
*/
public class Consumer1 {
// 队列名称
private static final String QUEUE_NAME = "hello_world";
public static void main(String[] args) throws IOException, TimeoutException {
//创建一个连接工程
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("172.16.176.100");
factory.setUsername("raven");
factory.setPassword("raven");
// 获取一个连接
Connection connection = factory.newConnection();
// 创建一个Channel
Channel channel = connection.createChannel();
System.out.println("C1等待接收消息。。。。。");
// 接收消息回调
DeliverCallback deliverCallback = (consumerTag, message) -> {
String msg = new String(message.getBody());
System.out.println(msg);
};
// 取消消息回调
CancelCallback cancelCallback = (String consumerTag) -> {
System.out.println("消息消费被中断。。。");
};
/**
* 消费消息
*
* 消息队列
* 消费成功之后是否要自动应答
* 消费成功/失败回调
*/
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
}
}
4、发送消息
11
发送消息完成:11
22
发送消息完成:22
33
发送消息完成:33
44
发送消息完成:44
55
发送消息完成:55
66
发送消息完成:66
5、消费消息
C等待接收消息。。。。。
11
33
55
C1等待接收消息。。。。。
22
44
66
网友评论