美文网首页
消息队列之ActiveMQ

消息队列之ActiveMQ

作者: Demon先生 | 来源:发表于2020-05-08 15:27 被阅读0次

    1. 什么是ActiveMQ

    1.1. 介绍

    ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。

    1.2. ActiveMQ的消息形式

    ActiveMQ对于消息的传递有两种类型:

    第一种:点对点的,即一个生产者和一个消费者一一对应。


    image.png

    第二种:发布/订阅模式,即一个生产者产生消息并进行发送后,可以由多个消费者进行接收。

    image.png

    对于消息正文格式,以及调用的消息类型。JMS定义了五种,允许发送并接收一些不同形式的数据。

    · StreamMessage -- Java原始值的数据流

    · MapMessage--一套名称-值对

    · TextMessage--一个字符串对象

    · ObjectMessage--一个序列化的 Java对象

    · BytesMessage--一个字节的数据流

    2. ActiveMQ服务器的安装

    ActiveMQ是java语言开发的,需要使用到jdk,这里介绍在Linux系统种进行安装。具体步骤如下:

    第一步:下载ActiveMQ压缩包,上传并解压到Linux系统。

    第二步:进入解压包路径/bin目录下,启动ActivceMQ,相关操作命令如下:

    启动:
    [root@localhost bin]# ./activemq start
    关闭:
    [root@localhost bin]# ./activemq stop
    查看状态:
    [root@localhost bin]# ./activemq status
    
    2222.png

    第三步:检测是否启动成功,在浏览器中访问网址:ip:8161/admin,在不修改配置文件的前提下,8161端口为默认端口,用户名:admin,密码:admin

    image.png

    3. ActiveMQ的三种使用模式

    在进行ActiveMQ操作之前,需要在Maven工程pom.xml文件中引入ActiveMQ的jar包,引用如下:

       <!-- activemq 消息队列配置-->
            <dependency>
                <groupId>org.apache.activemq</groupId>
                <artifactId>activemq-all</artifactId>
                <version>5.13.0</version>
            </dependency>
    

    3.1. Queue点对点模式

    Queue 是点对点模式,只能是一个生产者产生一个消息,被一个消费者消费。默认是存在于MQ的服务器中的,发送消息之后,消费者随时取。但是一定是一个消费者取,消费完消息也就没有了。

    3.1.1 构建Producer生产者

    Producer为生产者,生产并发送消息。

    第一步:创建ConnectionFactory对象,需要指定服务端ip及端口号。

    第二步:使用ConnectionFactory对象创建一个Connection对象。

    第三步:开启连接,调用Connection对象的start方法。

    第四步:使用Connection对象创建一个Session对象。

    第五步:使用Session对象创建一个Destination对象(topic、queue),此处创建一个Queue对象。

    第六步:使用Session对象创建一个Producer对象。

    第七步:创建一个Message对象,创建一个TextMessage对象。

    第八步:使用Producer对象发送消息。

    第九步:关闭资源。

    //生产者发送消息
        @Test
        public void send() throws Exception{
            //1.创建一个连接工厂 connectionfactory
            //参数:就是要连接的服务器的地址
            ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.25.129:61616");
            //2.通过工厂获取连接对象 创建连接
            Connection connection = factory.createConnection();
            //3.开启连接
            connection.start();
            //4.创建一个session对象  提供发送消息等方法
            //第一个参数:表示是否开启分布式事务(JTA)  一般是false 不开启。
            //第二个参数:就是设置消息的应答模式   如果 第一个参数为false时,第二个参数设置才有意义。用的是自动应答
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//
            //5.创建目的地 (destination)  queue  
            //参数:目的地的名称
            Queue queue = session.createQueue("queue-test");
            //6.创建个生产者
            MessageProducer producer = session.createProducer(queue);
            //7.构建消息的内容  
            TextMessage textMessage = session.createTextMessage("queue测试发送的消息");
            //8.发送消息
            producer.send(textMessage);
            //9.关闭资源
            producer.close();
            session.close();
            connection.close();
        }
    

    3.1.2 构建Consumer消费者

    Consumer为消费者,负责接收和处理消息。

    第一步:创建一个ConnectionFactory对象。

    第二步:从ConnectionFactory对象中获得一个Connection对象。

    第三步:开启连接。调用Connection对象的start方法。

    第四步:使用Connection对象创建一个Session对象。

    第五步:使用Session对象创建一个Destination对象。和发送端保持一致queue,并且队列的名称一致。

    第六步:使用Session对象创建一个Consumer对象。

    第七步:接收消息。

    第八步:打印消息。

    第九步:关闭资源

    @Test
        public void recieve() throws Exception {
            //1.创建连接的工厂
            ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.25.129:61616");
            //2.创建连接
            Connection connection = factory.createConnection();
            //3.开启连接
            connection.start();
            //4.创建session
            //第一个参数:表示是否开启分布式事务(JTA)  一般是false 不开启。
            //第二个参数:就是设置消息的应答模式   如果 第一个参数为false时,第二个参数设置才有意义。用的是自动应答
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5.创建接收消息的一个目的地
            Queue queue = session.createQueue("queue-test");
            //6.创建消费者
            MessageConsumer consumer = session.createConsumer(queue);
            //7.接收消息 打印
            //设置一个监听器
            //System.out.println("start");
            //这里其实开辟了一个新的线程
            consumer.setMessageListener(new MessageListener() {
                
                //当有消息的时候会执行以下的逻辑
                @Override
                public void onMessage(Message message) {
                    if(message instanceof TextMessage){
                        TextMessage message2 = (TextMessage) message;
                        try {
                            System.out.println("接收的消息为"+message2.getText());
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
            //System.out.println("end");
            Thread.sleep(199999);
            //8.关闭资源
            consumer.close();
            session.close();
            connection.close();
        }
    

    3.2. Topic发布订阅模式

    Topic 是发布订阅模式,一个生产者可以一个消息,可以被多个消费者消费。默认是不存在于MQ服务器中的,一旦发送之后,如果没有订阅,消息则丢失。

    3.2.1 构建Producer生产者

    Producer为生产者,生产并发送消息。

    第一步:创建ConnectionFactory对象,需要指定服务端ip及端口号。

    第二步:使用ConnectionFactory对象创建一个Connection对象。

    第三步:开启连接,调用Connection对象的start方法。

    第四步:使用Connection对象创建一个Session对象。

    第五步:使用Session对象创建一个Destination对象(topic、queue),此处创建一个Topic对象。

    第六步:使用Session对象创建一个Producer对象。

    第七步:创建一个Message对象,创建一个TextMessage对象。

    第八步:使用Producer对象发送消息。

    第九步:关闭资源。

    // 发送topic
        @Test
        public void send() throws Exception {
            //1.创建连接工厂
            ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.25.129:61616");
            //2.创建连接
            Connection connection = factory.createConnection();
            //3.开启连接
            connection.start();
            //4.创建session
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5.创建目的地    topic
            Topic createTopic = session.createTopic("topic-test");
            //6.创建生成者
            MessageProducer producer = session.createProducer(createTopic);
            //7.构建消息对象
            TextMessage createTextMessage = session.createTextMessage("topic发送的消息123");
            //8.发送消息
            producer.send(createTextMessage);
            //9.关闭资源
            producer.close();
            session.close();
            connection.close();
        }
    

    3.2.2 构建Consumer消费者

    Consumer为消费者,负责接收和处理消息。

    第一步:创建一个ConnectionFactory对象。

    第二步:从ConnectionFactory对象中获得一个Connection对象。

    第三步:开启连接。调用Connection对象的start方法。

    第四步:使用Connection对象创建一个Session对象。

    第五步:使用Session对象创建一个Destination对象。和发送端保持一致topic,并且话题的名称一致。

    第六步:使用Session对象创建一个Consumer对象。

    第七步:接收消息。

    第八步:打印消息。

    第九步:关闭资源

    @Test
        public void reieve() throws Exception {
            // 1.创建连接的工厂 指定MQ服务器的地址
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.129:61616");
            // 2.获取连接
            Connection connection = connectionFactory.createConnection();
            // 3.开启连接
            connection.start();
            // 4.根据连接对象创建session (提供了操作activmq的方法)
            // 第一个参数:表示是否开启分布式事务(JTA) 一般就是false :表示不开启。 只有设置了false ,第二个参数才有意义。
            // 第二个参数:表示设置应答模式 自动应答和手动应答 。使用的是自动应答
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 5.根据session创建目的地(destination)
            Topic topic = session.createTopic("topic-test");
            // 6.创建消费者;
            MessageConsumer consumer = session.createConsumer(topic);
            // 7.接收消息
            // 设置一个监听器 就是开启了一个新的线程
            System.out.println("start");
            consumer.setMessageListener(new MessageListener() {
    
                @Override
                public void onMessage(Message message) {
                    if (message instanceof TextMessage) {
                        TextMessage message2 = (TextMessage) message;
                        String text = "";
                        try {
                            text = message2.getText();
                        } catch (JMSException e) {
                            e.printStackTrace();
                        } // 获取消息的内容
                        System.out.println(text);
                    }
                    System.out.println();
                }
            });
            System.out.println("end");
            // 睡眠
            Thread.sleep(10000000);
    
            // 9.关闭资源
            consumer.close();
            session.close();
            connection.close();
        }
    

    3.3. Spring整合模式

    在Maven工程pom.xml文件中添加Spring相关的jar包,引用如下:

            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-jms</artifactId>
                <version> 4.2.4.RELEASE</version>
            </dependency>
    

    3.3.1 配置Spring和ActiveMQ配置文件

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
        xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd
        http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
        http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">
        
        <bean id="targetConnection" class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL" value="tcp://192.168.25.129:61616"/>
        </bean>
        <!-- 通用的connectionfacotry 指定真正使用的连接工厂 -->
        <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
            <property name="targetConnectionFactory" ref="targetConnection"/>
        </bean>
        <!-- 接收和发送消息时使用的类 模板对象-->
        <bean class="org.springframework.jms.core.JmsTemplate">
            <property name="connectionFactory" ref="connectionFactory"/>
        </bean>
        <!-- <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
            <constructor-arg name="name" value="item-change-queue"></constructor-arg>
        </bean> -->
        <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
            <constructor-arg name="name" value="item-change-topic"/>
        </bean>
        
        <!-- 监听器 -->
        <bean id="myMessageListener" class="com.demon.activemq.spring.MyMessageListener"/>
        <!-- 监听容器,作用:启动线程做监听 -->
        <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
            <property name="connectionFactory" ref="connectionFactory"/>
            <property name="destination" ref="topicDestination"/>
            <property name="messageListener" ref="myMessageListener"/>
        </bean>
    
        <bean id="myMessageListener2" class="com.demon.activemq.spring.MyMessageListener"/>
        <!-- 监听容器,作用:启动线程做监听 -->
        <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
            <property name="connectionFactory" ref="connectionFactory"/>
            <property name="destination" ref="topicDestination"/>
            <property name="messageListener" ref="myMessageListener2"/>
        </bean>
    </beans>
    

    3.3.2 生产者发送消息

    第一步:初始化一个spring容器

    第二步:从容器中获得JMSTemplate对象。

    第三步:从容器中获得一个Destination对象

    第四步:使用JMSTemplate对象发送消息,需要知道Destination

    @Test
        public void send() throws Exception{
            //1.初始化spring容器
            ApplicationContext context = new ClassPathXmlApplicationContext("classpath:/resource/applicationContext-activemq.xml");
            //2.获取到jmstemplate的对象
            JmsTemplate jmsTemplate = context.getBean(JmsTemplate.class);
            //3.获取destination
            Destination destination = (Destination) context.getBean(Destination.class);
            //4.发送消息
            jmsTemplate.send(destination, new MessageCreator() {
                
                @Override
                public Message createMessage(Session session) throws JMSException {
                    return session.createTextMessage("通过spring发送的消息123");
                }
            });
            Thread.sleep(100000);
        }
    

    3.3.3 消费者接收消息

    创建一个MessageListener的实现类。

    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageListener;
    import javax.jms.TextMessage;
    
    public class MyMessageListener implements MessageListener {
    
        @Override
        public void onMessage(Message message) {
            //获取消息
            if(message instanceof TextMessage){
                TextMessage textMessage = (TextMessage)message;
                String text;
                try {
                    text = textMessage.getText();
                    System.out.println(text);
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    

    文档下载地址:

    https://wenku.baidu.com/view/94f62ed6876fb84ae45c3b3567ec102de3bddf14

    相关文章

      网友评论

          本文标题:消息队列之ActiveMQ

          本文链接:https://www.haomeiwen.com/subject/oumfnhtx.html