一.简单队列
简单队列模式实际上就是work模式的一种特例,生产者和消费者都只有一个
1.创建Virtual Hosts
2.实现消息生产者
2.1 添加依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.3</version>
</dependency>
2.2 获取RabbitMQ连接
cn.lovingliu.rabbitmq.util.ConnectionUtil
package cn.lovingliu.rabbitmq.util;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Author:LovingLiu
* @Description: 获取rabbitMq连接
* @Date:Created in 2020-01-15
*/
public class ConnectionUtil {
public static Connection getConnection() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("49.235.110.134");
factory.setPort(5672);
factory.setVirtualHost("test001_host");
factory.setUsername("root");
factory.setPassword("root");
Connection connection = factory.newConnection();
return connection;
}
}
2.3 生产者
生产者负责创建消息队列并发送消息入列,简单分为5步:
- 获取连接
- 创建通道
- 创建队列声明
- 发送消息
- 关闭队列
cn.lovingliu.rabbitmq.producer.Send
package cn.lovingliu.rabbitmq.producer;
import cn.lovingliu.common.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
* @Author:LovingLiu
* @Description: 生产者发送消息
* @Date:Created in 2020-01-15
*/
public class Send {
private final static String QUEUE_NAME = "test_queue";
public static void main(String[] args) throws Exception {
/** 1.获取连接 */
Connection newConnection = ConnectionUtil.getConnection();
/** 2.创建通道 */
Channel channel = newConnection.createChannel();
/** 3.创建队列声明 */
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// channel.queueDeclare(QUEUE_NAME, true, false, false, null); // 持久化
String msg = "我是生产者生成的消息";
System.out.println("生产者发送消息:" + msg);
/** 4.发送消息 */
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
// channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes()); // 持久化
channel.close();
newConnection.close();
}
}
3.消费者
消费者实现和生产者实现过程差不多,但是没有关闭连接和通道,是因为要消费者一直等待随时可能发来的消息,大致分为如下3步:
- 获取连接
- 获取通道
- 监听队列
cn.lovingliu.rabbitmq.consumer.MyConsumer
package cn.lovingliu.rabbitmq.consumer;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
/**
* @Author:LovingLiu
* @Description: 自定义消费者。QueueingConsumer 已被废弃,建议使用继承DefaultConsumer的方式
* @Date:Created in 2020-01-15
*/
public class MyConsumer extends DefaultConsumer {
public MyConsumer(Channel channel){
super(channel);
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.err.println("-----------consume message----------");
System.err.println("consumerTag: " + consumerTag);
System.err.println("envelope: " + envelope);
System.err.println("properties: " + properties);
System.err.println("body: " + new String(body));
}
}
cn.lovingliu.rabbitmq.consumer.Recv
package cn.lovingliu.rabbitmq.consumer;
import cn.lovingliu.common.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
* @Author:LovingLiu
* @Description: 消费者消费消息
* @Date:Created in 2020-01-15
*/
public class Recv{
/** 队列名称 */
private final static String QUEUE_NAME = "test_queue";
public static void main(String[] argv) throws Exception {
/** 1.获取连接 */
Connection newConnection = ConnectionUtil.getConnection();
/** 2.获取通道 */
Channel channel = newConnection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
/** 3.监听队列 */
channel.basicConsume(QUEUE_NAME, true, new MyConsumer(channel));
}
}
4.运行截图
运行生产者
运行消费者
网友评论