ActiveMQ

作者: 拼搏男孩 | 来源:发表于2020-05-08 20:37 被阅读0次

    一、概述

    消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削峰等问题。

    实现高性能,高可用,可伸缩和最终一致性架构。是大型分布式系统不可缺少的中间件。

    目前在生产环境,使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等。

    二、消息队列的应用场景

    1、异步处理:用户注册后,需要发送注册邮件和注册短信,传统的做法有两种:串行和并行,引入消息队列后,异步处理,直接返回,提高效率

    ​ 2、应用解耦:用户下单后,订单系统需要通知库存系统,传统的做法是订单系统调用库存系统的接口,引入消息队列后订单系统将消息写入消息队列后直接返回,库存系统从消息队列中获取订单信息,这样就实现了订单系统与库存系统的解耦。

    ​ 3、流量削峰:秒杀与团抢活动中,可以在应用前端加入消息队列,可以控制活动的人数,缓解短时间内高流量压垮应用。用户的请求,服务器接收后,首先写入消息队列中。加入消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面。秒杀业务根据消息队列中的请求信息,再做后续处理。

    ​ 4、日志处理:日志采集客户端,负责日志数据采集,定时写入Kafka队列;Kafka消息队列,负责日志数据的接收,存储和转发;日志处理应用:订阅并消费kafka队列中的日志数据。

    ​ 5、消息通讯:消息通讯是指,消息队列一般都内置了高效的通信机制,因此也可以用在纯的消息通讯。比如实现点对点消息队列,或者聊天室等。

    三、ActiveMQ

    1、简介

    ActiveMQ出身名门,是Apache门下的最流行的,能力强劲的开源消息总线。完全支持JMS1.1和J2EE1.4规范的JMS Provide实现。它从设计上保证了高性能的集群,当然实现了JMS的P2P与PubSub两种开发模式。

    2、安装并运行

    去官网http://activemq.apache.org/components/classic/download/下载ActiveMQ的最新版本。解压,进入到bin/win64文件夹下,并启动activemq.bat,在浏览器输入:http://localhost:8161/即可看到ActiveMQ的管理界面。点击:Manage ActiveMQ broker登录管理后台:(用户名、密码默认都为admin)。登录进去以后我们可以点击Queues看到消息队列以及消费者情况等。

    4、代码示例

    4.1 添加依赖

    pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>com.qianfeng</groupId>
        <artifactId>507activemq</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <dependencies>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>4.12</version>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.activemq</groupId>
                <artifactId>activemq-all</artifactId>
                <version>5.15.12</version>
            </dependency>
        </dependencies>
    
    </project>
    

    由于不是spring项目,只是一个简单的spring项目,所以只添加了activemq-all这个依赖

    4.2 P2P模式

    消息队列有两种模式,一种称为p2p,点对点的模式,另一种称为pubsub(publish、subscribe)模式,一对多的模式,这两种模式各有特点。

    生产者:

    TestProvider.java

    package com.qianfeng;
    
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.junit.Test;
    
    import javax.jms.*;
    
    public class TestProvider {
        @Test
        public void testProvider(){
            //使用缺省的用户名、密码和url创建连接工厂
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                    ActiveMQConnectionFactory.DEFAULT_USER,
                    ActiveMQConnection.DEFAULT_PASSWORD,
                    ActiveMQConnection.DEFAULT_BROKER_URL
            );
            try {
                //通过连接工厂创建连接对象
                Connection connection = connectionFactory.createConnection();
                //启动连接
                connection.start();
                //通过连接对象创建会话对象,第一个参数位是否开启事务
                Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
                //P2P模式使用的是Queue
                Queue queue = session.createQueue("names");
                MessageProducer producer = session.createProducer(queue);
                sendMessage(session,producer);
                session.commit();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    
        private void sendMessage(Session session, MessageProducer producer) {
            try {
                for (int i = 0; i < 10; i++) {
                    TextMessage message = session.createTextMessage("zhangsan"+i);
                    producer.send(message);
                }
            }catch (Exception e){
                e.printStackTrace();
            }
    
        }
    }
    

    TestConsumer.java

    package com.qianfeng;
    
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.junit.Test;
    
    import javax.jms.*;
    
    public class TestConsumer {
        @Test
        public void testConsumer(){
            //使用缺省的用户名、密码和url创建连接工厂
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                    ActiveMQConnectionFactory.DEFAULT_USER,
                    ActiveMQConnection.DEFAULT_PASSWORD,
                    ActiveMQConnection.DEFAULT_BROKER_URL
            );
            try {
                //通过连接工厂创建连接对象
                Connection connection = connectionFactory.createConnection();
                //启动连接
                connection.start();
                //通过连接对象创建会话对象,第一个参数为是否开启事务
                Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
                Queue queue = session.createQueue("names");
                //获取consumer
                MessageConsumer consumer = session.createConsumer(queue);
                while (true){
                    TextMessage message = (TextMessage) consumer.receive(10000);
                    if(message != null){
                        System.out.println(message.getText());
                    }else {
                        break;
                    }
                }
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
    

    P2P模式的特点是消费者一定会收到,类似现实中的短信。

    4.3 PubSub模式

    PubSub模式类似现实中的广播,需要先启动消费者,再启动生产者,消费者才能收到消息。

    TestTopicProvider.java

    package com.qianfeng;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.junit.Test;
    
    import javax.jms.*;
    
    public class TestTopicProvider {
        @Test
        public void testTopicProducer(){
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
            try {
                Connection connection = connectionFactory.createConnection();
                connection.start();
                Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
                Topic topic = session.createTopic("test-topic");
                MessageProducer producer = session.createProducer(topic);
                sendMessage(session,producer);
                session.commit();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
        private void sendMessage(Session session, MessageProducer producer) {
            try {
                for (int i = 0; i < 10; i++) {
                    TextMessage message = session.createTextMessage("zhangsan"+i);
                    System.out.println("activeMQ发送消息"+i);
                    Thread.sleep(5000);
                    producer.send(message);
                }
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }
    

    TestTopicConsumer.java

    package com.qianfeng;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.junit.Test;
    
    import javax.jms.*;
    
    public class TestTopicConsumer {
        @Test
        public void testTopicConsumer(){
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
            try {
                Connection connection = connectionFactory.createConnection();
                connection.start();
                Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
                Topic topic = session.createTopic("test-topic");
                MessageConsumer consumer = session.createConsumer(topic);
                while (true){
                    TextMessage message = (TextMessage) consumer.receive(10000);
                    if(message!=null){
                        System.out.println("接收到消息:"+message.getText());
                    }else {
                        break;
                    }
                }
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
    

    相关文章

      网友评论

          本文标题:ActiveMQ

          本文链接:https://www.haomeiwen.com/subject/dmbqnhtx.html