流程: 首先是获取连接工厂 ConnectionFactory --> 获取一个连接 Connection --> 通过连接建立数据通信 信道 Channel,用 Channel 发送或接收消息。
代码地址:
https://github.com/hmilyos/rabbitmqdemo.git rabbitmq-api 项目下的 quickstart 包下
maven:
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.15.RELEASE</version>
<!--<version>2.1.0.RELEASE</version>-->
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.6.5</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.6</version>
</dependency>
</dependencies>
定义和赋值 RabbitMQ 的配置
public interface RabbitMQCommon {
final static String RABBITMQ_HOST = "192.168.0.7";
final static int RABBITMQ_PORT = 5672;
final static String RABBITMQ_DEFAULT_VIRTUAL_HOST = "/";
public final static String RABBITMQ_USERNAME = "guest";
public final static String RABBITMQ_PASSWORD = "guest";
}
消费端代码:
/**
* 快速开始:消费者
*/
@Slf4j
public class Consumer {
private static final Logger log = LoggerFactory.getLogger(Consumer.class);
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(RabbitMQCommon.RABBITMQ_HOST);
connectionFactory.setPort(RabbitMQCommon.RABBITMQ_PORT);
connectionFactory.setVirtualHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
// queueName, durable, exclusive, autoDelete, arguments
channel.queueDeclare("test1001", true, false, false, null);
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
// queueName, autoAck, Consumer callback
channel.basicConsume("test1001", true, queueingConsumer);
log.info("消费端启动啦~");
while (true) {
Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
log.info("消费端接收到:{}", msg);
}
}
}
生产端代码:
/**
* 快速开始:生产者
*/
@Slf4j
public class Procuder {
private static final Logger log = LoggerFactory.getLogger(Procuder.class);
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(RabbitMQCommon.RABBITMQ_HOST);
connectionFactory.setPort(RabbitMQCommon.RABBITMQ_PORT);
connectionFactory.setVirtualHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
for (int i = 0; i < 5; i++) {
String msg = "hello RabbitMQ + " + i;
log.info("生产者发送消息:{}", msg);
channel.basicPublish("", "test1001", null, msg.getBytes());
}
log.info("生产者发送消息完毕");
channel.close();
connection.close();
}
}
run运行消费端的代码
image打开管控台,看到这个队列创建成功了
image image运行生产端的代码,看到如下日志
image点击这个queue进去查看到刚才有消息发送过来了
image在idea查看消费端的日志
image刚才生产端发送的消息已被消费端消费,至此,快速启动demo已完毕
网友评论