RabbitMQ的几种常见模式的详细介绍和使用---实践
官网:RabbitMQ
官方文档:各个模式简介
RabbitMQ就不详细介绍了,以下就是各个模式的原理和实践操作:
1.安装配置
查看mq镜像: docker search rabbitmq:management
下载mq镜像: docker pull rabbitmq:management
安装镜像:docker run -d --name rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:management
说明
5672:默认的客户端连接的端口
15672:默认的web管理界面的端口
命令中的【RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin】是web管理平台的用户名和密码
【 -p 15672:15672】 是控制平台docker映射到系统的对应端口
【 -p 5672:5672】 是应用程序的访问端口
访问地址
http://ip:15672
如果是linux服务器,首先开放服务器端口,例如阿里云,先配置安全组:
data:image/s3,"s3://crabby-images/2e127/2e127100ccab9e4f14f0be46baa3c4d0846f8da4" alt=""
添加:
data:image/s3,"s3://crabby-images/cf326/cf3269cf3a8ee6720e14d1b5dce292a498cc5e4e" alt=""
开始安装:
查询 docker search rabbitmq:management
data:image/s3,"s3://crabby-images/a3d11/a3d11ce81044dcca984c122cee2c8ecb641c15c4" alt=""
下载 docker search rabbitmq:management
安装 docker run -d --name rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:management
data:image/s3,"s3://crabby-images/7c0d5/7c0d5df9accfdbf135deb7cf077fe69c66034dd8" alt=""
安装成功
data:image/s3,"s3://crabby-images/dc039/dc039e6c8cafea8ebd7b32cd8e08cf5c64b7ac86" alt=""
访问地址:
http://你的ip:15672
2.测试
先创建一个连接类:
package boot.spring.controller;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @description
* @AUTHER: sk
* @DATE: 2021/9/26
**/
public class ConnectionUtil {
/**
* 获取连接
* @return Connection
* @throws Exception
*/
public static Connection getConnection() throws Exception {
//定义连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("你的ip");
factory.setPort(5672);
//设置vhost
factory.setVirtualHost("/");
factory.setUsername("admin");
factory.setPassword("admin");
//通过工厂获取连接
Connection connection = factory.newConnection();
return connection;
}
}
2.1简单模式
一个生产者,一个消费者。
原理图:data:image/s3,"s3://crabby-images/d0fac/d0fac097cf294e54149e25ee70666198f5e5ce42" alt=""
发送:
package boot.spring.controller.easy;
import boot.spring.controller.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
* @description 1.简单模式:一个生产者一个消费者
* @AUTHER: sk
* @DATE: 2021/9/26
**/
public class TestSend {
public final static String QUEUE_NAME = "test-queue";
//创建队列,发送消息
public static void main(String[] args) throws Exception {
//获取连接
Connection connection = ConnectionUtil.getConnection();
//创建通道
Channel channel = connection.createChannel();
//声明创建队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//消息内容
String message = "Hello World!";
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("发送消息:"+message);
//关闭连接和通道
channel.close();
connection.close();
}
}
data:image/s3,"s3://crabby-images/19820/19820136d524c255d312bf9c8c48930f04cc39e6" alt=""
生产的一条消息未被消费:
data:image/s3,"s3://crabby-images/a24e4/a24e4f27bb489eb42ab53ea732d7e077dd112b12" alt=""
接收:
package boot.spring.controller.easy;
import boot.spring.controller.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
/**
* @description 简单模式一个生产者一个消费者
* @AUTHER: sk
* @DATE: 2021/9/26
**/
public class TestResive {
//消费者消费消息
public static void main(String[] args) throws Exception {
//获取连接和通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//声明通道
channel.queueDeclare(TestSend.QUEUE_NAME,false,false,false,null);
//定义消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
//监听队列
channel.basicConsume(TestSend.QUEUE_NAME,true,consumer);
while(true){
//这个方法会阻塞住,直到获取到消息
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("接收到消息:"+message);
}
}
}
data:image/s3,"s3://crabby-images/51705/51705f3eb642688754254c7bad2f84229136ec8f" alt=""
已被消费:
data:image/s3,"s3://crabby-images/efc90/efc9076a5877c408a2f8016e3644d75ef0ca648f" alt=""
2.2 work模式
竞争消费者模式
一个生产者,多个消费者,每个消费者获取到的消息唯一,生产的消息会被消费者瓜分。
原理图:data:image/s3,"s3://crabby-images/a6fdf/a6fdfea7e4a5fc7bf807e3f27051d81a326243eb" alt=""
生产100条消息:
package boot.spring.controller.work;
import boot.spring.controller.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
* @description 2.work模式:一个生产者多个消费者
* @AUTHER: sk
* @DATE: 2021/9/26
**/
public class WorkSend2 {
public final static String QUEUE_NAME = "test2";
//消息生产者
public static void main(String[] args) throws Exception {
//获取连接和通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
String message = "";
for(int i = 0; i<100; i++){
message = "" + i;
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("发送消息:"+message);
Thread.sleep(i);
}
channel.close();
connection.close();
}
}
data:image/s3,"s3://crabby-images/556c3/556c32c802d6005f0b601ebc3b24c33ea5c0f169" alt=""
消费者1:
package boot.spring.controller.work;
import boot.spring.controller.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
/**
* @description 2.work模式:一个生产者多个消费者
* @AUTHER: sk
* @DATE: 2021/9/26
**/
public class WorkResive1 {
//消费者1 自动模式
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(WorkSend2.QUEUE_NAME,false,false,false,null);
//同一时刻服务器只发送一条消息给消费端
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(WorkSend2.QUEUE_NAME,false,consumer);
while(true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("recive1:"+message);
Thread.sleep(100);
//消息消费完给服务器返回确认状态,表示该消息已被消费
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
}
}
消费者1,消费了100条消息中的一半:
data:image/s3,"s3://crabby-images/fd552/fd5526f63f314c928d715e4cb4790f60c77d14d0" alt=""
消费者2:
package boot.spring.controller.work;
import boot.spring.controller.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
/**
* @description 2.work模式:一个生产者多个消费者
* @AUTHER: sk
* @DATE: 2021/9/26
**/
public class WorkResive2 {
//消费者2 手动模式
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("test2",false,false,false,null);
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume("test2",true,consumer);
while(true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("recive1:"+message);
Thread.sleep(10);
//channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
}
}
消费者2消费了100条消息的另一半:
data:image/s3,"s3://crabby-images/1b8b8/1b8b8c1391547d5dda928328a0f5bd09b48472d4" alt=""
2.3 订阅模式
生产者将消息发送到交换机,消费者从交换机获取消息。
原理图:data:image/s3,"s3://crabby-images/5041f/5041f48d007f2778939741dbcd9f274689f89432" alt=""
生产者发送消息到交换机:
package boot.spring.controller.exchange;
import boot.spring.controller.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
* @description 3.订阅者模式:一个生产者发送的消息会被多个消费者获取
* @AUTHER: sk
* @DATE: 2021/9/26
**/
public class Send {
public static final String EXCHANGE_NAME = "test_exchange_fanout";
//生产者,发送消息到交换机
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//声明交换机 fanout:交换机类型 主要有fanout,direct,topics三种
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
String message = "订阅模式:消息007!";
channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes());
System.out.println(message);
channel.close();
connection.close();
}
}
生产者产生的消息:
data:image/s3,"s3://crabby-images/a85fa/a85fa85e3bf2212aacd981d608fede66e821adf6" alt=""
消费者1:
package boot.spring.controller.exchange;
import boot.spring.controller.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
/**
* @description 3.订阅者模式:一个生产者发送的消息会被多个消费者获取
* @AUTHER: sk
* @DATE: 2021/9/26
**/
public class Resive1 {
//消费者1
public final static String QUEUE_NAME = "test_queue_exchange_1";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//绑定队列到交换机上
channel.queueBind(QUEUE_NAME,Send.EXCHANGE_NAME,"");
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME,true,consumer);
while(true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("消费者1:"+message);
}
}
}
data:image/s3,"s3://crabby-images/0e607/0e607d621b678a2cf2c159d03274ddc3e8811290" alt=""
消费者2:
package boot.spring.controller.exchange;
import boot.spring.controller.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
/**
* @description 3.订阅者模式:一个生产者发送的消息会被多个消费者获取
* @AUTHER: sk
* @DATE: 2021/9/26
**/
public class Resive2 {
//消费者2
public final static String QUEUE_NAME = "test_queue_exchange_2";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//绑定队列到交换机上
channel.queueBind(QUEUE_NAME,Send.EXCHANGE_NAME,"");
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME,true,consumer);
while(true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("消费者2:"+message);
}
}
}
data:image/s3,"s3://crabby-images/27d85/27d851e58c55757af47ad71b95c693c7d594b165" alt=""
由此可见,订阅者模式中,所有的消费者都通过交换机收到了消息。
2.4 路由模式
生产者发送消息到队列中时可自定义一个key,消费者可根据key去选择对应的消息,各取所需。
注意:路由key,是一种完全匹配,只有匹配到的消费者才能消费消息。
data:image/s3,"s3://crabby-images/92b2e/92b2ed0daa367a9da7b135775b1e6ccb35e9c8a2" alt=""
生产者生产带key的消息:(key=“dog”)
package boot.spring.controller.rout;
import boot.spring.controller.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
* @description 4.路由模式:发送消息到交换机并且要指定路由key
* @AUTHER: sk
* @DATE: 2021/9/26
**/
public class RoutSend {
public static final String EXCHANGE_NAME = "test_exchange_direct";
//生产者
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//声明交换机 fanout:交换机类型 主要有fanout,direct,topics三种
channel.exchangeDeclare(EXCHANGE_NAME,"direct");
String message = "路由模式产生的消息!";
channel.basicPublish(EXCHANGE_NAME,"dog",null,message.getBytes());
System.out.println(message);
channel.close();
connection.close();
}
}
data:image/s3,"s3://crabby-images/05d73/05d735172b2fda8a0bf49e39e4feeac1ece715fe" alt=""
消费者1:(key=“dog”)
package boot.spring.controller.rout;
import boot.spring.controller.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
/**
* @description 4.路由模式:消费者将队列绑定到交换机时需要指定路由key
* @AUTHER: sk
* @DATE: 2021/9/26
**/
public class RoutResive1 {
//消费者1
public final static String QUEUE_NAME = "test_queue_direct_1";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//绑定队列到交换机上,并制定路由键为"dog"
channel.queueBind(QUEUE_NAME, RoutSend.EXCHANGE_NAME,"dog");
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME,true,consumer);
while(true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("RoutResive1:"+message);
}
}
}
data:image/s3,"s3://crabby-images/ed674/ed6747df2dcc73da7b4bbe4a07d1ef74a4bcbdf1" alt=""
消费者2:(key=“cat”)
package boot.spring.controller.rout;
import boot.spring.controller.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
/**
* @description 4.路由模式:消费者将队列绑定到交换机时需要指定路由key
* @AUTHER: sk
* @DATE: 2021/9/26
**/
public class RoutResive2 {
//消费者2
public final static String QUEUE_NAME = "test_queue_direct_2";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//绑定队列到交换机上,并制定路由键为"cat"
channel.queueBind(QUEUE_NAME, RoutSend.EXCHANGE_NAME,"cat");
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME,true,consumer);
while(true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("RoutResive2:"+message);
}
}
}
data:image/s3,"s3://crabby-images/c6487/c648761b910f879c95afadf19ee79f51903ab83e" alt=""
很显然,消费者1获取到了消息,消费者2并没有获取到消息,因为消费者2的key与生产者的key不一致。
2.5 通配符模式
原理和路由模式类似,只是key值作了模糊匹配而已。
-
*(星号)可以正好代替一个词。
-
# (hash) 可以代替零个或多个单词
-
topic交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。它同样也会识别两个通配符:符号“#”和符号“”。#匹配0个或多个单词,匹配一个单词。如下图所示:
20180628164513643
data:image/s3,"s3://crabby-images/7e745/7e74575af49a627de1ab2d78e6f5855b9d34abb7" alt=""
生产者产生消息:
package boot.spring.controller.topic;
import boot.spring.controller.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
* @description 5.路由模式:发送消息到交换机并且要指定通配符路由
* @AUTHER: sk
* @DATE: 2021/9/26
**/
public class TopicSend {
//生产者
public static final String EXCHANGE_NAME = "test_exchange_topic";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//声明交换机 topic:交换机类型
channel.exchangeDeclare(EXCHANGE_NAME,"topic");
String message = "通配符模式产生的消息";
channel.basicPublish(EXCHANGE_NAME,"dog.1",null,message.getBytes());
System.out.println(message);
channel.close();
connection.close();
}
}
data:image/s3,"s3://crabby-images/4074c/4074cec861674cf3f4e59b9958c01da0ff24e77a" alt=""
消费者1:
package boot.spring.controller.topic;
import boot.spring.controller.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
/**
* @description 5.路由模式:消费者将队列绑定到交换机时需要指定通配符路由key
* @AUTHER: sk
* @DATE: 2021/9/26
**/
public class TopicResive1 {
//消费者1
public final static String QUEUE_NAME = "test_queue_topic_1";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//绑定队列到交换机上,并制定路由键匹配规则为"dog.*"
channel.queueBind(QUEUE_NAME, TopicSend.EXCHANGE_NAME,"dog.*");
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME,true,consumer);
while(true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("TopicResive1:"+message);
}
}
}
data:image/s3,"s3://crabby-images/28487/284871260d919007ad6612f5e3c7157c093173c9" alt=""
消费者2:
package boot.spring.controller.topic;
import boot.spring.controller.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
/**
* @description 5.路由模式:消费者将队列绑定到交换机时需要指定通配符路由key
* @AUTHER: sk
* @DATE: 2021/9/26
**/
public class TopicResive2 {
//消费者2
public final static String QUEUE_NAME = "test_queue_topic_2";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//绑定队列到交换机上,并制定路由键匹配规则为"#.1"
channel.queueBind(QUEUE_NAME, TopicSend.EXCHANGE_NAME,"#.1");
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME,true,consumer);
while(true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("TopicResive2:"+message);
}
}
}
data:image/s3,"s3://crabby-images/e66a5/e66a5f777841f4e6906df865bef4b8b8871d9c3b" alt=""
消费者3:
package boot.spring.controller.topic;
import boot.spring.controller.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
/**
* @description 5.路由模式:消费者将队列绑定到交换机时需要指定通配符路由key
* @AUTHER: sk
* @DATE: 2021/9/26
**/
public class TopicResive3 {
//消费者3
public final static String QUEUE_NAME = "test_queue_topic_3";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//绑定队列到交换机上,并制定路由键匹配规则为"cat.#"
channel.queueBind(QUEUE_NAME, TopicSend.EXCHANGE_NAME,"cat.#");
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME,true,consumer);
while(true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("TopicResive3:"+message);
}
}
}
data:image/s3,"s3://crabby-images/ed047/ed047c527d1a967ca54742d2075afae3150febb8" alt=""
结果:消费者1和消费者2可以收到消息,消费者3不能收到消息。
完整代码地址:
https://github.com/DongFangXiaoYu/springBoot-RabbitMq/tree/master
网友评论