ActiveMQ

作者: 一个喜欢烧砖的人 | 来源:发表于2018-09-14 21:30 被阅读4次

    什么是ActiveMQ

    ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。

    • 多语言支持的客户端和协议(Java, C, C++, C#, Ruby, Perl, Python, PHP)
    • 完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)
    • 对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性
    • 通过了常见J2EE服务器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上
    • in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
    • 支持通过JDBC和journal提供高速的消息持久化
    • 从设计上保证了高性能的集群,客户端-服务器,点对点
    • 支持Ajax
    • 支持与Axis的整合
    • 可以很容易得调用内嵌JMS provider,进行测试

    ActiveMQ的消息类型

    • 点对点的(queue)
      一个生产者和一个消费者一一对应(如果同时有多个消费者,只会有一个消费者受到消息)
    • 发布/订阅(topic)
      一个生产者发送消息后,可以由多个接受者进行接收

    jms 定义了5种消息正文格式

    • StreamMessage -- Java原始值的数据流
    • MapMessage--一套名称-值对
    • TextMessage--一个字符串对象
    • ObjectMessage--一个序列化的 Java对象
    • BytesMessage--一个字节的数据流

    ActiveMQ安装

    • 去官网进行下载
    • 安装以及开启
      第一步: 把ActiveMQ 的压缩包上传到Linux系统。
      第二步:解压缩。
      第三步:启动。
      使用bin目录下的activemq命令启动:
      [root@localhost bin]# ./activemq start
      关闭:
      [root@localhost bin]# ./activemq stop
      查看状态:
      [root@localhost bin]# ./activemq status
    maven 环境所需依赖
            <dependency>
                <groupId>org.apache.activemq</groupId>
                <artifactId>activemq-all</artifactId>
                <version>5.15.6</version>
            </dependency>
    

    示例代码

    • producer
    public class MyActiveMqProducer {
        public static void main(String[] arg) throws Exception {
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://pangw:61617");
            Connection connection = connectionFactory.createConnection();
            connection.start();
            Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
            Topic queue = session.createTopic("test-topic");
            MessageProducer producer = session.createProducer(queue);
            TextMessage textMessage = session.createTextMessage("21212312312321312321321312");
            producer.send(textMessage);
            producer.close();
            session.close();
            connection.close();
        }
    }
    
    
    • cusumer
    public class MyActiveMqConsumer {
        public static void main(String[] arg) throws Exception {
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://pangw:61617");
            Connection connection = connectionFactory.createConnection();
            connection.start();
    
            //第一个参数:是否开启事务。true:开启事务,第二个参数忽略。
            //第二个参数:当第一个参数为false时,才有意义。消息的应答模式。1、自动应答2、手动应答。一般是自动应答。
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic queue = session.createTopic("test-topic");
            MessageConsumer consumer = session.createConsumer(queue);
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        String s = textMessage.getText();
                        System.out.println("consumer:" + s);
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }
    
    

    相关文章

      网友评论

          本文标题:ActiveMQ

          本文链接:https://www.haomeiwen.com/subject/wawogftx.html