消息队列之ActiveMQ(一)

作者: 吴世浩 | 来源:发表于2017-02-11 14:19 被阅读104次

    一、浩言

    我们要变的变态,才能吸引更多的变态。

    二、背景

    最近在看消息队列的东西,准备引入到项目中使用,一直说要自己先学会使用,在使用中学习。所以这算是新年的第一个周末就要加班,就在网上找资料自己尝试配置下。顺便纪录下经历了,都是有坑要去踩的。

    三、ActiveMQ 在Windows下的使用

    我修改了配置文件activemq.xml,将0.0.0.0改成了127.0.0.1

         <transportConnectors>
                <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
                <transportConnector name="openwire" uri="tcp://127.0.0.1:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600000"/>
                <transportConnector name="amqp" uri="amqp://127.0.0.1:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600000"/>
                <transportConnector name="stomp" uri="stomp://127.0.0.1:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600000"/>
                <transportConnector name="mqtt" uri="mqtt://127.0.0.1:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600000"/>
                <transportConnector name="ws" uri="ws://127.0.0.1:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600000"/>
            </transportConnectors>
    

    注意点:我在测试的时候一直报1G要大于maxFrameSize的100M,所以我在这个后面加了三个0

    然后启动启动active


    Paste_Image.png Paste_Image.png

    我们在windows下使用这个命令查看端口netstat -ano linux下可以使用netstat -lntp查看


    Paste_Image.png

    在浏览器中输入:http://127.0.0.1:8161/admin/index.jsp会有如下显示

    active-mq.png
    <?xml version="1.0" encoding="UTF-8"?>  
    <beans xmlns="http://www.springframework.org/schema/beans"  
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"  
        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd  
    http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd"> 
        <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL" value="tcp://127.0.0.1:61616"></property>
            <!-- <property name="brokerURL" value="http://127.0.0.1:61616"></property> -->
            <property name="useAsyncSend" value="true"></property>
            <property name="alwaysSessionAsync" value="true"></property>
            <property name="useDedicatedTaskRunner" value="true"></property>
        </bean>
    
        <!-- 发送消息的目的地 -->
        <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
                  <!-- 消息队列的名字 -->
            <constructor-arg value="mahone.queue"/>
        </bean>
    </beans> 
    

    注意点:我在这里面看到的我有个注释的配置brokerURL,我刚刚开始的时候一直配置成http...,然后怎么测试都不通,最后才发现是自己这里配置错了。

    Paste_Image.png

    下面附上测试代码

    package com.mouse.moon.test.mq;
    
    import javax.jms.Connection;
    import javax.jms.Destination;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.support.ClassPathXmlApplicationContext;
    
    public class TestMq {
        public static void sendWithAuto(ApplicationContext context) {  
            ActiveMQConnectionFactory factory = null;  
            Connection conn = null;  
            Destination destination = null;  
            Session session = null;  
            MessageProducer producer = null;  
            try {  
                destination = (Destination) context.getBean("destination");  
                factory = (ActiveMQConnectionFactory) context.getBean("targetConnectionFactory");  
                conn = factory.createConnection();  
                session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);  
                producer = session.createProducer(destination);  
                TextMessage message = session.createTextMessage("...Hello JMS-4!");  
                
                
                producer.send(message);  
            } catch (Exception e) {  
                e.printStackTrace();  
            } finally {  
                try {//关闭生产者  
                    producer.close();  
                    producer = null;  
                } catch (Exception e) {
                    
                }  
                    try {//关闭session  
                        session.close();  
                        session = null;  
                    }  
                    catch (Exception e) {  
                        
                    }  
                    
                    try {  //停止连接
                        conn.stop();  
                    } catch (Exception e) {  
                        
                    }  
                try {//关闭连接  
                    conn.close();  
                } catch (Exception e) {  
                }  
            }  
        }  
        public static void main(String[] args) {  
            System.out.println("-------------start--------------");
            final ApplicationContext context = new ClassPathXmlApplicationContext("classpath:/xml/activemq.xml");  
            sendWithAuto(context);  
            System.out.println("-------------end---------------");
        }  
    }
    
    Paste_Image.png Paste_Image.png

    在做测试的时候还遇到如下问题

    org.apache.activemq.ConfigurationException: There are 1 consumer options that couldn't be set on the consumer. Check the options are spelled correctly. Unknown parameters=[{perfetchSize=100}]. This consumer cannot be started.
    

    这是在设置名字的时候附带的参数,我把这个去掉了就只能看到生产者的名字是"mahone.queue""
    消费端代码:

    package com.mouse.moon.test.mq;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageListener;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.support.ClassPathXmlApplicationContext;
    
    /**
     * 
     * @author Mahone
     * @description:消费端
     *
     */
    public class TestConsumer  extends Thread implements MessageListener{
        
        private Connection conn = null;
        
        private Destination destination = null ;
        
        private Session session = null;
    
        @Override
        public void onMessage(Message message) {
            try{
                TextMessage tm = (TextMessage)message;
                System.out.println("receive message:"+tm.getText());
            }catch(Exception e){
                e.printStackTrace();
            }
            
        }
        
        @Override
        public void run(){
            receive();
        }
        
        public void receive(){
            ConnectionFactory factory = null;
            Connection conn = null ;
            
            try{
                final ApplicationContext context = new ClassPathXmlApplicationContext("classpath:/xml/activemq.xml");
                factory = (ActiveMQConnectionFactory)context.getBean("targetConnectionFactory");
                conn = factory.createConnection();
                conn.start();
                session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
                destination = (Destination)context.getBean("destination");
                MessageConsumer consumer = session.createConsumer(destination);
                consumer.setMessageListener(this);
            }catch(Exception e){
                e.printStackTrace();
            }
        }
        public static void main(String args[]){
            TestConsumer mythread = new TestConsumer();
            mythread.start();
        }
    }
    
    
    Paste_Image.png

    上述代码有参考该<a href='http://blog.csdn.net/lifetragedy/article/details/51836557'>博客</a>

    四、消息队列的作用

    1:解耦操作
    例如说其中A更新好了之后,其他地方(B,C,D)也需要更新,那么使用消息队列之后,A更新后,将更新通知放入消息队列中,B,C,D中需要更新只需要订阅这个操作就好,实现解耦。

    2、实现异步操作
    例如上面说的,A更新后,其他的更新都是异步进行处理,并不需要同步操作。

    3、对流量进行控制
    有时候访问多了,流量就大了,忽高忽低的,如果放入队列中,按照顺序进行操作,即是先存起来,然后在按照顺序进行发送。

    五:浩语

                                     __                                                     
                      __  _  ____ __|  |__ _____    ___
                      \ \/ \/ /  |  \  |  \\__  \  /  _ \   
                       \     /|  |  /   Y  \/ __ \(  <_> )
                        \/\_/ |____/|___|  (____  /\____/ 
                                              \/     \/          
                           任何事情都是要靠努力和用心。                                                   
    

    相关文章

      网友评论

        本文标题:消息队列之ActiveMQ(一)

        本文链接:https://www.haomeiwen.com/subject/akigittx.html