美文网首页
Java消息中间件

Java消息中间件

作者: Hey_Shaw | 来源:发表于2018-05-07 23:17 被阅读311次

    为什么使用消息中间件

    消息中间件作用:解耦服务调用。松耦合。 使用中间件,不用等调用的服务处理完才返回结果。提高效率。

    042.jpg

    消息中间件解决服务调用之间的耦合

    043.jpg

    消息中间件带来的好处

    • 解耦
    • 异步
    • 横向扩展
    • 安全可靠
    • 顺序保证
    • 等等。。。

    什么是中间件:非底层操作系统软件,非业务应用软件,不是直接给最终用户使用的,不能直接给客户带来价值的软件统称为中间件。

    什么是消息中间件:关注于数据的发送和接受,利用高效可靠的异步消息传递机制集成分布式系统

    什么是JMS:Java消息服务(Java Message Service)即JMS,是一个Java平台中关于面向消息中间件的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。

    什么是AMQP:AMQP(advanced message queuing protocol)是一个提供统一消息服务的应用层标准协议,基于此协议的客户端与消息中间件可传递消息,并不接受客户端/中间件不同产品,不同开发语言等条件的限制。

    044.png
    常用消息中间件对比
    • ActiveMQ

      • ActiveMQ是Apache出品的,最流行的,能力强劲的开源消息总线。ActiveMQ是一个完全支持JMS1.1和J2EE1.4规范的JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊地位。
      • 多种语言和协议编写客户端,语言:Java、C、C++、C#、Ruby、Python、PHP。应用协议:OpenWire、Stomp REST、WS Notification、XMPP、AMQP
      • 完全支持JMS1.1和J2EE1.4规范(持久化,XA消息,事务)
      • 虚拟主题,组合目的,镜像队列
    • RabbitMQ

      • RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
      • 支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript等
      • AMQP的完整实现(vhost、Exchange、Binding、Routing Key等)
      • 事务支持/发布确认
      • 消息持久化
    • Kafka

      • Kafka是一种高吞吐量的分布式发布订阅消息系统,是一个分布式的、分区的、可靠的分布式日志存储服务。它通过一种独一无二的设计提供了一个消息系统的功能。
      • 通过O(1)的磁盘结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能
      • 高吞吐量:即使是非常普通的硬件Kafka也可以支持每秒数百万的消息。
      • Partition、Consumer Group
    045.png
    JMS相关概念
    • 提供者:实现JMS规范的消息中间件服务器

    • 客户端:发送或接收消息的应用程序

    • 生产者/发布者:创建并发送消息的客户端

    • 消费者和订阅者:接收并处理消息的客户端

    • 消息:应用程序之间传递的数据内容

    • 消息模式:在客户端之间传递消息的方式,JMS中定义了主题和队列两种模式

    • 队列模式

      • 客户端包括生产者和消费者
      • 队列中的消息只能被一个消息者消费
      • 消费者可以随时消费队列中的消息
    046.png
    • 主体模型
      • 客户端包括发布者和订阅者
      • 主题中的消息被所有订阅者消费
      • 消费者不能消费订阅之前就发送到主题中的消息
    047.png
    • JMS编码接口
      • ConnectionFactory:用于创建连接到消息中间件的连接工厂
      • Connection:代表了应用程序和消息服务器之间的通信链路
      • Destination:指消息发布和接收的地点,包括队列和主题
      • Session:表示一个单线程的上下文,用于发送和接收消息
      • MessageConsumer:由会话创建,用于接收发送到目标的消息
      • MessageProducer:由会话创建,用于发送消息到目标
      • Message:是在消费者和生产者之间传递的对象,消息头,一组消息属性,一个消息体
    048.png
    win安装activemq

    %activeMQ%\bin\win64:windows64位启动目录

    • activemq.bat:启动activemq

    • InstallService.bat:安装activemq服务到系统服务

    • 启动完,访问localhost:8161

    • 点击Manage ActiveMQ broker,用户名和密码:admin/admin

    队列模式的消息演示

    pom.xml

    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-all</artifactId>
        <version>5.9.0</version>
    </dependency>
    

    AppProducer.java

    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    
    public class AppProducer {
    
        private static final String REMOTE_URL = "tcp://127.0.0.1:61616";
        private static final String QUEUE_NAME = "queue-test";
        
        public static void main(String[] args) throws Exception {
            // 1、创建ConnectionFactory
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(REMOTE_URL) ;
    
            // 2、创建Connection
            Connection connection = connectionFactory.createConnection();
    
            // 3、启动连接
            connection.start();
    
            // 4、创建会话
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  // 是否在事务中处理,应答模式
    
            // 5、创建一个目标(队列)
            Destination destination = session.createQueue(QUEUE_NAME);
            // 6、创建生产者
            MessageProducer producer = session.createProducer(destination);
    
    
            for(int i = 0; i < 10; i++){
                // 7、创建消息
                TextMessage message = session.createTextMessage("create message " + i);
                // 8、发布消息
                producer.send(message);
                System.out.println("消息已发送 :" + message.getText());
            }
            // 9、关闭连接
            connection.close();
        }
    
    }
    

    AppConsumer.java

    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    
    /**
     * @author Jas
     * @create 2018-04-13 15:27
     **/
    public class AppConsumer {
    
        private static final String REMOTE_URL = "tcp://127.0.0.1:61616";
        private static final String QUEUE_NAME = "queue-test";
    
        public static void main(String[] args) throws Exception {
    
            // 1、创建ConnectionFactory
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(REMOTE_URL);
    
            // 2、创建Connection
            Connection connection = connectionFactory.createConnection();
    
            // 3、启动连接
            connection.start();
    
            // 4、创建会话
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  // 是否在事务中处理,应答模式
    
            // 5、创建一个目标(队列)
            Destination destination = session.createQueue(QUEUE_NAME);
    
            // 6、创建一个消费者
            MessageConsumer consumer = session.createConsumer(destination);
    
            // 7、创建一个监听器
            /*  Lambda表达式
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    TextMessage message1 = (TextMessage) message;
                    try {
                        System.out.println("消费者接收到消息:" +  message1.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
            */
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    TextMessage message1 = (TextMessage) message;
                    try {
                        System.out.println("消费者接收到消息:" + message1.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
    
            // 8、关闭连接
            // connection.close();  消息接收是异步的过程,所以关闭连接则接收不到消息
        }
    }
    

    启动两次AppConsumer监听消息发布

    启动AppProducer发布消息,两个AppConsumer监听接收到的消息分别为:

    消费者接收到消息:create message 0
    消费者接收到消息:create message 2
    消费者接收到消息:create message 4
    消费者接收到消息:create message 6
    消费者接收到消息:create message 8
    
    消费者接收到消息:create message 1
    消费者接收到消息:create message 3
    消费者接收到消息:create message 5
    消费者接收到消息:create message 7
    消费者接收到消息:create message 9
    
    主题模式的消息演示

    AppProducer.java

    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    
    public class AppProducer {
        private static final String REMOTE_URL = "tcp://127.0.0.1:61616";
        private static final String TOCPI_NAME = "topic-test";
    
        public static void main(String[] args) throws Exception {
            ConnectionFactory factory = new ActiveMQConnectionFactory(REMOTE_URL);
            Connection connection = factory.createConnection();
            connection.start();
    
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 创建主题
            Destination destination = session.createTopic(TOCPI_NAME);
    
            MessageProducer producer = session.createProducer(destination);
            for(int i = 0; i < 10; i++){
                TextMessage message = session.createTextMessage("create message " + i);
                producer.send(message);
                System.out.println("消息已发送 :" + message.getText());
            }
    
            connection.close();
        }
    }
    

    AppConsumer.java

    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    
    public class AppConsumer {
    
        private static final String REMOTE_URL = "tcp://127.0.0.1:61616";
        private static final String TOCPI_NAME = "topic-test";
    
        public static void main(String[] args) throws Exception {
            ConnectionFactory factory = new ActiveMQConnectionFactory(REMOTE_URL);
            Connection connection = factory.createConnection();
            connection.start();
    
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createTopic(TOCPI_NAME);
    
            MessageConsumer consumer = session.createConsumer(destination);
    
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    TextMessage message1 = (TextMessage) message;
                    try {
                        System.out.println("消费者接收到消息:" +  message1.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
            
        }
    }
    

    运行AppProducer后,再运行AppConsumer,监听不到消息发布;两个AppConsumer监听,会全部接收到AppProducer发布的消息。

    使用Spring集成JMS连接ActiveMQ

    • ConnectionFactory:用于管理连接的连接工厂
      • 一个Spring为我们提供的连接池
      • JmsTemplate每次发消息都会重新创建连接,会话和productor
      • spring提供了SingleConnectionFactory和CachingConnectionFactory
    • JmsTemplate:用于发送和接收消息的模版类
      • 是spring提供的,只需向spring容器内注册这个类就可以使用JmsTemplate方便的操作jms
      • JmsTemplate类是线程安全的,可以在整个应用范围使用
    • MessageListerner:消息监听器
      • 实现一个onMessage方法,该方法只接收一个Message参数

    pom.xml

    <properties>
        <spring-version>4.3.9.RELEASE</spring-version>
    </properties>
    
    <dependencies>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>${spring-version}</version>
        </dependency>
    
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
            <version>${spring-version}</version>
        </dependency>
    
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-core</artifactId>
            <version>5.7.0</version>
            <!-- 排除 ActiveMQ 自身依赖的 Spring -->
            <exclusions>
                <exclusion>
                    <artifactId>spring-context</artifactId>
                    <groupId>org.springframework</groupId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>
    

    ProducerService.java

    public interface ProducerService {
        /**
         * 生产者发送消息
         * @param message
         */
        void sendMessage(String message);
    }
    

    common.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:context="http://www.springframework.org/schema/context"
           xsi:schemaLocation="http://www.springframework.org/schema/beans 
           http://www.springframework.org/schema/beans/spring-beans.xsd 
           http://www.springframework.org/schema/context 
           http://www.springframework.org/schema/context/spring-context.xsd">
        
        <context:annotation-config/>
    
        <!-- ActiveMQ 提供的ConnectionFactory -->
        <bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
            <!-- 配置 brokerURL,这里为你自己开启 ActiveMQ 服务的地址-->
            <property name="brokerURL" value="tcp://127.0.0.1:61616"/>
        </bean>
    
        <!-- Spring jms为我们 提供的连接池 -->
        <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
            <property name="targetConnectionFactory" ref="activeMQConnectionFactory"/>
        </bean>
    
        <!-- 
            点对点或队列模型
            配置队列的目的地 
        -->
        <bean id="activeMQQueue" class="org.apache.activemq.command.ActiveMQQueue">
            <constructor-arg name="name" value="spring-jms-queue"/>
        </bean>
    
        <!-- 
            发布者/订阅者模型
            配置主题的目的地 
        -->
        <bean id="activeMQTopic" class="org.apache.activemq.command.ActiveMQTopic">
            <constructor-arg name="name" value="spring-jms-topic"/>
        </bean>
    </beans>
    

    producer.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://www.springframework.org/schema/beans 
           http://www.springframework.org/schema/beans/spring-beans.xsd">
    
        <!-- 导入公共配置 -->
        <import resource="common.xml"/>
        
        <!-- 配置 JmsTemplate -->
        <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
            <property name="connectionFactory" ref="connectionFactory"/>
        </bean>
        
        <!-- 把 ProducerServiceImpl 交给Spring IoC 容器管理-->
        <bean class="com.jas.jms.producer.ProducerServiceImpl"/>
    </beans>
    

    ProducerServiceImpl.java

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.jms.core.JmsTemplate;
    import org.springframework.jms.core.MessageCreator;
    
    import javax.annotation.Resource;
    import javax.jms.*;
    
    public class ProducerServiceImpl implements ProducerService {
    
        @Autowired
        JmsTemplate jmsTemplate;
        /**
         * 这里以 @Resource 方式注入目的地对象
         * 如果是发布者/订阅者模式,只选要修改 name 中的值为“activeMQTopic”即可
         */
        @Resource(name = "activeMQQueue")
        Destination destination;
    
        @Override
        public void sendMessage(final String message) {
           jmsTemplate.send(destination, new MessageCreator() {
               @Override
               public Message createMessage(Session session) throws JMSException {
                   TextMessage textMessage = session.createTextMessage(message);
                   return textMessage;
               }
           });
            System.out.println("消息已发送:" + message);
        }
    }
    

    Producer.java

    import org.springframework.context.support.ClassPathXmlApplicationContext;
    
    public class Producer {
        public static void main(String[] args) {
            ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("producer.xml");
            ProducerService producerService = context.getBean(ProducerService.class);
    
            for (int i = 0; i < 10; i++) {
                producerService.sendMessage("test message:" + i);
            }
            
            // 关闭 IoC 容器
            context.close();
        }
    }
    

    consumer.xml

    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://www.springframework.org/schema/beans 
           http://www.springframework.org/schema/beans/spring-beans.xsd">
    
        <!-- 导入公共配置 -->
        <import resource="common.xml"/>
    
        <!-- 配置自定义消费者消息监听器 -->
        <bean id="consumerMessageListener" class="com.jas.jms.consumer.ConsumerMessageListener"/>
        
        <!-- 配置消息监听器的容器 -->
        <bean id="container" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
            <property name="connectionFactory" ref="connectionFactory"/>
            <property name="destination" ref="activeMQQueue"/>
            <!--
                配置发布者/订阅者模型的目的地
                <property name="destination" ref="activeMQTopic"/>
             -->
            <property name="messageListener" ref="consumerMessageListener"/>
        </bean>
    </beans>
    

    ConsumerMessageListener.java

    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageListener;
    import javax.jms.TextMessage;
    
    public class ConsumerMessageListener implements MessageListener {
        @Override
        public void onMessage(Message message) {
            TextMessage textMessage = (TextMessage) message;
    
            try {
                System.out.println("接收已接收:" + textMessage.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
    

    Consumer.java

    import org.springframework.context.support.ClassPathXmlApplicationContext;
    
    public class Consumer {
        public static void main(String[] args) {
            ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("consumer.xml");
        }
    }
    

    ActiveMQ集群配置

    • 为什么要对消息中间件集群?

      • 实现高可用,以排除单点故障引起的服务中断
      • 实现负载均衡,以提升效率为更多客户提供服务
    • 集群方式

      • 客户端集群:让多个消费者消费同一队列
      • Broker clusters:多个Broker之间同步消息
      • Master Slave:实现高可用
    • ActiveMQ失效转义(failover)

      • 允许当中一台消息服务器宕机时,客户端在传输层上重新连接到其它消息服务器。
      • 语法:failover:(uri,...,uriN)?transportOptions
    • transportPotions参数说明

      • randomize默认为true,表示在URI列表中选择URI连接时是否采用随机策略
      • initialReconnectDelay默认为10,单位毫秒,表示第一次尝试重连之间等待的时间
      • maxReconnectDelay默认30000,单位毫秒,最长重连的时间间隔

    相关文章

      网友评论

          本文标题:Java消息中间件

          本文链接:https://www.haomeiwen.com/subject/udfilftx.html