美文网首页
消息中间件ActiveMQ

消息中间件ActiveMQ

作者: _52Hertz | 来源:发表于2019-07-05 17:15 被阅读0次

消息中间件概述

中间件介绍

什么是中间件?

非底层操作系统软件,非业务应用软件,不是直接给最终用户使用的,不能直接给用户带来价值的软件统称中间件。

什么是消息中间件

关注于数据的发送与接收,利用高效可靠的异步消息传递机制集成分布式系统。

消息中间件图示


应用A通过应用程序接口向消息中间件发送消息,应用B通过应用程序接口向消息中间件接收消息。

什么是JMS

Java消息服务(Java Message Service)即JMS,是一个Java平台中关于面向消息中间件的API,用于在两个应用程序之间,或分布式系统中发送/接收消息,进行异步通信。

什么是AMQ

AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准协议,基于此协议的客户端与消息中间件可传递消息,并不受客户端中间件不同产品,不同的开发语言等条件的限制。

JMS和AMQP对比

常见消息中间件对比

ActiveMQ

  • 多种语言和协议编写客户端,语言:Java、C语言、C++、C#、Ruby、Perl、Python、PHP。应用协议:OpenWire、Stomp REST、WS Notification、XMPP、AMQP。
  • 完全支持JMS1.1和J2EE1.4规范(持久化、XA消息、事务)
  • 虚拟主题、组合目的、镜像队列

RabbitMQ

  • 支持多种客户端,如Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript等
  • AMQP的完整实现
  • 事务支持/发布确认
  • 消息持久化

Kafka

Kafka是一种高吞吐量的分布式发布订阅消息系统,是一个分布式的、分区的、可靠的分布式日志存储服务。它通过一种独一无二的设计提供了消息系统的功能。

特性
  • 消息的持久化
  • 高吞吐量
  • Partition、Consumer Group
综合对比

JMS规范

Java消息服务定义

Java消息服务(Java Message Service)即JMS,是一个Java平台中关于面向消息中间件的API,用于在两个应用程序之间,或分布式系统中发送/接收消息,进行异步通信。

JMS概念

  • 提供者:实现JMS规范的消息中间件服务器
  • 客户端:发送或接收消息额应用程序
  • 生产者/发布者:创建并发送消息的客户端
  • 消费者/订阅者:接收并处理消息的客户端
  • 消息: 应用程序间传递的数据内容
  • 消息模式:在客户端间传递消息的方式,JMS中定义了主题和队列两种模式

消息模式

队列模型

  • 客户端包括生产者和消费者
  • 队列中的消息只能被一个消费者消费
  • 消费者可以随时消费队列中的消息
队列模型示意图

主题模型

  • 客户端包括生产者和消费者
  • 主题中的消息被所有订阅者消费
  • 消费者不能消费订阅之前就发送到主题中的消息
队列模型示意图

JMS编码规范

  • ConnectionFactory:用于创建连接到消息中间件的连接工厂
  • Connection:“链接”,代表了应用程序和消息服务器之间的通信链路
  • Destination:“目的地”,指消息发布和接收的地点,包括队列或主题
  • Session:“会话”,表示一个单线程的上下文,用于发送和接收消息
  • MessageConsumer:“消费者”,一种可以向JMS提供获取消息的客户端类型
  • MessageProducer:“生产者”,消费者和生产者间传送的对象,消息头,一组消息属性,一个消息体

JMS编码接口之间的关系

windows下安装ActiveMQ

Linux下安装ActiveMQ

队列模式的消息演示

pom.xml

    <dependencies>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.13.0</version>
        </dependency>
    </dependencies>

ActiveMqProducer.java

package com.cxy.jms.queue;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @Auther: cxy
 * @Date: 2019/7/4
 * @Description: 生产者
 */
public class ActiveMqProducer {
    private static final String url = "tcp://192:168.31.10:61616";
    private static final String queueName = "queue-test";

    public static void main(String[] args) throws JMSException {
        //1.创建ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);

        //2.创建Connection
        Connection connection = connectionFactory.createConnection();

        //3.启动连接
        connection.start();

        //4.创建会话
        Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);

        //5.创建一个目标
        Destination destination = session.createQueue(queueName);

        //6.创建一个生产者
        MessageProducer producer = session.createProducer(destination);

        for(int i=0;i<100;i++){
            //7.创建消息
            TextMessage textMessage = session.createTextMessage("test" + i);

            //8.发布消息
            producer.send(textMessage);

            System.out.println("发送消息:"+textMessage.getText());
        }

        //9.关闭连接
        connection.close();
    }
}

ActiveMqConsumer.java

package com.cxy.jms.queue;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @Auther: cxy
 * @Date: 2019/7/4
 * @Description: 消费者
 */
public class ActiveMqConsumer {
    private static final String url = "tcp://192:168.31.10:61616";
    private static final String queueName = "queue-test";

    public static void main(String[] args) throws JMSException {
        //1.创建ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);

        //2.创建Connection
        Connection connection = connectionFactory.createConnection();

        //3.启动连接
        connection.start();

        //4.创建会话
        Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);

        //5.创建一个目标
        Destination destination = session.createQueue(queueName);

        //6.创建消费者
        MessageConsumer consumer = session.createConsumer(destination);

        //7.创建一个监听器
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("接收消息:"+textMessage.getText());
                }catch (JMSException e){
                    e.printStackTrace();
                }
            }
        });

    }
}

主题模式的消息演示

ActiveMqProducer.java

package com.cxy.jms.topic;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @Auther: cxy
 * @Date: 2019/7/4
 * @Description: 生产者
 */
public class ActiveMqProducer {
    private static final String url = "tcp://192:168.31.10:61616";
    private static final String topicName = "topic-test";

    public static void main(String[] args) throws JMSException {
        //1.创建ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);

        //2.创建Connection
        Connection connection = connectionFactory.createConnection();

        //3.启动连接
        connection.start();

        //4.创建会话
        Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);

        //5.创建一个目标
        Destination destination = session.createTopic(topicName);

        //6.创建一个生产者
        MessageProducer producer = session.createProducer(destination);

        for(int i=0;i<100;i++){
            //7.创建消息
            TextMessage textMessage = session.createTextMessage("test" + i);

            //8.发布消息
            producer.send(textMessage);

            System.out.println("发送消息:"+textMessage.getText());
        }

        //9.关闭连接
        connection.close();
    }
}

ActiveMqConsumer.java

package com.cxy.jms.topic;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @Auther: cxy
 * @Date: 2019/7/4
 * @Description: 消费者
 */
public class ActiveMqConsumer {
    private static final String url = "tcp://192:168.31.10:61616";
    private static final String topicName = "topic-test";

    public static void main(String[] args) throws JMSException {
        //1.创建ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);

        //2.创建Connection
        Connection connection = connectionFactory.createConnection();

        //3.启动连接
        connection.start();

        //4.创建会话
        Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);

        //5.创建一个目标
        Destination destination = session.createTopic(topicName);

        //6.创建消费者
        MessageConsumer consumer = session.createConsumer(destination);

        //7.创建一个监听器
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("接收消息:"+textMessage.getText());
                }catch (JMSException e){
                    e.printStackTrace();
                }
            }
        });

    }
}

activemq模式区分

队列模式:生产者发送消息,所有消费者对消息进行平分,已消费的消息不能重新消费
主题模式:生产者发送消息,所有已订阅主题的消费者都能收到消息。

相关文章

网友评论

      本文标题:消息中间件ActiveMQ

      本文链接:https://www.haomeiwen.com/subject/dpkoqctx.html