rabbitmq版本3.8.3 erlang版本22.2
image-20200524115051864.pngrabbitmq可以通过命令行的方式,也可通过可视化页面的方式进行操作。
命令行中输入
rabbitmqctl help
[root@s ~]# rabbitmqctl help
Usage
rabbitmqctl [--node <node>] [--timeout <timeout>] [--longnames] [--quiet] <command> [<command options>]
Available commands:
Help:
help Displays usage information for a command
version Displays CLI tools version
...................
可以查看rabbitmq的各种命令行操作,喜欢撸代码的小伙伴可以自行尝试。
这里主要结合idea,springboot,rabbitmq可视化控制页面讲解rabbitmq的使用。
还没安装rabbitmq的小伙伴可以去看:Linux_CentOS7 安装配置Rabbitmq3.8.3填坑之旅
使用springboot java操作rabbitmq
1)新建springboot项目
QQ截图20200524144037.png因为是学习测试用的所以勾选一个spring web就好
QQ截图20200524144748.png项目创建好后在pom.xml(Maven的配置文件)中添加rabbitmq依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
</dependency>
2)启动配置rabbitmq
启动操作页面插件
rabbitmq-plugins enable rabbitmq_management
启动rabbitmq
systemctl start rabbitmq-server
启动Linux外部访问端口
/sbin/iptables -I INPUT -p tcp --dport 5672 -j ACCEPT
/sbin/iptables -I INPUT -p tcp --dport 15672 -j ACCEPT
创建虚拟机
QQ截图20200524161556.png创建用户
QQ截图20200524162122.png用户绑定虚拟机
QQ截图20200524162330.png QQ截图20200524162435.png前期工作完成开始上代码
3)rabbitmq第一种消息模型测试
QQ截图20200524150318.png该种模型只有“生产者【p】”,“队列”,“消费者【c】”。是最简单的消息队列通信。注意:这里的消息指的是数据说具体点是服务器端通信的数据。虽然使用RabbitMQ Web STOMP可以使web前端作为消息的消费者(特定情况下,消息推送时使用),但是多数情况下消息还是在服务器端流转。消息以byte类型传输,生产和消费时可根据业务情况使用字符串或json格式数据。
以下是测试生产与消费的代码
package com.example.test.btest;
import com.rabbitmq.client.*;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.test.context.SpringBootTest;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
@SpringBootTest
class BtestApplicationTests {
//获取springboot默认日志工具
Logger logger = LoggerFactory.getLogger(getClass());
//创建sender测试方法,用于向rabbitmq发送消息,即生产者生产消息
@Test
void sender() throws IOException, TimeoutException {
//创建rabbitmq连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//输入rabbitmq服务器IP
connectionFactory.setHost("192.168.31.230");
/**
*4369 -- erlang发现口
*5672 --client端通信口
*15672 -- 管理界面ui端口
*25672 -- server间内部通信口
*输入客户(外部)通讯端口
*/
connectionFactory.setPort(5672);
//输入要连接的虚拟主机
connectionFactory.setVirtualHost("/test_one");
//操作用户
connectionFactory.setUsername("test_user");
//操作密码
connectionFactory.setPassword("123");
//生成连接
Connection connection = connectionFactory.newConnection();
//生成通信管道
Channel channel = connection.createChannel();
//声明(创建)消息队列
/**
*参数1: queue 队列的名称
*参数2: durable 是否持久化
*参数3: exclusive 是否排外的(独立队列)
*参数4: autoDelete: 是否自动删除
*参数5: arguments: 设置队列的其他一些参数,如 x-rnessage-ttl 、x-expires 、x-rnax-length 、x-rnax-length-bytes、 x-dead-letter-exchange、 x-deadletter-routing-key *、 x-rnax-priority 等
*当durable = false时,队列非持久化。因为队列是存放在内存中的,所以当RabbitMQ重启或者服务器重 * 启时该队列就会丢失 ;
*当durable = true时,队列持久化。当RabbitMQ重启后队列不会丢失。RabbitMQ退出时它会将队列信息 *保存到 Erlang自带的Mnesia数据库 中,当RabbitMQ重启之后会读取该数据库
*当exclusive = true则设置队列为排他的。如果一个队列被声明为排他队列,该队列仅对首次声明它的连 *接(Connection)可见,是该Connection私有的,类似于加锁,并在连接断开connection.close()时 *自动删除 ;当exclusive = false则设置队列为非排他的,此时不同连接(Connection)的管道 *Channel可以使用该队列
*如果autoDelete = true,当所有消费者都与这个队列断开连接时,即消费完成消费者调用 *channel.close() connection.close()关闭连接时删除此列队
*
*参考:https://blog.csdn.net/AwayFuture/article/details/103405335
*/
channel.queueDeclare("hello",false,false,false,null);
//发送消息
/**
*参数1: exchange 交换机名称
*参数2: queue 队列名称
*参数3: basicProperties 附加参数
*参数4: 消息数据
*/
channel.basicPublish("","hello",null,"你好世界".getBytes());
//关闭通道
channel.close();
//关闭连接
connection.close();
}
//创建customer测试方法,用于rabbitmq处理消息,即消费者消费消息
@Test
void customer() throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.31.230");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/test_one");
connectionFactory.setUsername("test_user");
connectionFactory.setPassword("123");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("hello",false,false,false,null);
/**
*参数1:队列名称
*参数2:autoAck 是否自动确认消息,true自动确认,false手动调用确认。
*参数3:回调函数获取消费数据
*/
channel.basicConsume("hello",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
super.handleDelivery(consumerTag, envelope, properties, body);
logger.warn("this body ==>" + new String(body));
}
});
}
}
网友评论