介绍
RabbitMQ消息队列,主要是用来实现应用程序的异步和解耦,同时也能起到消息缓冲,消息分发的作用。
RabbitMQ是实现AMQP(高级消息队列协议)的消息中间件的一种,AMQP,即Advanced Message Queuing Protocol,是应用层协议的一个开放标准,为面向消息的中间件设计。
在项目中,将一些无需即时返回且耗时的操作提取出来,进行了异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间, 提高了系统的吞吐量。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
基本概念
- Broker - 简单来说就是消息队列服务器的实体。
- Exchange - 消息路由器,转发消息到绑定的队列上,指定消息按什么规则,路由到哪个队列。
- Queue - 消息队列,用来存储消息,每个消息都会被投入到一个或多个队列。
- Binding - 绑定,它的作用就是把 Exchange 和 Queue 按照路由规则绑定起来。
- RoutingKey - 路由关键字,Exchange 根据这个关键字进行消息投递。
- Producter - 消息生产者,产生消息的程序。
- Consumer - 消息消费者,接收消息的程序。
- Channel - 消息通道,在客户端的每个连接里可建立多个Channel,每个channel代表一个会话。
一般消息队列都是生产者将消息发送到队列,消费者监听队列进行消费。rabbitmq中一个虚拟主机(默认)持有一个或者多个交换机(Exchange)。 用户只能在虚拟主机的粒度进行权限控制,交换机根据一定的策略(RoutingKey)绑定(Binding)到队列(Queue)上, 这样生产者和队列就没有直接联系,而是将消息发送的交换机,交换机再把消息转发到对应绑定的队列上。
其中,交换机(Exchange)最常见的4种类型如下:
- Direct: 先匹配, 再投送。即在绑定时设定一个routing_key, 消息的routing_key匹配时, 才会被交换器投送到绑定的队列中去. 交换机跟队列必须是精确的对应关系,这种最为简单。
- Topic: 转发消息主要是根据通配符。在这种交换机下,队列和交换机的绑定会定义一种路由模式,那么,通配符就要在这种路由模式和路由键之间匹配后交换机才能转发消息 这种可以认为是Direct 的灵活版
- Headers: 也是根据规则匹配, 相较于 direct 和 topic 固定地使用 routingkey , headers则是一个自定义匹配规则的类型, 在队列与交换器绑定时会设定一组键值对规则,消息中也包括一组键值对( headers属性),当这些键值对有一对或全部匹配时,消息被投送到对应队列
- Fanout : 消息广播模式,不管路由键或者是路由模式,会把消息发给绑定给它的全部队列,如果配置了routingkey会被忽略
配置文件
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.2.xsd">
<!--配置connection-factory,指定连接rabbit server参数 -->
<rabbit:connection-factory id="connectionFactory" virtual-host="vhostSJJ"
username="shijunjie" password="wssjj123" host="123.206.228.200" port="5672"
/>
<!-- <rabbit:connection-factory id="connectionFactory"
username="test2" password="test2"
host="123.206.228.200" port="5672" /> -->
<!--通过指定下面的admin信息,当前producer中的exchange和queue会在rabbitmq服务器上自动生成 -->
<rabbit:admin id="connectAdmin" connection-factory="connectionFactory" />
<!--定义queue -->
<rabbit:queue name="queueTest" durable="true" auto-delete="false"
exclusive="false" declared-by="connectAdmin" />
<!-- 定义direct exchange,绑定queueTest -->
<rabbit:direct-exchange name="exchangeTest"
durable="true" auto-delete="false" declared-by="connectAdmin">
<rabbit:bindings>
<rabbit:binding queue="queueTest" key="queueTestKey"></rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>
<!--定义rabbit template用于数据的接收和发送 -->
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
exchange="exchangeTest" />
<!-- 消息接收者 -->
<bean id="messageReceiver" class="me.shijunjie.consumer.MessageConsumer"></bean>
<!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象 -->
<rabbit:listener-container
connection-factory="connectionFactory">
<rabbit:listener queues="queueTest" ref="messageReceiver" />
</rabbit:listener-container>
<!--定义queue -->
<rabbit:queue name="queueChris" durable="true"
auto-delete="false" exclusive="false" declared-by="connectAdmin" />
<!-- 定义direct exchange,绑定queueTest -->
<rabbit:direct-exchange name="exchangeTest"
durable="true" auto-delete="false" declared-by="connectAdmin">
<rabbit:bindings>
<rabbit:binding queue="queueChris" key="queueTestChris"></rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>
<!-- 消息接收者 -->
<bean id="receiverChris" class="me.shijunjie.consumer.ChrisConsumer"></bean>
<!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象 -->
<rabbit:listener-container
connection-factory="connectionFactory">
<rabbit:listener queues="queueChris" ref="receiverChris" />
</rabbit:listener-container>
<!-- 分隔线 -->
<!--配置connection-factory,指定连接rabbit server参数 -->
<rabbit:connection-factory id="connectionFactory2" virtual-host="vhostSJJ"
username="shijunjie" password="wssjj123" host="123.206.228.200" port="5672" />
<!--通过指定下面的admin信息,当前producer中的exchange和queue会在rabbitmq服务器上自动生成 -->
<rabbit:admin id="connectAdmin2" connection-factory="connectionFactory2" />
<!--定义queue -->
<rabbit:queue name="queueShijj" durable="true"
auto-delete="false" exclusive="false" declared-by="connectAdmin2" />
<!-- 定义direct exchange,绑定queueTest -->
<rabbit:topic-exchange name="exchangeTest2"
durable="true" auto-delete="false" declared-by="connectAdmin2">
<rabbit:bindings>
<rabbit:binding queue="queueShijj" pattern="shijj.#"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
<!--定义rabbit template用于数据的接收和发送 -->
<rabbit:template id="amqpTemplate2" connection-factory="connectionFactory2"
exchange="exchangeTest2" />
<!-- 消息接收者 -->
<bean id="recieverShijj" class="me.shijunjie.consumer.ShijjConsumer"></bean>
<!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象 -->
<rabbit:listener-container
connection-factory="connectionFactory2">
<rabbit:listener queues="queueShijj" ref="recieverShijj" />
</rabbit:listener-container>
</beans>
实践
生产者1
package me.shijunjie.producer;
import java.io.IOException;
import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.stereotype.Service;
@Service
public class MessageProducer {
private Logger logger = LoggerFactory.getLogger(MessageProducer.class);
@Resource(name="amqpTemplate")
private AmqpTemplate amqpTemplate;
@Resource(name="amqpTemplate2")
private AmqpTemplate amqpTemplate2;
public void sendMessage(Object message) throws IOException {
logger.info("to send message:{}", message);
amqpTemplate.convertAndSend("queueTestKey", message);
amqpTemplate.convertAndSend("queueTestChris", message);
amqpTemplate2.convertAndSend("shijj.xxxx.wsdwd", message);
}
}
消费者1
package me.shijunjie.consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
public class MessageConsumer implements MessageListener {
private Logger logger = LoggerFactory.getLogger(MessageConsumer.class);
@Override
public void onMessage(Message message) {
logger.info("consumer receive message------->:{}", message);
}
}
消费者2
package me.shijunjie.consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
public class ChrisConsumer implements MessageListener {
private Logger logger = LoggerFactory.getLogger(ChrisConsumer.class);
@Override
public void onMessage(Message message) {
logger.info("chris receive message------->:{}", message);
}
}
消费者3
package me.shijunjie.consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
public class ShijjConsumer implements MessageListener {
private Logger logger = LoggerFactory.getLogger(ShijjConsumer.class);
@Override
public void onMessage(Message message) {
logger.info("shijj receive message------->:{}", message);
}
}
网友评论