ActiveMQ

作者: lhsjohn | 来源:发表于2019-03-03 23:08 被阅读0次

    What is ActiveMQ?

    ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。
    主要特点:

    1. 多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
    2. 完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)
    3. 对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性
    4. 通过了常见J2EE服务器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上
    5. 支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
    6. 支持通过JDBC和journal提供高速的消息持久化
    7. 从设计上保证了高性能的集群,客户端-服务器,点对点
    8. 支持Ajax
    9. 支持与Axis的整合
    10. 可以很容易得调用内嵌JMS provider,进行测试

    ActiveMQ的消息形式

    对于消息的传递有两种类型:

    一种是点对点的,即一个生产者和一个消费者一一对应;

    另一种是发布/订阅模式,即一个生产者产生消息并进行发送后,可以由多个消费者进行接收。

    JMS定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收以一些不同形式的数据,提供现有消息格式的一些级别的兼容性。

    1. · StreamMessage -- Java原始值的数据流
    2. · MapMessage--一套名称-值对
    3. · TextMessage--一个字符串对象
    4. · ObjectMessage--一个序列化的 Java对象
    5. · BytesMessage--一个字节的数据流
    a.png

    使用测试(未整合Spring)

    
    public void testQueueProducer() throws Exception {
            // 1.创建一个连接工厂对象,需要指定服务的ip以及端口
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.1.128:61616");
            // 2.使用工厂对象来创建一个Connection对象
            Connection connection = connectionFactory.createConnection();
            // 3.开启连接,调用Connection对象的start方法
            connection.start();
            // 4.创建一个Session对象
            // 第一个参数:是否开启事务。如果true开启事务,第二个无意义。一般不开启false
            // 第二个参数:如果不开启事务。应答模式,一般是自动应答或者手动应答
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 5.使用Session对象创建一个Destination对象。两种形式queue、topic
            Queue queue = session.createQueue("spring-queue");
            // 6.使用Session对象创建一个Producer对象
            MessageProducer producer = session.createProducer(queue);
            // 7.创建一个Message对象,可以使用TextMessage。
            /*
             * TextMessage textMessage=new ActiveMQTextMessage();
             * textMessage.setText("hello Activemq");
             */
            TextMessage textMessage = session.createTextMessage("hello Activemq");
            // 8.发送消息
            producer.send(textMessage);
            // 9.关闭资源
            producer.close();
            session.close();
            connection.close();
    
        }
    
    
        public void testQueueConsumer() throws Exception {
            // 创建一个ConnectionFactory对象连接MQ服务器
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.1.128:61616");
            // 创建一个连接对象
            Connection connection = connectionFactory.createConnection();
            // 开启连接
            connection.start();
            // 使用Connection对象创建一个Session对象
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 创建一个Destinatiion对象,queue对象
            Queue queue = session.createQueue("spring-queue");
            // 使用Session对象创建一个消费者对象
            MessageConsumer consumer = session.createConsumer(queue);
            // 接收消息
            consumer.setMessageListener(new MessageListener() {
    
                @Override
                public void onMessage(Message message) {
                    // 打印结果
                    TextMessage textMessage = (TextMessage) message;
                    String text;
    
                    try {
                        text = textMessage.getText();
                        System.out.println(text);
                    } catch (JMSException e) {
    
                        e.printStackTrace();
                    }
                }
            });
    
            // 等待接收消息
            System.in.read();
            // 关闭资源
            consumer.close();
            session.close();
            connection.close();
        }
    
        
        public void testTopicProducer() throws Exception {
            // 1.创建一个连接工厂对象,需要指定服务的ip以及端口
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.1.128:61616");
            // 2.使用工厂对象来创建一个Connection对象
            Connection connection = connectionFactory.createConnection();
            // 3.开启连接,调用Connection对象的start方法
            connection.start();
            // 4.创建一个Session对象
            // 第一个参数:是否开启事务。如果true开启事务,第二个无意义。一般不开启false
            // 第二个参数:如果不开启事务。应答模式,一般是自动应答或者手动应答
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 5.使用Session对象创建一个Destination对象。两种形式queue、topic
            Topic topic = session.createTopic("test-topic");
            // 6.使用Session对象创建一个Producer对象
            MessageProducer producer = session.createProducer(topic);
            // 7.创建一个Message对象,可以使用TextMessage。
            /*
             * TextMessage textMessage=new ActiveMQTextMessage();
             * textMessage.setText("hello Activemq");
             */
            TextMessage textMessage = session.createTextMessage("topicMessage");
            // 8.发送消息
            producer.send(textMessage);
            // 9.关闭资源
            producer.close();
            session.close();
            connection.close();
        }
    
    
        public void testTopicConsumer() throws Exception {
            // 创建一个ConnectionFactory对象连接MQ服务器
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.1.128:61616");
            // 创建一个连接对象
            Connection connection = connectionFactory.createConnection();
            // 开启连接
            connection.start();
            // 使用Connection对象创建一个Session对象
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 创建一个Destinatiion对象,queue对象
            Topic topic = session.createTopic("test-topic");
            // 使用Session对象创建一个消费者对象
            MessageConsumer consumer = session.createConsumer(topic);
            // 接收消息
            consumer.setMessageListener(new MessageListener() {
    
                @Override
                public void onMessage(Message message) {
                    // 打印结果
                    TextMessage textMessage = (TextMessage) message;
                    String text;
    
                    try {
                        text = textMessage.getText();
                        System.out.println(text);
                    } catch (JMSException e) {
    
                        e.printStackTrace();
                    }
                }
            });
          
            System.out.println("topic 消费者3已经启动");
            // 等待接收消息
            System.in.read();
            // 关闭资源
            consumer.close();
            session.close();
            connection.close();
        }
    
    
    
    

    作者:lhsjohn

    相关文章

      网友评论

          本文标题:ActiveMQ

          本文链接:https://www.haomeiwen.com/subject/cssvuqtx.html