大型系统的演变必然的发展方向是分布式,而在分布式系统中应用与应用之间互相连接越来越紧密,在应用之间的消息传递就像家常便饭般普遍。使用Java消息中间件处理异步消息成为了分布式系统中的必修课。
1.为什么要使用消息中间件
我们模拟这样一个场景:小明所在公司发展壮大,他们开发的商城系统新增了需求,需要在用户登录成功之后,给用户发个短信,告诉用户登录成功。ok,这个需求很合理,于是小明在登录接口中新增了发送短信的代码。没过几天,产品经理又找到小明,告诉他,用户登录成功之后还要给用户账号增加积分,积分换礼品来保证用户粘度。ok,这个需求很合理,然后小明又改造了登录接口,增加了登录成功修改积分的功能。很不幸,没过几天,产品经理又找到小明,咱们这个系统需要在用户登录成功之后将记录用户登录时间以便于当用户超过三天未登录时发送短信通知(骚扰用户?哈哈)。ok!,虽然这个需求还是很合理,于是小明又很傻很天真的将登录接口改造。小明正沾沾自喜之时,测试小姐姐找上门来了,表示登录所耗时间长,用户体验不行(这不废话么,一个登录接口中包含了那么多功能,不慢才怪呢)。这下小明傻眼了,该怎么办呢?
在上面模拟的场景中,我们可以发现登录接口中包含了太多的功能,并且都是同步的。这明显违反了程序设计中的单一设计原则,以及开闭原则。那我们应该如何解决小明的难题呢?没错,用异步处理。此处我们引入消息中间件。
2.什么是消息中间件
简单来说,利用高效可靠的异步消息传递机制来实现消息的发送和接收。
3. 消息中间件应用场景
还是上面那个示例,当我们使用消息中间件进行改造之后,我们可以用如下草图进行示意: image.png这样登录接口就变得很纯粹,仅仅实现登录功能,其他的功能采用消息中间件异步处理了。
我们从本例中至少能发现使用消息中间件的一个好处:解耦。这对系统的维护是有莫大的好处的。
4.关于消息中间件的两个规范
网上总结的很好,此处粘贴进行记录:
1.JMS
activeMQ(支持多语言,实现jms1.1,j2ee1.4规范),RabbitMQ(支持更多语言,基于AMQP规范),kafka(高吞吐量,分布式,分区,O(1)磁盘顺序提供消息持久化) image.png
java消息服务(Java Message Service)即JMS,是一个Java平台中关于面向消息中间件的api,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信
2.AMQP
AMQP(advanced message queuing protocol)是一个提供统一消息服务的应用层标准协议,基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同开发语言等条件的限制
常见消息中间件
JMS中的两种消息模式对比
在jms规范中有队列模式和主题模式。
这两种模式区别在于,队列模式只能被消费者消费一次,即点对点模式,每个消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着消息,直到他们被消费或超时。
主题模式也被称为发布订阅模式,每个消息都可以有多个消费者,我们生活中最常见的微信公众号其实就是一个最常见的主题模式有木有。微信公众号发布一段消息,订阅了该公众号的用户就可以看到该消息。
JMS代码示例
消息生产者类
public class AppQueueProducer {
public static final String url = "tcp://localhost:61616";
public static final String queueName = "queue-test";
public static void main(String[] args) throws JMSException {
//1. 创建连接工厂
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(url);
//2. 创建连接
Connection connection = activeMQConnectionFactory.createConnection();
//3. 启动连接
connection.start();
//4. 创建会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5. 创建目标,为队列模式
Destination destination = session.createQueue(queueName);//队列模式
// Destination destination = session.createTopic(toppicName);//主题模式
//6. 创建一个生产者
MessageProducer producer = session.createProducer(destination);
//7. 发送消息
Stream.iterate(1, x -> x + 1).limit(100).map(y -> {
try {
//8. 创建消息
TextMessage textMessage = session.createTextMessage("test" + y);
//9. 发布消息
producer.send(textMessage);
return "发送消息:" + textMessage.getText();
} catch (JMSException e) {
e.printStackTrace();
return e.getMessage();
}
}).forEach(System.out::println);
//关闭连接
connection.close();
}
}
消息消费者类
public class AppQueueConsumer {
public static final String url="tcp://localhost:61616";
public static final String queueName="queue-test";
public static void main(String[] args) throws JMSException {
//1. 创建工厂
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(url);
//2. 创建连接
Connection connection = activeMQConnectionFactory.createConnection();
//3. 连接启动
connection.start();
//4. 创建会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5. 创建目标
Destination destination=session.createQueue(queueName);//队列模式
//Destination destination=session.createTopic(toppicName);//主题模式
//6. 创建消费者
MessageConsumer consumer = session.createConsumer(destination);
//7. 创建消息监听
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
TextMessage textMessage= (TextMessage) message;
try {
System.out.println("接收到消息:"+textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
}
}
我们启动队列模式下的消息生产者,然后启动三个消息消费者实例,我们会发现,在队列模式下,三个实例是均匀消费消息的,如下图
image.png
image.png
image.png
我们启动主题模式下的消息生产者,然后启动三个消息消费者实例,我们发现此时并没有接收到消息,原因在于消息消费者启动订阅之后,前面生产的消息已经发布了,此时他不能被后面的消息消费者发现。我们要先启动消息消费者,然后再启动消息生产者,我们发现三个消息消费者都完整接收到了消息发布者发布的100条消息。
消息中间件难的不是代码,而是思想。如何使用消息中间件让我们的系统更加健壮合理这才是我们需要真正思考的地方!
网友评论