消息中间件的定义:
没有标准定义,一般认为,采用消息传送机制/****消息队列** **的中间件技术,进行数据交流,用在分布式系统的集成
为什么要用消息中间件?
解决分布式系统之间消息的传递。
电商场景:用户下单减库存,调用物流系统,系统扩充后服务化和业务拆分。系统交互,y一般用RPC(远程过程调用)。如果系统扩充到有几十个接口,消息中间件来解决问题。
和RPC有何区别?
使用区分标准:1、系统之间的依赖程度 2、量(业务量,数据量,访问量)
消息中间件有些什么使用场景?
1、 异步处理
用户注册(50ms),还需发送邮件(50ms)和短信(50ms)
串行:(150ms)用户注册—》发送邮件----》发送短信
并行(100ms):用户注册—》发送邮件
|----》发送短信
消息中间件(56ms):
用户注册(50ms)—》(6ms)消息中间件《-----发送邮件
《-----发送短信
2、 应用的解耦
订单系统---》库存系统(强耦合)
消息中间件:订单系统---》消息中间件《----库存系统(解耦)
3、 流量的削峰
用户请求-----》秒杀应用
应用的前端加入消息队列
用户请求-----》消息队列《----秒杀应用
4、 日志处理
错误日志---》消息队列《----日志处理
用户行为日志--》消息队列(kafka)《-----日志的存储或流式处理
5、纯粹的消息通信
常见消息中间件比较
1.jpgkafka****和RabbitMQ****的比较
1、 RabbitMq比kafka成熟,在可用性上,稳定性上,可靠性上,RabbitMq超过kafka
2、 Kafka设计的初衷就是处理日志的,可以看做是一个日志系统,针对性很强,所以它并没有具备一个成熟MQ应该具备的特性
3、 Kafka的性能(吞吐量、tps)比RabbitMq要强
什么是JMS规范
本质是API,Java平台消息中间件的规范,java应用程序之间进行消息交换。并且通过提供标准的产生、发送、接收消息的接口简化企业应用的开发。
JMS对象模型包含如下几个要素:
1)连接工厂:创建一个JMs连接
2)JMS连接:客户端和服务器之间的一个连接。
3)JMS会话:客户和服务器会话的状态,建立在连接之上的
4)JMS目的:消息队列
5)JMS生产者:消息的生成
6)JMS消费者:接收消息
7)Broker:消息中间件的实例(ActiveMq)
JMS规范中的点对点模式:
队列,一个消息只有一个消费者(即使有多个接受者监听队列),消费者是要向队列应答成功
JMS规范中的主题模式(发布订阅):
发布到Topic的消息会被当前主题所有的订阅者消费
JMS规范中的消息类型
TextMessage,MapMessage,ObjectMessage,BytesMessage,StreamMessage
代码:不配置Spring
pom.xml
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.8.0</version>
</dependency>
Producer
public class JmsProducer {
//默认连接用户名
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
//默认连接密码
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
//默认连接地址
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
//发送的消息数量
private static final int SENDNUM = 10;
public static void main(String[] args) {
ConnectionFactory connectionFactory;
Connection connection = null;
Session session;
Destination destination;
MessageProducer messageProducer;
connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEURL);
try {
connection = connectionFactory.createConnection();
connection.start();
/*
createSession参数取值
* 1、为true表示启用事务
* 2、消息的确认模式
* AUTO_ACKNOWLEDGE 自动签收
* CLIENT_ACKNOWLEDGE 客户端自行调用acknowledge方法签收
* DUPS_OK_ACKNOWLEDGE 不是必须签收,消费可能会重复发送
* 在第二次重新传送消息的时候,消息
头的JmsDelivered会被置为true标示当前消息已经传送过一次,
客户端需要进行消息的重复处理控制。
* */
session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue("HelloWAM");
messageProducer = session.createProducer(destination);
for(int i=0;i<SENDNUM;i++){
String msg = "发送消息"+i+" "+System.currentTimeMillis();
TextMessage message = session.createTextMessage(msg);
System.out.println("发送消息:"+msg);
messageProducer.send(message);
}
session.commit();
} catch (JMSException e) {
e.printStackTrace();
}finally {
if(connection!=null){
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
}
启动生产者后台发送10条消息到队列
2.png 3.jpg
Consumer
public class JmsConsumer {
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//默认连接用户名
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认连接密码
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;//默认连接地址
public static void main(String[] args) {
ConnectionFactory connectionFactory;//连接工厂
Connection connection = null;//连接
Session session;//会话 接受或者发送消息的线程
Destination destination;//消息的目的地
MessageConsumer messageConsumer;//消息的消费者
//实例化连接工厂
connectionFactory = new ActiveMQConnectionFactory(JmsConsumer.USERNAME,
JmsConsumer.PASSWORD, JmsConsumer.BROKEURL);
try {
//通过连接工厂获取连接
connection = connectionFactory.createConnection();
//启动连接
connection.start();
//创建session
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建一个连接HelloWorld的消息队列
destination = session.createQueue("HelloWAM");
//创建消息消费者
messageConsumer = session.createConsumer(destination);
//读取消息
while(true){
TextMessage textMessage = (TextMessage)messageConsumer.receive(10000);
if(textMessage != null){
System.out.println("Accept msg : "+textMessage.getText());
}else{
break;
}
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
启动消费者
4.jpg 5.jpg
集成Spring,生产者部分
pom.xml
<!--ActiveMq-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.8.0</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>4.3.11.RELEASE</version>
</dependency>
Spring配置文件
<?xml version="1.0" encoding="UTF-8"?>
<!-- 查找最新的schemaLocation 访问 http://www.springframework.org/schema/ -->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.0.xsd
http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms-4.0.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd">
<!--以上增加activemq的命名空间-->
<!-- 配置扫描路径 -->
<context:component-scan base-package="com.xxx">
<context:exclude-filter type="annotation"
expression="org.springframework.stereotype.Controller"/>
</context:component-scan>
<!-- ActiveMQ 连接工厂 -->
<amq:connectionFactory id="amqConnectionFactory"
brokerURL="tcp://127.0.0.1:61616" userName="admin" password="admin" />
<!-- Spring Caching连接工厂 -->
<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="connectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
<property name="sessionCacheSize" value="100"></property>
</bean>
<!-- Spring JmsTemplate 的消息生产者 start-->
<!-- 定义JmsTemplate的Queue类型 -->
<bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
<constructor-arg ref="connectionFactory"></constructor-arg>
<!-- 队列模式-->
<property name="pubSubDomain" value="false"></property>
</bean>
<!-- 定义JmsTemplate的Topic类型 -->
<bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
<constructor-arg ref="connectionFactory"></constructor-arg>
<!-- 发布订阅模式-->
<property name="pubSubDomain" value="true"></property>
</bean>
<!--Spring JmsTemplate 的消息生产者 end-->
</beans>
QueueSender
@Component("queueSender")
public class QueueSender {
@Autowired
@Qualifier("jmsQueueTemplate")
private JmsTemplate jmsTemplate;
@Autowired
private GetResponse getResponse;
public void send(String queueName,final String message){
jmsTemplate.send(queueName, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
Message msg = session.createTextMessage(message);
return msg;
}
});
}
}
TopicSender
@Component("topicSender")
public class TopicSender {
@Autowired
@Qualifier("jmsTopicTemplate")
private JmsTemplate jmsTemplate;
public void send(String queueName,final String message){
jmsTemplate.send(queueName, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
Message msg = session.createTextMessage(message);
return msg;
}
});
}
}
消费者部分
配置文件
<!-- 配置扫描路径 -->
<context:component-scan base-package="com.dongnaoedu">
<context:exclude-filter type="annotation" expression="org.springframework.stereotype.Controller"/>
</context:component-scan>
<!-- ActiveMQ 连接工厂 -->
<amq:connectionFactory id="amqConnectionFactory"
brokerURL="tcp://127.0.0.1:61616" userName="admin" password="admin" />
<!-- Spring Caching连接工厂 -->
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
<property name="sessionCacheSize" value="100" />
</bean>
<!-- 消息消费者 start-->
<!-- 定义Queue监听器 -->
<jms:listener-container destination-type="queue" container-type="default"
connection-factory="connectionFactory" acknowledge="auto">
<jms:listener destination="test.queue" ref="queueReceiver1"></jms:listener>
<jms:listener destination="test.queue" ref="queueReceiver2"></jms:listener>
</jms:listener-container>
<!-- 定义Topic监听器 -->
<jms:listener-container destination-type="topic" container-type="default"
connection-factory="connectionFactory" acknowledge="auto">
<jms:listener destination="test.topic" ref="topicReceiver1"></jms:listener>
<jms:listener destination="test.topic" ref="topicReceiver2"></jms:listener>
</jms:listener-container>
<!-- 消息消费者 end -->
<bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
<constructor-arg ref="connectionFactory"></constructor-arg>
<!-- 队列模式-->
<property name="pubSubDomain" value="false"></property>
</bean>
</beans>
QueueReceiver1
@Component
public class QueueReceiver1 implements MessageListener {
public void onMessage(Message message) {
try {
String textMsg = ((TextMessage)message).getText();
System.out.println("QueueReceiver1 accept msg : "+textMsg);
} catch (JMSException e) {
e.printStackTrace();
}
}
}
QueueReceiver2
@Component
public class QueueReceiver2 implements MessageListener {
public void onMessage(Message message) {
try {
String textMsg = ((TextMessage)message).getText();
System.out.println("QueueReceiver2 accept msg : "+textMsg);
} catch (JMSException e) {
e.printStackTrace();
}
}
}
TopicReceiver1
@Component
public class TopicReceiver1 implements MessageListener {
public void onMessage(Message message) {
try {
String textMsg = ((TextMessage)message).getText();
System.out.println("TopicReceiver1 accept msg : "+textMsg);
} catch (JMSException e) {
e.printStackTrace();
}
}
}
TopicReceiver2
@Component
public class TopicReceiver2 implements MessageListener {
public void onMessage(Message message) {
try {
String textMsg = ((TextMessage)message).getText();
System.out.println("TopicReceiver2 accept msg : "+textMsg);
} catch (JMSException e) {
e.printStackTrace();
}
}
}
启动Producer,发送Queue消息,启动Consumer,可看到点对点模式,有两个消费者,一条消息,只有一个消费者能接收到消息,是轮询方式。
6.jpg
7.jpg
主题订阅模式,可看到有两个消费者,一条消息,两个消费者都能接收到消息。
8.jpg
9.jpg
网友评论