美文网首页
Java消息中间件

Java消息中间件

作者: Hey_Shaw | 来源:发表于2018-05-07 23:17 被阅读311次

为什么使用消息中间件

消息中间件作用:解耦服务调用。松耦合。 使用中间件,不用等调用的服务处理完才返回结果。提高效率。

042.jpg

消息中间件解决服务调用之间的耦合

043.jpg

消息中间件带来的好处

  • 解耦
  • 异步
  • 横向扩展
  • 安全可靠
  • 顺序保证
  • 等等。。。

什么是中间件:非底层操作系统软件,非业务应用软件,不是直接给最终用户使用的,不能直接给客户带来价值的软件统称为中间件。

什么是消息中间件:关注于数据的发送和接受,利用高效可靠的异步消息传递机制集成分布式系统

什么是JMS:Java消息服务(Java Message Service)即JMS,是一个Java平台中关于面向消息中间件的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。

什么是AMQP:AMQP(advanced message queuing protocol)是一个提供统一消息服务的应用层标准协议,基于此协议的客户端与消息中间件可传递消息,并不接受客户端/中间件不同产品,不同开发语言等条件的限制。

044.png
常用消息中间件对比
  • ActiveMQ

    • ActiveMQ是Apache出品的,最流行的,能力强劲的开源消息总线。ActiveMQ是一个完全支持JMS1.1和J2EE1.4规范的JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊地位。
    • 多种语言和协议编写客户端,语言:Java、C、C++、C#、Ruby、Python、PHP。应用协议:OpenWire、Stomp REST、WS Notification、XMPP、AMQP
    • 完全支持JMS1.1和J2EE1.4规范(持久化,XA消息,事务)
    • 虚拟主题,组合目的,镜像队列
  • RabbitMQ

    • RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
    • 支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript等
    • AMQP的完整实现(vhost、Exchange、Binding、Routing Key等)
    • 事务支持/发布确认
    • 消息持久化
  • Kafka

    • Kafka是一种高吞吐量的分布式发布订阅消息系统,是一个分布式的、分区的、可靠的分布式日志存储服务。它通过一种独一无二的设计提供了一个消息系统的功能。
    • 通过O(1)的磁盘结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能
    • 高吞吐量:即使是非常普通的硬件Kafka也可以支持每秒数百万的消息。
    • Partition、Consumer Group
045.png
JMS相关概念
  • 提供者:实现JMS规范的消息中间件服务器

  • 客户端:发送或接收消息的应用程序

  • 生产者/发布者:创建并发送消息的客户端

  • 消费者和订阅者:接收并处理消息的客户端

  • 消息:应用程序之间传递的数据内容

  • 消息模式:在客户端之间传递消息的方式,JMS中定义了主题和队列两种模式

  • 队列模式

    • 客户端包括生产者和消费者
    • 队列中的消息只能被一个消息者消费
    • 消费者可以随时消费队列中的消息
046.png
  • 主体模型
    • 客户端包括发布者和订阅者
    • 主题中的消息被所有订阅者消费
    • 消费者不能消费订阅之前就发送到主题中的消息
047.png
  • JMS编码接口
    • ConnectionFactory:用于创建连接到消息中间件的连接工厂
    • Connection:代表了应用程序和消息服务器之间的通信链路
    • Destination:指消息发布和接收的地点,包括队列和主题
    • Session:表示一个单线程的上下文,用于发送和接收消息
    • MessageConsumer:由会话创建,用于接收发送到目标的消息
    • MessageProducer:由会话创建,用于发送消息到目标
    • Message:是在消费者和生产者之间传递的对象,消息头,一组消息属性,一个消息体
048.png
win安装activemq

%activeMQ%\bin\win64:windows64位启动目录

  • activemq.bat:启动activemq

  • InstallService.bat:安装activemq服务到系统服务

  • 启动完,访问localhost:8161

  • 点击Manage ActiveMQ broker,用户名和密码:admin/admin

队列模式的消息演示

pom.xml

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.9.0</version>
</dependency>

AppProducer.java

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class AppProducer {

    private static final String REMOTE_URL = "tcp://127.0.0.1:61616";
    private static final String QUEUE_NAME = "queue-test";
    
    public static void main(String[] args) throws Exception {
        // 1、创建ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(REMOTE_URL) ;

        // 2、创建Connection
        Connection connection = connectionFactory.createConnection();

        // 3、启动连接
        connection.start();

        // 4、创建会话
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  // 是否在事务中处理,应答模式

        // 5、创建一个目标(队列)
        Destination destination = session.createQueue(QUEUE_NAME);
        // 6、创建生产者
        MessageProducer producer = session.createProducer(destination);


        for(int i = 0; i < 10; i++){
            // 7、创建消息
            TextMessage message = session.createTextMessage("create message " + i);
            // 8、发布消息
            producer.send(message);
            System.out.println("消息已发送 :" + message.getText());
        }
        // 9、关闭连接
        connection.close();
    }

}

AppConsumer.java

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @author Jas
 * @create 2018-04-13 15:27
 **/
public class AppConsumer {

    private static final String REMOTE_URL = "tcp://127.0.0.1:61616";
    private static final String QUEUE_NAME = "queue-test";

    public static void main(String[] args) throws Exception {

        // 1、创建ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(REMOTE_URL);

        // 2、创建Connection
        Connection connection = connectionFactory.createConnection();

        // 3、启动连接
        connection.start();

        // 4、创建会话
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  // 是否在事务中处理,应答模式

        // 5、创建一个目标(队列)
        Destination destination = session.createQueue(QUEUE_NAME);

        // 6、创建一个消费者
        MessageConsumer consumer = session.createConsumer(destination);

        // 7、创建一个监听器
        /*  Lambda表达式
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                TextMessage message1 = (TextMessage) message;
                try {
                    System.out.println("消费者接收到消息:" +  message1.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        */
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                TextMessage message1 = (TextMessage) message;
                try {
                    System.out.println("消费者接收到消息:" + message1.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });

        // 8、关闭连接
        // connection.close();  消息接收是异步的过程,所以关闭连接则接收不到消息
    }
}

启动两次AppConsumer监听消息发布

启动AppProducer发布消息,两个AppConsumer监听接收到的消息分别为:

消费者接收到消息:create message 0
消费者接收到消息:create message 2
消费者接收到消息:create message 4
消费者接收到消息:create message 6
消费者接收到消息:create message 8
消费者接收到消息:create message 1
消费者接收到消息:create message 3
消费者接收到消息:create message 5
消费者接收到消息:create message 7
消费者接收到消息:create message 9
主题模式的消息演示

AppProducer.java

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class AppProducer {
    private static final String REMOTE_URL = "tcp://127.0.0.1:61616";
    private static final String TOCPI_NAME = "topic-test";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ActiveMQConnectionFactory(REMOTE_URL);
        Connection connection = factory.createConnection();
        connection.start();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 创建主题
        Destination destination = session.createTopic(TOCPI_NAME);

        MessageProducer producer = session.createProducer(destination);
        for(int i = 0; i < 10; i++){
            TextMessage message = session.createTextMessage("create message " + i);
            producer.send(message);
            System.out.println("消息已发送 :" + message.getText());
        }

        connection.close();
    }
}

AppConsumer.java

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class AppConsumer {

    private static final String REMOTE_URL = "tcp://127.0.0.1:61616";
    private static final String TOCPI_NAME = "topic-test";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ActiveMQConnectionFactory(REMOTE_URL);
        Connection connection = factory.createConnection();
        connection.start();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createTopic(TOCPI_NAME);

        MessageConsumer consumer = session.createConsumer(destination);

        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                TextMessage message1 = (TextMessage) message;
                try {
                    System.out.println("消费者接收到消息:" +  message1.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        
    }
}

运行AppProducer后,再运行AppConsumer,监听不到消息发布;两个AppConsumer监听,会全部接收到AppProducer发布的消息。

使用Spring集成JMS连接ActiveMQ

  • ConnectionFactory:用于管理连接的连接工厂
    • 一个Spring为我们提供的连接池
    • JmsTemplate每次发消息都会重新创建连接,会话和productor
    • spring提供了SingleConnectionFactory和CachingConnectionFactory
  • JmsTemplate:用于发送和接收消息的模版类
    • 是spring提供的,只需向spring容器内注册这个类就可以使用JmsTemplate方便的操作jms
    • JmsTemplate类是线程安全的,可以在整个应用范围使用
  • MessageListerner:消息监听器
    • 实现一个onMessage方法,该方法只接收一个Message参数

pom.xml

<properties>
    <spring-version>4.3.9.RELEASE</spring-version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
        <version>${spring-version}</version>
    </dependency>

    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-jms</artifactId>
        <version>${spring-version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-core</artifactId>
        <version>5.7.0</version>
        <!-- 排除 ActiveMQ 自身依赖的 Spring -->
        <exclusions>
            <exclusion>
                <artifactId>spring-context</artifactId>
                <groupId>org.springframework</groupId>
            </exclusion>
        </exclusions>
    </dependency>
</dependencies>

ProducerService.java

public interface ProducerService {
    /**
     * 生产者发送消息
     * @param message
     */
    void sendMessage(String message);
}

common.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans 
       http://www.springframework.org/schema/beans/spring-beans.xsd 
       http://www.springframework.org/schema/context 
       http://www.springframework.org/schema/context/spring-context.xsd">
    
    <context:annotation-config/>

    <!-- ActiveMQ 提供的ConnectionFactory -->
    <bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <!-- 配置 brokerURL,这里为你自己开启 ActiveMQ 服务的地址-->
        <property name="brokerURL" value="tcp://127.0.0.1:61616"/>
    </bean>

    <!-- Spring jms为我们 提供的连接池 -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
        <property name="targetConnectionFactory" ref="activeMQConnectionFactory"/>
    </bean>

    <!-- 
        点对点或队列模型
        配置队列的目的地 
    -->
    <bean id="activeMQQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg name="name" value="spring-jms-queue"/>
    </bean>

    <!-- 
        发布者/订阅者模型
        配置主题的目的地 
    -->
    <bean id="activeMQTopic" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg name="name" value="spring-jms-topic"/>
    </bean>
</beans>

producer.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans 
       http://www.springframework.org/schema/beans/spring-beans.xsd">

    <!-- 导入公共配置 -->
    <import resource="common.xml"/>
    
    <!-- 配置 JmsTemplate -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="connectionFactory"/>
    </bean>
    
    <!-- 把 ProducerServiceImpl 交给Spring IoC 容器管理-->
    <bean class="com.jas.jms.producer.ProducerServiceImpl"/>
</beans>

ProducerServiceImpl.java

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

import javax.annotation.Resource;
import javax.jms.*;

public class ProducerServiceImpl implements ProducerService {

    @Autowired
    JmsTemplate jmsTemplate;
    /**
     * 这里以 @Resource 方式注入目的地对象
     * 如果是发布者/订阅者模式,只选要修改 name 中的值为“activeMQTopic”即可
     */
    @Resource(name = "activeMQQueue")
    Destination destination;

    @Override
    public void sendMessage(final String message) {
       jmsTemplate.send(destination, new MessageCreator() {
           @Override
           public Message createMessage(Session session) throws JMSException {
               TextMessage textMessage = session.createTextMessage(message);
               return textMessage;
           }
       });
        System.out.println("消息已发送:" + message);
    }
}

Producer.java

import org.springframework.context.support.ClassPathXmlApplicationContext;

public class Producer {
    public static void main(String[] args) {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("producer.xml");
        ProducerService producerService = context.getBean(ProducerService.class);

        for (int i = 0; i < 10; i++) {
            producerService.sendMessage("test message:" + i);
        }
        
        // 关闭 IoC 容器
        context.close();
    }
}

consumer.xml

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans 
       http://www.springframework.org/schema/beans/spring-beans.xsd">

    <!-- 导入公共配置 -->
    <import resource="common.xml"/>

    <!-- 配置自定义消费者消息监听器 -->
    <bean id="consumerMessageListener" class="com.jas.jms.consumer.ConsumerMessageListener"/>
    
    <!-- 配置消息监听器的容器 -->
    <bean id="container" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="destination" ref="activeMQQueue"/>
        <!--
            配置发布者/订阅者模型的目的地
            <property name="destination" ref="activeMQTopic"/>
         -->
        <property name="messageListener" ref="consumerMessageListener"/>
    </bean>
</beans>

ConsumerMessageListener.java

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class ConsumerMessageListener implements MessageListener {
    @Override
    public void onMessage(Message message) {
        TextMessage textMessage = (TextMessage) message;

        try {
            System.out.println("接收已接收:" + textMessage.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

Consumer.java

import org.springframework.context.support.ClassPathXmlApplicationContext;

public class Consumer {
    public static void main(String[] args) {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("consumer.xml");
    }
}

ActiveMQ集群配置

  • 为什么要对消息中间件集群?

    • 实现高可用,以排除单点故障引起的服务中断
    • 实现负载均衡,以提升效率为更多客户提供服务
  • 集群方式

    • 客户端集群:让多个消费者消费同一队列
    • Broker clusters:多个Broker之间同步消息
    • Master Slave:实现高可用
  • ActiveMQ失效转义(failover)

    • 允许当中一台消息服务器宕机时,客户端在传输层上重新连接到其它消息服务器。
    • 语法:failover:(uri,...,uriN)?transportOptions
  • transportPotions参数说明

    • randomize默认为true,表示在URI列表中选择URI连接时是否采用随机策略
    • initialReconnectDelay默认为10,单位毫秒,表示第一次尝试重连之间等待的时间
    • maxReconnectDelay默认30000,单位毫秒,最长重连的时间间隔

相关文章

  • Java消息中间件之RabbitMQ

    什么是消息中间件? 消息代理规范JMS(Java Message Service)JAVA消息服务:基于JVM消息...

  • 消息中间件概念

    什么是JMS Java消息服务(Java Message Service),Java平台中关于面向消息中间件的接口...

  • JMS

    慕课网Java消息中间件笔记 JMS定义 Java消息服务(Java Message Server)即JMS,是一...

  • ActiveMQ

    1.Java消息中间件学习笔记一 -- 什么是消息中间件? https://blog.csdn.net/winte...

  • JAVA名词汇

    1、Java中间件,分布式系统、分布式缓存、消息队列 JAVA中间件:包括服务框架中间件:解决集群间的访问通信问题...

  • 腾讯内容首发:分布式核心原理解析笔记+分布式消息中间件实践笔记P

    分布式消息中间件实践笔记 首先,这份分布式消息中间件实践笔记是以Java语言编写。 消息中间件是分布式系统中的重要...

  • 消息中间件学习之JMS(1)

    说到消息中间件,不得不提一下 JMS。JMS是java消息服务(Java Message Service)应...

  • JMS 使用说明(安装部署)

    JMS(Java Messaging Service)是Java平台上有关面向消息中间件的技术规范,它便于消息系统...

  • ActiveMQ(一) 简介

    JMS和ActiveMQ JMS(Java Messaging Service)是Java平台上有关面向消息中间件...

  • 消息中间件-ActiveMQ详解

    1、消息中间件之JMS规范 什么是Java消息服务 Java消息服务指的是两个应用程序之间进行异步通信的API,它...

网友评论

      本文标题:Java消息中间件

      本文链接:https://www.haomeiwen.com/subject/udfilftx.html