有问题请联系我QQ:273206491
RabbitMQ是基于Erlang语言(俗称:二郎神)对AMQP协议的实现。
1、各个模块之间的一览图
image.png2、连接
这里以Java的客户端进行说明,客户端与RabbitMQ服务器之间是基于TCP连接的,而TCP连接的创建和销毁都非常耗费资源,因此RabbitMQ使用连接复用模式,也就是我们常用的Channel,一个TCP连接可以创建多个Channel,不同Channel之间是互相独立的,一个线程使用一个Channel是安全的,不会出现多线程共享同一个连接的问题(线程共享一个资源很容易出现线程安全的问题)。
2.1、连接创建示例
//连接工厂的初始化
ConnectionFactory connectionFactory=new ConnectionFactory();
connectionFactory.setHost("rabbitmq服务器的地址");
connectionFactory.setUsername("用户名");
connectionFactory.setPassword("密码");
//通过连接工厂与rabbitmq之间建立一个tcp连接
Connection connection =connectionFactory.newConnection();
//通过连接创建信道,多个线程之间不要共享同一个信道
Channel channel = connection.createChannel();
2.2、资源释放问题
在连接不使用的时候及时关闭连接是非常重要的步骤,可能你在本地开发时不释放连接不会有什么问题,但一旦程序上线后,连接不释放很有很多导致RabbitMQ的连接因耗尽而无法接受新的连接请求或者其他什么问题。
释放连接的方式也非常简单,直接调用Connection对象的close方法可以释放一个连接,同时也会释放这个连接下的所有Channel资源。
3、生产者
用于生产消息,使用basicPublish向RabbitMQ发送消息
upChannel.basicPublish("msgExchange", "message", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("utf-8"));
- exchange 交换器的名称
- routingKey 路由key(当交换器的类型是fanout时,路由key是不生效的)
- props 用来设置消息的相关属性
///下面这两种方式是等价的
MessageProperties.PERSISTENT_TEXT_PLAIN
或者
AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties().builder();
properties.deliveryMode(2);
properties.contentType("text/plain");
properties.headers(null);//用于设置自己的header属性,交换器的类型中就有一种为headers(但不推荐使用),当我们设置的headers属性值和交换器绑定的值一致是就能够路由到响应队列中
- body 消息体
- mandatory
4、交换器
交换器可以认为是一个消息中转站,他通过和队列进行绑定,我们把消息发送到交换器中,交换器根据路由key再决定将消息投递给哪个队列(交换器的类型为fanout时,路由key是无效的)。
4.1、类型
- fanout 会把消息给绑定到这个交换器的所有队列都发生一遍
- direct 只将消息发生给路由匹配的队列
- topic 将消息发生获取路由匹配的对象,但是这里的匹配支持模糊匹配,rabbitmq的路由key使用点"."来分隔一个单词,"*"匹配一个单词,"#"匹配一个或者多个单词。举个例子:路由key:com.pingwazi.rabbitmq,绑定key1:com.#,绑定key2:com.*.*是匹配的。
- headers 发生消息的headers属性完全匹配是则认为匹配,这个模式不常用,并且性能也很差。
4.2、创建
//声明一个交换器,如果存在就不创建,如果存在的交换器参数与声明交换器参数不匹配就会报错,如果不存在就会创建
//这方式什么是同步的
upChannel.exchangeDeclare("msgExchange",BuiltinExchangeType.DIRECT,false);
//这种方式声明式异步的,但是不推荐使用,因为可能你在调用了之后就去使用的时候RabbitMQ服务器还没有创建好。
upChannel.exchangeDeclareNoWait();
- exchange 交换器的名字
- type 交换器的类型
- durable 指定这个交换器是持久化的
- autoDelete 是否自动删除,true则表示自动删除,但是有一个前提条件,那就是曾经至少要有一个队列或者交换器与之绑定,但是现在都已经接触绑定了。这时RabbitMQ服务器才会自动的吧这个交换器给删除掉,也就是说如果这个交换器创建出来后,没有任何队列或者交换器与之绑定的话,RabbitMQ是不会自动删除的,即使我们设置了自动删除。
- internal 设置是否是内置的交换器,如果设置为true,则表示是内置的,那么客户端程序将无法直接向这个交换器发送消息,只能通过交换器路由到交换器的方式。
- argument 其他可选参数
4.3、判断交换器是否存在
//判断指定交换器是否存在,如果不存在就会报404异常。
upChannel.exchangeDeclarePassive();
4.4、删除交换器
// 同步删除
upChannel.exchangeDelete("",true);
//异步删除(不需要等待删除完成)
upChannel.exchangeDeleteNoWait("",true);
- exchange 交换器的名称
- ifUnused 是否只删除没有被使用的交换器
5、绑定路由key
//将交换器与队列进行绑定通过message进行绑定
upChannel.queueBind("msgQueue","msgExchange","message");
//将交换器与队列进行解绑
upChannel.queueUnbind("","","")
//将交换器与交换器绑定在一起
upChannel.exchangeBind( "","","")
//将交换器与交换器解绑
upChannel.exchangeUnbind( "","","")
绑定路由key是在绑定交换器和队列时指定的一个key,其中message就是masQueue队列与msgExchange交换器的绑定路由key。
路由key是发送消息的时候指定的key,交换器类型为direct或者topic时,路由key与绑定路由key后才会将消息发送到对应的队列中。
6、队列
队列是RabbitMQ实际存储消息的地方
//声明一个名字为msgQueue、持久化、不排他、不自动删除的队列,如果此队列已经存在就不再创建
upChannel.queueDeclare("msgQueue", true, false, false, null);
//异步声明一个队列,不推荐使用,原因同交换器一样
upChannel.queueDeclareNoWait();
- queue 队列的名称
- durable 设置队列是否为持久化队列,如果为true这表示是持久化队列
- exclusive 是否为排它队列,值为true则表示为排他队列,如果一个队列是排它队列,那么除了创建他的连接(注意:这里说的是连接,不是channel)能够使用之外,其他连接都不能使用,并且在创建它的连接断开时,这个队列会自动删除,即使设置为持久化队列也是如此。
- autoDelete 是否自动删除,值为true则表示自动删除,自动删除有一个前提条件,那就是曾经至少有一个消费者使用了这个队列,并且现在已经没有任何消费与这个队列建立了连接,这时候RabbitMQ才会自动删除这个队列。也就是说如果这个队列还没有任何消费与建立过连接,那么RabbitMQ是不会自动删除的。
- arguments 其他的一些参数设置
6.1、判断队列是否存在
//判断队列是否存在
upChannel.queueDeclarePassive();
6.2、删除队列
//同步删除
upChannel.queueDelete("",true,true);
//异步删除
upChannel.queueDeleteNoWait("",true,true);
- queue 队列名称
- ifUnused 是否只删除没有被使用过的队列
- ifEmpty 是否只删除空的队列
7、消费者
消息的消费者有两种常用的模式,即推模式和拉模式。两种模式的实现方式完全不同,推模式是只要队列中有消息了就会推送给消费者(当然了也要受未确认消息数的限制),而拉模式则是消费者需要的时候再去RabbitMQ中获取,而他一次也只能获取获取一条消息。
7.1、拉模式
image.png从图中可以看出,在单线程的情况下,消息的处理速度是比较慢的,当然了这里也可以使用多线程不断的从Rabbitmq中去获取,但这样就需要手动实现获取算法了。不废话,先上代码!
private void receiveGetMessage()
{
try
{
Channel downChannel=connection.createChannel();
//交换器类型:fanout、direct、topic
//声明一个名字为msgExchange、类型为direct并且持久化的交换器,如果交换器已经存在就不再创建
downChannel.exchangeDeclare("msgExchange",BuiltinExchangeType.DIRECT,false);
//声明一个名字为msgQueue、持久化、不排他、不自动删除的队列,如果此队列已经存在就不再创建
downChannel.queueDeclare("msgQueue", true, false, false, null);
//将交换器与队列进行绑定通过message进行绑定
downChannel.queueBind("msgQueue","msgExchange","message");
//消息未确认消息的数量
downChannel.basicQos(1);//在非自动确认的模式下,限制最多允许未确认的消息数量
boolean isBreak=false;
while (!isBreak)
{
//消费消息
GetResponse msgData = downChannel.basicGet("", false);
String msgBody=new String(msgData.getBody(), "utf-8");
System.out.println(Thread.currentThread().getId()+"RabbitMQ拉模式消费者收到消息: " + msgBody);
//回复确认消息
downChannel.basicAck(msgData.getEnvelope().getDeliveryTag(),false);
if(StringUtils.isEmpty(msgBody))
isBreak=true;
}
downChannel.close();
}
catch (ShutdownSignalException ex)
{
//连接异常关闭了,这里要进行检查,并尝试重新建立连接
ex.printStackTrace();
}
catch (IOException ex)
{
//发生io异常需要进行处理,对应channel可能关闭了
ex.printStackTrace();
} catch (TimeoutException e) {
//信道资源释放超时,可能对应的channel关闭了
e.printStackTrace();
}
}
以上代码我是通过但线程循环的方式从RabbitMQ中拉取代码,这种模式处理速度较慢,在不是用多线程进行处理的情况下,这中模式适合用于处理单个消息比较耗时的场景。
7.2、推模式
image.png通过运行如下代码可以看得出,推模式实际上是使用了多线程的在进行处理的。但是他的吞吐量是默认拉模式的好几倍,这中模式适合于处理每个消息的时间比较短的场景。
private void receivePushMessage()
{
try
{
Channel downChannel=connection.createChannel();
//交换器类型:fanout、direct、topic
//声明一个名字为msgExchange、类型为direct并且持久化的交换器,如果交换器已经存在就不再创建
downChannel.exchangeDeclare("msgExchange",BuiltinExchangeType.DIRECT,false);
//声明一个名字为msgQueue、持久化、不排他、不自动删除的队列,如果此队列已经存在就不再创建
downChannel.queueDeclare("msgQueue", true, false, false, null);
//将交换器与队列进行绑定通过message进行绑定
downChannel.queueBind("msgQueue","msgExchange","message");
//消息未确认消息的数量
downChannel.basicQos(10000);//在非自动确认的模式下,限制最多允许未确认的消息数量
//消费消息
downChannel.basicConsume("msgQueue",createConsumer(downChannel));
System.out.println("RabbitMQ消费者正在运行中...");
//不能释放信道资源!!!
//因为这里的消费者是用的推模式,如果关闭了信道,后面在进行消息消费的时候会报错
//downChannel.close();
}
catch (ShutdownSignalException ex)
{
//连接异常关闭了,这里要进行检查,并尝试重新建立连接
ex.printStackTrace();
}
catch (IOException ex)
{
//发生io异常需要进行处理,对应channel可能关闭了
ex.printStackTrace();
}
}
/**
* 创建消费对象
* @param channel
* @return
*/
private Consumer createConsumer(Channel channel)
{
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
String message = new String(body);
System.out.println(Thread.currentThread().getId()+"RabbitMQ推模式消费者收到消息: " + message);
// 消息确认
try {
channel.basicAck(envelope.getDeliveryTag(), false);//手动确认消息
} catch (IOException e) {
//发生io异常需要进行处理,对应channel可能关闭了
e.printStackTrace();
}
}
};
return consumer;
}
网友评论