本系列主要讲基于SpringBoot的RabbitMQ消息队列,基于Java的只做简单记录。
1. 生产者
- 创建连接工厂-->设置参数-->创建连接-->创建通道-->创建交换器-->发送消息-->关闭通道-->关闭连接
package com.fzb.rabbitmq.producer;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class MQProducerJ {
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置RabbitMQ相关信息
factory.setHost("192.168.89.168");
factory.setUsername("fzb");
factory.setPassword("fzb2019");
factory.setPort(5672);
factory.setVirtualHost("fzb_host");
//创建一个新的连接
Connection connection = factory.newConnection();
//创建一个通道
Channel channel = connection.createChannel();
//创建一个交换器 默认direct
channel.exchangeDeclare("direct.java.exchange","direct",true);
//发送消息到队列中
String message = "这是java代码:Hello RabbitMQ.";
channel.basicPublish("direct.java.exchange", "HelloJava", null, message.getBytes("UTF-8"));
//关闭通道和连接
channel.close();
//关闭连接
connection.close();
}
}
2. 消费者
- 创建连接工厂-->设置参数-->创建连接-->创建通道-->创建交换器-->创建队列-->创建绑定关系-->消费消息-->关闭通道-->关闭连接
package com.fzb.rabbitmq.consumer;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.*;
public class MQConsumerJ {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置RabbitMQ相关信息
factory.setHost("192.168.89.168");
factory.setUsername("fzb");
factory.setPassword("fzb2019");
factory.setPort(5672);
factory.setVirtualHost("fzb_host");
//创建一个新的连接
Connection connection = factory.newConnection();
//创建一个通道
Channel channel = connection.createChannel();
//创建一个交换器 默认direct
channel.exchangeDeclare("direct.java.exchange","direct",true);
//创建一个队列
channel.queueDeclare("direct.java.queue", true, false, false, null);
//创建绑定关系
channel.queueBind("direct.java.queue","direct.java.exchange","HelloJava");
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println( message);
}
};
channel.basicConsume("direct.java.queue", true, consumer);
}
}
三十岁,请拒绝油腻
网友评论