美文网首页
初识activeMQ

初识activeMQ

作者: 自信的末日菇 | 来源:发表于2018-06-22 14:44 被阅读0次

    项目需要用到消息队列,就想先了解一下activeMQ这个东西

    关于activeMQ

    ActiveMQ是Apache出品的,非常流行的消息中间件。

    安装

    上官网下载对应系统的版本。http://activemq.apache.org/.

    启动

    windows 直接运行bin目录下activemq.bat
    linux 在bin目录下执行./activemq start

    目录介绍

    • conf里面是配置文件,重点关注的是activemq.xml(链接端口灯)、jetty.xml(登录地址,端口信息)、jetty-realm.properties(Web控制台需要用户名、密码信息)。。

    • data目录下是ActiveMQ进行消息持久化存放的地方,默认采用的是kahadb,当然我们可以采用leveldb,或者采用JDBC存储到MySQL,或者干脆不使用持久化机制。

    • webapps,注意ActiveMQ自带Jetty提供Web管控台

    使用activeMQ

    配置maven

    根据activeMQ版本增加对应依赖

    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-all</artifactId>
        <version>5.15.0</version>
    </dependency>
    

    消息队列P2P模式

    创建生产者队列并发送消息
    package active_mq;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    
    public class TestActiveMQ {
    
        ConnectionFactory getConnectionFactory(){
            return new ActiveMQConnectionFactory("user","uesr","tcp://localhost:61616");
        }
    
        void testMQ() throws JMSException {
            //创建Connection
            Connection conn = getConnectionFactory().createConnection();
            conn.start();
            //创建Session
            Session session = conn.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
            //创建Destination 目标队列
            Destination destination = session.createQueue("firstQueue");
            //创建MessageProducer 生产者
            MessageProducer messageProducer = session.createProducer(destination);
            messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            //定义消息对象
            TextMessage textMessage = session.createTextMessage();
            textMessage.setText("Hello,ActiveMQ"+System.currentTimeMillis());
            //发送
            messageProducer.send(textMessage);
            //关闭链接
            if (conn != null) {
                conn.close();
            }
        }
    
        public static void main(String[] args) {
            TestActiveMQ activeMQ = new TestActiveMQ();
            try {
                activeMQ.testMQ();
                activeMQ.testMQ();
                activeMQ.testMQ();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
    
    

    发送了三条消息

    创建消费者并接收消息
    package active_mq;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    
    public class TestActiveMQConsumer {
    
        ConnectionFactory getConnectionFactory(){
            return new ActiveMQConnectionFactory("user","uesr","tcp://localhost:61616");
        }
    
        void testMQ(String name) throws JMSException {
            Connection conn = getConnectionFactory().createConnection();
            conn.start();
            Session session = conn.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue("firstQueue");
            MessageConsumer messageConsumer = session.createConsumer(destination);
            //设置listener,封装好了消息轮询
            messageConsumer.setMessageListener(message -> {
                try {
                    System.out.println("消息来啦:"+name);
                    System.out.println(((TextMessage)message).getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            });
        }
    
        public static void main(String[] args) {
            TestActiveMQConsumer activeMQ = new TestActiveMQConsumer();
            try {
                activeMQ.testMQ("xiaohong");
                activeMQ.testMQ("xiaolv");
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
      
        
    
    

    运行结果:结果每个人可能会不一样。

    消息来啦:xiaohong
    Hello,ActiveMQ1529400563732
    消息来啦:xiaolv
    Hello,ActiveMQ1529400563763
    消息来啦:xiaohong
    Hello,ActiveMQ1529400563788
    

    订阅模式Pub/Sub

    一对多通信,发送一条消息,所有订阅了该目标的消费者都会收到消息。
    P2P、Pub/Sub在代码上的区别点仅仅在于,目标类型的创建是createQueue or createTopic,其他一切照旧

    创建生产者队列并发送消息

    代码跟队列基本没差别。只是创建的消息对象时topic

    package active_mq.topic;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    
    public class TestActiveMQ {
    
        ConnectionFactory getConnectionFactory(){
            return new ActiveMQConnectionFactory("user","uesr","tcp://localhost:61616");
        }
    
        void testMQ() throws JMSException {
            Connection conn = getConnectionFactory().createConnection();
            conn.setClientID("clientFirst");
            conn.start();
            Session session = conn.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
            //创建消息订阅对象
            Destination destination = session.createTopic("topic");
            MessageProducer messageProducer = session.createProducer(destination);
            messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            TextMessage textMessage = session.createTextMessage();
            textMessage.setText("Hello,ActiveMQ"+System.currentTimeMillis());
            messageProducer.send(textMessage);
            if (conn != null) {
                conn.close();
            }
        }
    
        public static void main(String[] args) {
            TestActiveMQ activeMQ = new TestActiveMQ();
            try {
                activeMQ.testMQ();
                activeMQ.testMQ();
                activeMQ.testMQ();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
    
    
    订阅消息
    package active_mq.topic;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    
    public class TestActiveMQConsumer {
    
        ConnectionFactory getConnectionFactory(){
            return new ActiveMQConnectionFactory("user","uesr","tcp://localhost:61616");
        }
    
        void testMQ(String name) throws JMSException {
            Connection conn = getConnectionFactory().createConnection();
            //设置id后,不在线也不会丢失订阅消息,下次上线的时候就可以获取到消息
            conn.setClientID(name);
            conn.start();
            Session session = conn.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createTopic("topic");
            //订阅方式的差别在这里
            MessageConsumer messageConsumer = session.createDurableSubscriber((Topic)destination,name);
            messageConsumer.setMessageListener(message -> {
                try {
                    System.out.println("消息来啦:"+name);
                    System.out.println(((TextMessage)message).getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            });
        }
    
        public static void main(String[] args) {
            TestActiveMQConsumer activeMQ = new TestActiveMQConsumer();
            try {
                activeMQ.testMQ("xiaohong");
                activeMQ.testMQ("xiaolv");
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
    
    

    执行结果

    消息来啦:xiaolv
    Hello,ActiveMQ1529403218938
    消息来啦:xiaohong
    Hello,ActiveMQ1529403218938
    消息来啦:xiaohong
    Hello,ActiveMQ1529403218975
    消息来啦:xiaolv
    Hello,ActiveMQ1529403218975
    消息来啦:xiaolv
    Hello,ActiveMQ1529403219001
    消息来啦:xiaohong
    Hello,ActiveMQ1529403219001
    

    相关文章

      网友评论

          本文标题:初识activeMQ

          本文链接:https://www.haomeiwen.com/subject/qcpkyftx.html