- p:生成者
- c:消费者
一、RabbitMQ没有队列 image.png
hello_world队列.pngimage.png
二、代码测试
Producer
/**
* 消息生产者
*/
public class Producer {
// 队列名称
private static final String QUEUE_NAME = "hello_world";
public static void main(String[] args) throws IOException, TimeoutException {
//创建一个连接工程
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("172.16.176.100");
factory.setUsername("raven");
factory.setPassword("raven");
// 获取一个连接
Connection connection = factory.newConnection();
// 创建一个Channel
Channel channel = connection.createChannel();
/**
* 创建一个队里
*
* 队列名称
* 队里里面的消息是否持久化
* 该队列是否只供一个消费者进行消费,是否进行共享,true 可以多个消费者共享
* 是否自动删除,最后一个消费者端开连接以后,该队列是否自动删除 true 自动删除
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//准备消息
String msg = "Hello World";
/**
* 发布消息
*
* 发送到那个交换机
* 路由key
* 其他参数
* 发送消息的消息体
*/
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
System.out.println("消息发送完毕!!!");
}
}
Consumer
/**
* 接收消息
*/
public class Consumer {
// 队列名称
private static final String QUEUE_NAME = "hello_world";
public static void main(String[] args) throws IOException, TimeoutException {
//创建一个连接工程
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("172.16.176.100");
factory.setUsername("raven");
factory.setPassword("raven");
// 获取一个连接
Connection connection = factory.newConnection();
// 创建一个Channel
Channel channel = connection.createChannel();
System.out.println("等待接收消息。。。。。");
// 接收消息回调
DeliverCallback deliverCallback = (consumerTag, message) -> {
String msg = new String(message.getBody());
System.out.println(msg);
};
// 取消消息回调
CancelCallback cancelCallback = (String consumerTag) -> {
System.out.println("消息消费被中断。。。");
};
/**
* 消费消息
*
* 消息队列
* 消费成功之后是否要自动应答
* 消费成功/失败回调
*/
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
}
}
网友评论