activeMQ

作者: Mtllll | 来源:发表于2019-03-10 13:53 被阅读0次

    1. JMS和AMQP对比

    2. 一些消息组件的对比

    3. JMS规范

    JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。

    JMS是一种与厂商无关的 API,用来访问收发系统消息,它类似于JDBC(Java Database Connectivity)。这里,JDBC 是可以用来访问许多不同关系数据库的 API,而 JMS 则提供同样与厂商无关的访问方法,以访问消息收发服务。许多厂商都支持 JMS,包括 IBM 的 MQSeries、BEA的 Weblogic JMS service和 Progress 的 SonicMQ。 JMS 使您能够通过消息收发服务(有时称为消息中介程序或路由器)从一个 JMS 客户机向另一个 JMS客户机发送消息。消息是 JMS 中的一种类型对象,由两部分组成:报头和消息主体。报头由路由信息以及有关该消息的元数据组成。消息主体则携带着应用程序的数据或有效负载。根据有效负载的类型来划分,可以将消息分为几种类型,它们分别携带:简单文本(TextMessage)、可序列化的对象 (ObjectMessage)、属性集合 (MapMessage)、字节流 (BytesMessage)、原始值流 (StreamMessage),还有无有效负载的消息 (Message)。

    4.JMS的相关概念

    • 提供者:实现jms规范的消息中间件的服务器
    • 客户端:发送或接受消息的应用程序
    • 生产者/发布者:创建并发送消息的客户端
    • 消费者/订阅者:接受并处理消息的客户端
    • 消息:应用程序之间传递的数据内容
    • 消息模式:在客户端之间传递消息的方式,jms中定义了主题和队列两种模式
    4.1.jms的消息模式
    1. 队列模型
    • 客户端包括生产者和消费者
    • 队列中的消息只能被一个消费者消费
    • 消费者可以随时消费队列中的消息


    1. 主题模型
    • 客户端包括发布者和订阅者
    • 主题中的消息被所有订阅者消费
    • 消费者不能消费订阅之前就发送到主题中的消息


    5.jms编码接口

    • ConnectionFactory用于创建连接到消息中间件的连接工厂
    • Connection代表了应用程序和消息服务器之间的通讯链路
    • Destination指的是消息发布和接收的地点,包括队列和主题
    • Session标识一个单线程的上下文,用于发布和接收消息
    • MessageConsumer由会话创建,用于接收发送到目标的消息
    • MessageProducer由会话创建,用于发送消息到目标
    • Message 消息体,是在消费者和生产者之间传送的对象,消息头,一组消息属性,一个消息体


    6.主题模式和队列模式代码演示

    1. 新建maven工程添加依赖



      2.创建生产者

    package com.mtl.jms.queue;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    
    /**
     * Created by Mtl on 2019/3/4.
     */
    public class appProducer {
        private static final String url = "tcp://http://127.0.0.1:8161";
        private static final String queueName = "queue_test";
        public static void main(String[] args) throws JMSException {
            //1.创建连接工厂
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
            //2.创建connection
            Connection connection = connectionFactory.createConnection();
            //3.启动连接
            connection.start();
            //4.创建会话
            Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
            //5.创建一个目标(队列模式)
            Destination destination = session.createQueue(queueName);
            //(主题模式)
            Destination destination = session.createTopic(topicName);
            //6.创建一个生产者
            MessageProducer producer = session.createProducer(destination);
            for(int i = 0;i<100;i++){
                //7.创建消息
                TextMessage textMessage = session.createTextMessage("test"+i);
                //8.发布消息
                producer.send(textMessage);
                System.out.println("发送消息"+textMessage.getText());
            }
            //9.关闭连接
            connection.close();
        }
    }
    
    1. 创建消费者
    package com.mtl.jms.queue;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    
    /**
     * Created by Mtl on 2019/3/4.
     */
    public class appConsumer {
        private static final String url = "tcp://192.168.31.10:61616";
    
        private static final String queueName = "queue_test";
        public static void main(String[] args) throws Exception {
            //1.创建连接工厂
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
            //2.创建connection
            Connection connection = connectionFactory.createConnection();
            //3.启动连接
            connection.start();
            //4.创建会话
            Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
            //5.创建一个目标(队列模式)
            Destination destination = session.createQueue(queueName);
            //(主题模式)
            Destination destination = session.createTopic(topicName);
            //6.创建消费者
            MessageConsumer consumer = session.createConsumer(destination);
            //7创建一个监听器
            consumer.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("接收消息"+textMessage.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
            //8.关闭连接,注释掉,消息接受的过程是异步的,不注释掉可能导致没接到消息就关闭了连接
           //connection.close();
        }
    }
    
    1. 总结
      根据结果我们可以看出,队列模型中消费者会平均消费消息。
      主题模式消费者需要先启动监听,生产者后启动发送消息,消费者才能收到信息,并且会接收到全部的消息,而不是平均接收。

    使用spring集成jms连接activemq

    1. connectionfactory
    • 一个spring提供的连接池
    • jmstemplate每次发消息都会重新穿件链接,会话和producer,非常耗性能
    • spring中提供了singleconnectionfactory(整个应用都会在一个链接中)和cachingconnectionfactory(继承singleconnectionfactory,并增加缓存功能)
    1. jmsTemplate
    • spring提供的,只要向spring容器中注册这个类,就可以使用它方便的操作jms
    • 线程安全,可以在整个应用使用,可以创建多个
    1. messageListener
    • 实现一个onmessage方法,该方法接收一个message参数

    相关文章

      网友评论

          本文标题:activeMQ

          本文链接:https://www.haomeiwen.com/subject/kzfnyqtx.html