简单队列
- 添加依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.4.1</version>
</dependency>
image.png
P:消息的生产者-->队列-->消费者
- 连接rabbitmq
public static Connection getConnections() throws IOException{
//定义一个连接工厂
ConnectionFactory factory=new ConnectionFactory();
//设置服务器ַ
factory.setHost("127.0.0.1");
// AMQP 5672
factory.setPort(5672);
//vhost
factory.setVirtualHost("/vhost_mmr");
//用户名
factory.setUsername("huyanglin");
//密码
factory.setPassword("huyanglin");
Connection newConnection = factory.newConnection();
return newConnection;
}
- 发送消息
public static void main(String[] args) throws IOException {
//1.获取链接
Connection connections = ConnectionUtils.getConnections();
//2.创建通道
Channel channel = connections.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String msg="hello simple";
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
System.out.println(msg);
channel.close();
connections.close();
}
- 监听(接收消息)
private static final String QUEUE_NAME="test_simple_queue";
public static void main(String[] args) throws IOException {
//建立链接、创建通道
Connection connections = ConnectionUtils.getConnections();
Channel channel = connections.createChannel();
//队列声明
channel.queueDeclare(QUEUE_NAME, false, false, false, null) ;
//定义消费者
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
//获取到达的消息
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String msg=new String(body,"utf-8");
System.out.println("取数据"+msg);
};
};
//消费队列
channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
}
-
简单队列的不足
- 耦合性高,生产者一一对应消费者(不能满足多个消费者消费队列中的消息)。
- 队列名变更,得同时变更。
网友评论