这里主要说的是异步消息,异步消息时一个应用程序向另一个应用程序间接发送消息的一种方式,这种方式无需等待对方的响应。
JMS
JMS(Java Message Service)是一个Java标准,定义了使用消息代理的通用API。借助JMS,所有遵从规范的实现都使用通用的接口,这就类似于JDBC为数据库操作提供了通用的接口一样。
JMS支持点对点消息模型和发布-订阅模型。
点对点消息模型
在点对点消息模型中,消息发送者将消息发送到队列中,接收者从队列中取出消息。
点对点消息模型消息队列对消息发送者和消息接收者进行了解耦。虽然队列可以有多个接收者,但是每一条消息只能被一个接收者取走。
发布-订阅消息模型
在发布-订阅消息模型中,消息会发送给一个主题,多个接收者都可以监听一个主题。与队列不同,该主题的订阅者都会接受到此消息的副本。
发布-订阅消息模型ActiveMQ就是遵从JMS规范的框架。
AMQP
AMQP(Advanced Message Queuing Protocol)是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。
相比JMS,AMQP的优势在于它具有更加灵活和透明的消息模型。除了JMS支持的消息模型,AMQP还能够让我们以其他的多种方式来发送消息,这是通过将消息的生产者与存放消息的队列解耦实现的。
AMQP消息模型
AMQP的生产者不会直接将消息发布到队列中。AMQP在消息的生产者以及传递消息的队列之间引入了一种间接的机制:Exchange。
AMQP消息模型消息的生产者将信息发布到一个Exchange。Exchange会绑定到一个或多个队列上,它负责将信息路由到队列上。消息的消费者会从队列中提取数据并进行处理。
AMQP定义了四中不同类型的Exchange,每一种都有不同的路由算法,这些算法决定了是否要将信息放到队列中:
- Direct:如果消息的routing key 与binding的routing key 直接匹配的话,消息将会路由到该队列上;
- Topic: 如果消息的routing key 与binding的routing key 符合通配符匹配的话,消息将会路由到该队列上;
- Headers: 如果消息参数表中的头信息和值与binding参数表中相匹配,消息将会路由到该队列上;
- Fanout: 不管消息的routing key 和参数表的头信息/值是什么,消息将会路由到所有队列上。
RabbitMQ
RabbitMQ是一个流行的开源消息代理,它实现了AMQP。Spring AMQP为RabbitMQ提供了支持,包括RabbitMQ链接工厂,模板以及Spring配置命名空间。
安装&启动服务
首先我们需要安装RabbitMQ并启动服务。
Download
安装的话可以搜索一下教程,网上很多。安装好之后,配置环境,然后我们通过命令行输入rabbitmq-server
启动服务。出现如下界面表示服务已启动:
## ##
## ## RabbitMQ 3.7.7. Copyright (C) 2007-2018 Pivotal Software, Inc.
########## Licensed under the MPL. See http://www.rabbitmq.com/
###### ##
########## Logs: /usr/local/rabbitmq_server-3.7.7/var/log/rabbitmq/rabbit@JuliedeMacBook-Pro.log
/usr/local/rabbitmq_server-3.7.7/var/log/rabbitmq/rabbit@JuliedeMacBook-Pro_upgrade.log
Starting broker...
completed with 3 plugins.
Spring配置
首先是pom.xml中导入依赖:
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
</dependency>
其次创建一个配置文件:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd">
<!--导入配置文件-->
<context:property-placeholder location="classpath:rabbit.properties"/>
<!-- 连接配置 -->
<rabbit:connection-factory id="connectionFactory"
host="${mq.host}"
username="${mq.username}"
password="${mq.password}"
port="${mq.port}"
virtual-host='/'/>
<rabbit:admin connection-factory="connectionFactory"/>
<!--创建一个队列-->
<rabbit:queue id="spittleAlertQueue" name="spittle.alerts" />
<!--配置rabbitTemplate-->
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
<!--队列监听器-->
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener queues="spittleAlertQueue" ref="queueListener"/>
</rabbit:listener-container>
<!--注册相关bean-->
<bean id="alertService" class="com.hubert.rabbit.AlertServiceImpl">
<constructor-arg ref="rabbitTemplate"/>
</bean>
<bean id="queueListener" class="com.hubert.rabbit.QueueListener"/>
</beans>
rabbit.properties
mq.host=localhost
mq.username=guest
mq.password=guest
mq.port=5672
bean
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
public class QueueListener implements MessageListener {
Logger logger = LoggerFactory.getLogger(QueueListener.class);
@Override
public void onMessage(Message msg) {
logger.info("receive");
try {
logger.info(msg.toString());
} catch (Exception e) {
e.printStackTrace();
}
}
}
import org.springframework.amqp.rabbit.core.RabbitTemplate;
public class AlertServiceImpl {
private RabbitTemplate rabbit;
public AlertServiceImpl(RabbitTemplate rabbit) {
this.rabbit = rabbit;
}
public void sendSpittleAlert() {
rabbit.convertAndSend("spittle.alerts", "object");
}
}
启动
最后是在main方法中发送消息:
public class RabbitMain {
public static void main(String[] args) {
ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext("rabbitmq-config.xml");
AlertServiceImpl service = (AlertServiceImpl) applicationContext.getBean("alertService");
service.sendSpittleAlert();
}
}
顺利的话我们就会看到处理消息的日志:
INFO com.hubert.rabbit.QueueListener - (Body:'object' MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=spittle.alerts, receivedDelay=null, deliveryTag=12, messageCount=0, consumerTag=amq.ctag-rLNiWGT6LCAMD3li8xv1qA, consumerQueue=spittle.alerts])
这个过程并不是那么顺利,可能会出现链接错误,权限被拒的情况,一般都是RabbitMQ的用户权限或者Virtual Host的权限的问题。
网友评论