为什么使用消息中间件
消息中间件作用:解耦服务调用。松耦合。 使用中间件,不用等调用的服务处理完才返回结果。提高效率。
042.jpg消息中间件解决服务调用之间的耦合
043.jpg消息中间件带来的好处
- 解耦
- 异步
- 横向扩展
- 安全可靠
- 顺序保证
- 等等。。。
什么是中间件:非底层操作系统软件,非业务应用软件,不是直接给最终用户使用的,不能直接给客户带来价值的软件统称为中间件。
什么是消息中间件:关注于数据的发送和接受,利用高效可靠的异步消息传递机制集成分布式系统
什么是JMS:Java消息服务(Java Message Service)即JMS,是一个Java平台中关于面向消息中间件的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。
044.png什么是AMQP:AMQP(advanced message queuing protocol)是一个提供统一消息服务的应用层标准协议,基于此协议的客户端与消息中间件可传递消息,并不接受客户端/中间件不同产品,不同开发语言等条件的限制。
常用消息中间件对比
-
ActiveMQ
- ActiveMQ是Apache出品的,最流行的,能力强劲的开源消息总线。ActiveMQ是一个完全支持JMS1.1和J2EE1.4规范的JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊地位。
- 多种语言和协议编写客户端,语言:Java、C、C++、C#、Ruby、Python、PHP。应用协议:OpenWire、Stomp REST、WS Notification、XMPP、AMQP
- 完全支持JMS1.1和J2EE1.4规范(持久化,XA消息,事务)
- 虚拟主题,组合目的,镜像队列
-
RabbitMQ
- RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
- 支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript等
- AMQP的完整实现(vhost、Exchange、Binding、Routing Key等)
- 事务支持/发布确认
- 消息持久化
-
Kafka
- Kafka是一种高吞吐量的分布式发布订阅消息系统,是一个分布式的、分区的、可靠的分布式日志存储服务。它通过一种独一无二的设计提供了一个消息系统的功能。
- 通过O(1)的磁盘结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能
- 高吞吐量:即使是非常普通的硬件Kafka也可以支持每秒数百万的消息。
- Partition、Consumer Group
JMS相关概念
-
提供者:实现JMS规范的消息中间件服务器
-
客户端:发送或接收消息的应用程序
-
生产者/发布者:创建并发送消息的客户端
-
消费者和订阅者:接收并处理消息的客户端
-
消息:应用程序之间传递的数据内容
-
消息模式:在客户端之间传递消息的方式,JMS中定义了主题和队列两种模式
-
队列模式
- 客户端包括生产者和消费者
- 队列中的消息只能被一个消息者消费
- 消费者可以随时消费队列中的消息
- 主体模型
- 客户端包括发布者和订阅者
- 主题中的消息被所有订阅者消费
- 消费者不能消费订阅之前就发送到主题中的消息
- JMS编码接口
- ConnectionFactory:用于创建连接到消息中间件的连接工厂
- Connection:代表了应用程序和消息服务器之间的通信链路
- Destination:指消息发布和接收的地点,包括队列和主题
- Session:表示一个单线程的上下文,用于发送和接收消息
- MessageConsumer:由会话创建,用于接收发送到目标的消息
- MessageProducer:由会话创建,用于发送消息到目标
- Message:是在消费者和生产者之间传递的对象,消息头,一组消息属性,一个消息体
win安装activemq
%activeMQ%\bin\win64:windows64位启动目录
-
activemq.bat:启动activemq
-
InstallService.bat:安装activemq服务到系统服务
-
启动完,访问localhost:8161
-
点击Manage ActiveMQ broker,用户名和密码:admin/admin
队列模式的消息演示
pom.xml
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.9.0</version>
</dependency>
AppProducer.java
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class AppProducer {
private static final String REMOTE_URL = "tcp://127.0.0.1:61616";
private static final String QUEUE_NAME = "queue-test";
public static void main(String[] args) throws Exception {
// 1、创建ConnectionFactory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(REMOTE_URL) ;
// 2、创建Connection
Connection connection = connectionFactory.createConnection();
// 3、启动连接
connection.start();
// 4、创建会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 是否在事务中处理,应答模式
// 5、创建一个目标(队列)
Destination destination = session.createQueue(QUEUE_NAME);
// 6、创建生产者
MessageProducer producer = session.createProducer(destination);
for(int i = 0; i < 10; i++){
// 7、创建消息
TextMessage message = session.createTextMessage("create message " + i);
// 8、发布消息
producer.send(message);
System.out.println("消息已发送 :" + message.getText());
}
// 9、关闭连接
connection.close();
}
}
AppConsumer.java
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @author Jas
* @create 2018-04-13 15:27
**/
public class AppConsumer {
private static final String REMOTE_URL = "tcp://127.0.0.1:61616";
private static final String QUEUE_NAME = "queue-test";
public static void main(String[] args) throws Exception {
// 1、创建ConnectionFactory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(REMOTE_URL);
// 2、创建Connection
Connection connection = connectionFactory.createConnection();
// 3、启动连接
connection.start();
// 4、创建会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 是否在事务中处理,应答模式
// 5、创建一个目标(队列)
Destination destination = session.createQueue(QUEUE_NAME);
// 6、创建一个消费者
MessageConsumer consumer = session.createConsumer(destination);
// 7、创建一个监听器
/* Lambda表达式
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
TextMessage message1 = (TextMessage) message;
try {
System.out.println("消费者接收到消息:" + message1.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
*/
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
TextMessage message1 = (TextMessage) message;
try {
System.out.println("消费者接收到消息:" + message1.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
// 8、关闭连接
// connection.close(); 消息接收是异步的过程,所以关闭连接则接收不到消息
}
}
启动两次AppConsumer监听消息发布
启动AppProducer发布消息,两个AppConsumer监听接收到的消息分别为:
消费者接收到消息:create message 0
消费者接收到消息:create message 2
消费者接收到消息:create message 4
消费者接收到消息:create message 6
消费者接收到消息:create message 8
消费者接收到消息:create message 1
消费者接收到消息:create message 3
消费者接收到消息:create message 5
消费者接收到消息:create message 7
消费者接收到消息:create message 9
主题模式的消息演示
AppProducer.java
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class AppProducer {
private static final String REMOTE_URL = "tcp://127.0.0.1:61616";
private static final String TOCPI_NAME = "topic-test";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ActiveMQConnectionFactory(REMOTE_URL);
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建主题
Destination destination = session.createTopic(TOCPI_NAME);
MessageProducer producer = session.createProducer(destination);
for(int i = 0; i < 10; i++){
TextMessage message = session.createTextMessage("create message " + i);
producer.send(message);
System.out.println("消息已发送 :" + message.getText());
}
connection.close();
}
}
AppConsumer.java
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class AppConsumer {
private static final String REMOTE_URL = "tcp://127.0.0.1:61616";
private static final String TOCPI_NAME = "topic-test";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ActiveMQConnectionFactory(REMOTE_URL);
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createTopic(TOCPI_NAME);
MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
TextMessage message1 = (TextMessage) message;
try {
System.out.println("消费者接收到消息:" + message1.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
}
}
运行AppProducer后,再运行AppConsumer,监听不到消息发布;两个AppConsumer监听,会全部接收到AppProducer发布的消息。
使用Spring集成JMS连接ActiveMQ
- ConnectionFactory:用于管理连接的连接工厂
- 一个Spring为我们提供的连接池
- JmsTemplate每次发消息都会重新创建连接,会话和productor
- spring提供了SingleConnectionFactory和CachingConnectionFactory
- JmsTemplate:用于发送和接收消息的模版类
- 是spring提供的,只需向spring容器内注册这个类就可以使用JmsTemplate方便的操作jms
- JmsTemplate类是线程安全的,可以在整个应用范围使用
- MessageListerner:消息监听器
- 实现一个onMessage方法,该方法只接收一个Message参数
pom.xml
<properties>
<spring-version>4.3.9.RELEASE</spring-version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>${spring-version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>${spring-version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>5.7.0</version>
<!-- 排除 ActiveMQ 自身依赖的 Spring -->
<exclusions>
<exclusion>
<artifactId>spring-context</artifactId>
<groupId>org.springframework</groupId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
ProducerService.java
public interface ProducerService {
/**
* 生产者发送消息
* @param message
*/
void sendMessage(String message);
}
common.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd">
<context:annotation-config/>
<!-- ActiveMQ 提供的ConnectionFactory -->
<bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<!-- 配置 brokerURL,这里为你自己开启 ActiveMQ 服务的地址-->
<property name="brokerURL" value="tcp://127.0.0.1:61616"/>
</bean>
<!-- Spring jms为我们 提供的连接池 -->
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
<property name="targetConnectionFactory" ref="activeMQConnectionFactory"/>
</bean>
<!--
点对点或队列模型
配置队列的目的地
-->
<bean id="activeMQQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg name="name" value="spring-jms-queue"/>
</bean>
<!--
发布者/订阅者模型
配置主题的目的地
-->
<bean id="activeMQTopic" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg name="name" value="spring-jms-topic"/>
</bean>
</beans>
producer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<!-- 导入公共配置 -->
<import resource="common.xml"/>
<!-- 配置 JmsTemplate -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory"/>
</bean>
<!-- 把 ProducerServiceImpl 交给Spring IoC 容器管理-->
<bean class="com.jas.jms.producer.ProducerServiceImpl"/>
</beans>
ProducerServiceImpl.java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import javax.annotation.Resource;
import javax.jms.*;
public class ProducerServiceImpl implements ProducerService {
@Autowired
JmsTemplate jmsTemplate;
/**
* 这里以 @Resource 方式注入目的地对象
* 如果是发布者/订阅者模式,只选要修改 name 中的值为“activeMQTopic”即可
*/
@Resource(name = "activeMQQueue")
Destination destination;
@Override
public void sendMessage(final String message) {
jmsTemplate.send(destination, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
TextMessage textMessage = session.createTextMessage(message);
return textMessage;
}
});
System.out.println("消息已发送:" + message);
}
}
Producer.java
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class Producer {
public static void main(String[] args) {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("producer.xml");
ProducerService producerService = context.getBean(ProducerService.class);
for (int i = 0; i < 10; i++) {
producerService.sendMessage("test message:" + i);
}
// 关闭 IoC 容器
context.close();
}
}
consumer.xml
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<!-- 导入公共配置 -->
<import resource="common.xml"/>
<!-- 配置自定义消费者消息监听器 -->
<bean id="consumerMessageListener" class="com.jas.jms.consumer.ConsumerMessageListener"/>
<!-- 配置消息监听器的容器 -->
<bean id="container" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="activeMQQueue"/>
<!--
配置发布者/订阅者模型的目的地
<property name="destination" ref="activeMQTopic"/>
-->
<property name="messageListener" ref="consumerMessageListener"/>
</bean>
</beans>
ConsumerMessageListener.java
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
public class ConsumerMessageListener implements MessageListener {
@Override
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("接收已接收:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
Consumer.java
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class Consumer {
public static void main(String[] args) {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("consumer.xml");
}
}
ActiveMQ集群配置
-
为什么要对消息中间件集群?
- 实现高可用,以排除单点故障引起的服务中断
- 实现负载均衡,以提升效率为更多客户提供服务
-
集群方式
- 客户端集群:让多个消费者消费同一队列
- Broker clusters:多个Broker之间同步消息
- Master Slave:实现高可用
-
ActiveMQ失效转义(failover)
- 允许当中一台消息服务器宕机时,客户端在传输层上重新连接到其它消息服务器。
- 语法:failover:(uri,...,uriN)?transportOptions
-
transportPotions参数说明
- randomize默认为true,表示在URI列表中选择URI连接时是否采用随机策略
- initialReconnectDelay默认为10,单位毫秒,表示第一次尝试重连之间等待的时间
- maxReconnectDelay默认30000,单位毫秒,最长重连的时间间隔
网友评论