RabbitMQ
Erang开发
支持集群
支持内存节点和磁盘节点
RabbitMQ的模式
(1)单一模式
(2)普通模式(默认集群模式):只复制交换机和队列,不复制数据。
(3)镜像模式:复制交换机队列和数据
RabbitMQ 安装
配置yum源
yum install -y epel-release
安装RabbitMQ-Server
yum install -y rabbitmq-server
启动服务
/usr/lib/rabbitmq/bin/rabbitmq-plugins list //查看插件安装情况
/usr/lib/rabbitmq/bin/rabbitmq-plugins enable rabbitmq_management //启用rabbitmq_management服务
service rabbitmq-server start
打开后台管理
打开浏览器输入http://192.168.100.143:15672, 输入默认的Username:guest,输入默认的Password:guest ,登录后出现如图所示的界面。
后台管理界面
集群配置
1.环境
IP | 主机名 | 用处 |
---|---|---|
192.168.33.10 | centos01 | 磁盘 |
192.168.33.11 | centos02 | 内存 |
192.168.33.12 | centos03 | 内存 |
配置本地域名,以centos01为例
vi /etc/hostname
修改为:
centos01.localdomain
vi /etc/hosts
添加hosts
192.168.33.10 centos01 192.168.33.11 centos02 192.168.33.12 centos03
centos02,centos03,做类似配置。
<font color='#A52A2A'>注意:一定要先配置域名和hosts,确保RabbitMQ Managerment中的Nodes 为 Rabbit@centos01</font>
2.安装软件
三台机器上安装RabbitMQ
yum install -y epel-release
yum install -y rabbitmq-server
3.复制erlang.cookie
# cat /var/lib/rabbitmq/.erlang.cookie
XAHPZVPYUQDWWJIOHUPQ
复制XAHPZVPYUQDWWJIOHUPQ到centos02和centos03
三台服务器上启动
service rabbitmq-server start
如果过程如果报错:
{{shutdown,{failed_to_start_child,net_sup,{shutdown,{failed_to_start_child,auth,{"Cookie file /var/lib/rabbitmq/.erl
则修改:
chown rabbitmq:rabbitmq /var/lib/rabbitmq/.erlang.cookie
chmod 600 /var/lib/rabbitmq/.erlang.cookie
4.将centos02、centos03作为内存节点加入centos01节点集群中
rabbitmqctl stop_app //停掉rabbit应用
rabbitmqctl join_cluster --ram rabbit@centos01 //加入到磁盘节点
rabbitmqctl start_app //启动rabbit应用
如果执行:rabbitmqctl join_cluster --ram rabbit@centos01 失败。要确保rabbitmq management的nodes中显示的名字为:rabbit@centos01。如果不是,
则要修改 /etc/hostname 为:centos01.localdomain,并重启
5.rabbitmq management
打开http://localhost:15672/#/,nodes中显示为三个,则代表成功
从图中可以看出,centos01 为磁盘节点(Disk),centos02,centos03 为内存节点(RAM)
6.启动trace功能
rabbitmq-plugins enable rabbitmq_tracing
rabbitmq trace_on
7.负载均衡
Rabbitmq集群配置好之后,需要使用负载均衡技术来将所有的机器进行串联起来,可以使用HaProxy来进行负载均衡。
RabbitMQ基本概念
Broker:可以理解为消息队列服务器实体,负责接收消息生产者的消息,然后将消息发送至消息接收者或者其他的Broker。
Exchange:消息交换机,是消息第一个到达的地方,消息通过它指定的路由规则,分发到不同的消息队列中去。
Queue:消息队列,消息通过发送和路由之后最终到达的地方。
Binding:绑定,作用是把Exchange和Queue按照路由规则绑定起来。
Routing Key:路由关键字,Exchange根据这个关键字进行消息投递。
Virtual host:虚拟主机,他是对Broker的虚拟划分,将消费者、生产这和他们依赖的AMQP相关结构进行隔离,一般是为了安全考虑。
Connection:连接,代表生产者、消费者、Broker之间惊喜通信的物理网络。
Channel:消息通道,用于连接生产者和消费者的逻辑结构。
Producer:消息生产者
Consumer:消息消费者
RabbitMQ Sprint Boot集成
1.项目配置
pom.xml依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
application.properties rabbitmq 配置
spring.application.name=spirng-boot-rabbitmq-sender
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
.direct
队列配置
@Configuration
public class MQConfig {
@Bean
public Queue queue(){
return new Queue("direct.queue");
}
}
消息体Bean
public class User implements Serializable{
private long id;
private String name;
public long getId() {
return id;
}
public void setId(long id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
生产者
@Component
public class MQProducer {
@Autowired
AmqpTemplate amqpTemplate;
public void direct(User msg){
amqpTemplate.convertAndSend("direct.queue",msg);
}
}
消费者
@Component
public class MQReceiver {
@RabbitListener(queues = "direct.queue")
public void direct(User msg){
System.out.println(msg.getName());
}
}
2.topic
队列配置
@Component
@Configuration
public class MQTopicProducer {
@Autowired
AmqpTemplate amqpTemplate;
@Bean(name = "queueA")
public Queue queueA(){
return new Queue("topic.queue.a");
}
@Bean(name = "queueB")
public Queue queueB(){
return new Queue("topic.queue.b");
}
@Bean(name = "topicExchange")
public TopicExchange exchange(){
return new TopicExchange("topic.exchange");
}
@Bean
Binding bindingExchangeQueueA(@Qualifier("queueA") Queue queue, TopicExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("topic.#");
}
@Bean
Binding bindingExchangeQueueB(@Qualifier("queueB") Queue queue, TopicExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("topic.queue.b");
}
}
生产者
public void topic(String routingKey,User user){
amqpTemplate.convertAndSend("topic.exchange",routingKey,user);
}
消费者
@Component
public class MQTopicConsumer {
@RabbitListener(queues = "topic.queue.a")
public void queueA(User user){
System.out.println("queueA:"+user.getName());
}
@RabbitListener(queues = "topic.queue.b")
public void queueB(User user){
System.out.println("queueB:"+user.getName());
}
}
3.fanout
队列配置
@Configuration
public class MQFanoutProducer {
@Bean(name = "fanoutQueueA")
Queue queueA(){
return new Queue("fanout.queue.a");
}
@Bean(name = "fanoutQueueB")
Queue queueB(){
return new Queue("fanout.queue.b");
}
@Bean(name = "fanoutQueueC")
Queue queueC(){
return new Queue("fanout.queue.c");
}
@Bean
FanoutExchange fanoutExchange(){
return new FanoutExchange("fanout.exchange");
}
@Bean
Binding bindingQueueA(@Qualifier("fanoutQueueA") Queue queue,FanoutExchange fanoutExchange){
return BindingBuilder.bind(queue).to(fanoutExchange);
}
@Bean
Binding bindingQueueB(@Qualifier("fanoutQueueB") Queue queue,FanoutExchange fanoutExchange){
return BindingBuilder.bind(queue).to(fanoutExchange);
}
@Bean
Binding bindingQueueC(@Qualifier("fanoutQueueC") Queue queue,FanoutExchange fanoutExchange){
return BindingBuilder.bind(queue).to(fanoutExchange);
}
}
生产者
@Autowired
AmqpTemplate amqpTemplate;
public void fanout(User user){
amqpTemplate.convertAndSend("fanout.exchange","",user);
}
消费者
@Component
public class MQFanoutConsumer {
@RabbitListener(queues = "fanout.queue.a")
public void queueA(User user){
System.out.println("QueueA:"+user.getName());
}
@RabbitListener(queues = "fanout.queue.b")
public void queueB(User user){
System.out.println("QueueB:"+user.getName());
}
@RabbitListener(queues = "fanout.queue.c")
public void queueC(User user){
System.out.println("QueueC:"+user.getName());
}
}
网友评论