美文网首页
入门及ActiveMQ

入门及ActiveMQ

作者: 庙人 | 来源:发表于2020-07-28 14:37 被阅读0次
消息中间件的定义:

没有标准定义,一般认为,采用消息传送机制/****消息队列** **的中间件技术,进行数据交流,用在分布式系统的集成

为什么要用消息中间件?

解决分布式系统之间消息的传递。
电商场景:用户下单减库存,调用物流系统,系统扩充后服务化和业务拆分。系统交互,y一般用RPC(远程过程调用)。如果系统扩充到有几十个接口,消息中间件来解决问题。

和RPC有何区别?

使用区分标准:1、系统之间的依赖程度 2、量(业务量,数据量,访问量)

消息中间件有些什么使用场景?

1、 异步处理
用户注册(50ms),还需发送邮件(50ms)和短信(50ms)
串行:(150ms)用户注册—》发送邮件----》发送短信
并行(100ms):用户注册—》发送邮件
|----》发送短信

消息中间件(56ms):
用户注册(50ms)—》(6ms)消息中间件《-----发送邮件
《-----发送短信

2、 应用的解耦
订单系统---》库存系统(强耦合)
消息中间件:订单系统---》消息中间件《----库存系统(解耦)

3、 流量的削峰
用户请求-----》秒杀应用
应用的前端加入消息队列
用户请求-----》消息队列《----秒杀应用

4、 日志处理
错误日志---》消息队列《----日志处理
用户行为日志--》消息队列(kafka)《-----日志的存储或流式处理

5、纯粹的消息通信

常见消息中间件比较
1.jpg

kafka****和RabbitMQ****的比较

1、 RabbitMq比kafka成熟,在可用性上,稳定性上,可靠性上,RabbitMq超过kafka
2、 Kafka设计的初衷就是处理日志的,可以看做是一个日志系统,针对性很强,所以它并没有具备一个成熟MQ应该具备的特性
3、 Kafka的性能(吞吐量、tps)比RabbitMq要强

什么是JMS规范

本质是API,Java平台消息中间件的规范,java应用程序之间进行消息交换。并且通过提供标准的产生、发送、接收消息的接口简化企业应用的开发。

JMS对象模型包含如下几个要素:

1)连接工厂:创建一个JMs连接
2)JMS连接:客户端和服务器之间的一个连接。
3)JMS会话:客户和服务器会话的状态,建立在连接之上的
4)JMS目的:消息队列
5)JMS生产者:消息的生成
6)JMS消费者:接收消息
7)Broker:消息中间件的实例(ActiveMq)

JMS规范中的点对点模式:

队列,一个消息只有一个消费者(即使有多个接受者监听队列),消费者是要向队列应答成功

JMS规范中的主题模式(发布订阅):

发布到Topic的消息会被当前主题所有的订阅者消费

JMS规范中的消息类型

TextMessage,MapMessage,ObjectMessage,BytesMessage,StreamMessage

代码:不配置Spring
pom.xml

<dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.8.0</version>
        </dependency>

Producer

public class JmsProducer {

    //默认连接用户名
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    //默认连接密码
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    //默认连接地址
    private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
    //发送的消息数量
    private static final int SENDNUM = 10;

    public static void main(String[] args) {
        ConnectionFactory connectionFactory;
        Connection connection = null;
        Session session;
        Destination destination;
        MessageProducer messageProducer;

        connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEURL);

        try {
            connection = connectionFactory.createConnection();
            connection.start();
            /*
            createSession参数取值
            * 1、为true表示启用事务
            * 2、消息的确认模式
            * AUTO_ACKNOWLEDGE  自动签收
            * CLIENT_ACKNOWLEDGE 客户端自行调用acknowledge方法签收
            * DUPS_OK_ACKNOWLEDGE 不是必须签收,消费可能会重复发送
            * 在第二次重新传送消息的时候,消息
               头的JmsDelivered会被置为true标示当前消息已经传送过一次,
               客户端需要进行消息的重复处理控制。
            * */
            session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);
            destination = session.createQueue("HelloWAM");
            messageProducer = session.createProducer(destination);
            for(int i=0;i<SENDNUM;i++){
                String msg = "发送消息"+i+" "+System.currentTimeMillis();
                TextMessage message = session.createTextMessage(msg);
                System.out.println("发送消息:"+msg);
                messageProducer.send(message);
            }
            session.commit();
        } catch (JMSException e) {
            e.printStackTrace();
        }finally {
            if(connection!=null){
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

启动生产者后台发送10条消息到队列


2.png 3.jpg

Consumer

public class JmsConsumer {

    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//默认连接用户名
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认连接密码
    private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;//默认连接地址

    public static void main(String[] args) {
        ConnectionFactory connectionFactory;//连接工厂
        Connection connection = null;//连接

        Session session;//会话 接受或者发送消息的线程
        Destination destination;//消息的目的地

        MessageConsumer messageConsumer;//消息的消费者

        //实例化连接工厂
        connectionFactory = new ActiveMQConnectionFactory(JmsConsumer.USERNAME,
                JmsConsumer.PASSWORD, JmsConsumer.BROKEURL);

        try {
            //通过连接工厂获取连接
            connection = connectionFactory.createConnection();
            //启动连接
            connection.start();
            //创建session
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //创建一个连接HelloWorld的消息队列
            destination = session.createQueue("HelloWAM");

            //创建消息消费者
            messageConsumer = session.createConsumer(destination);

            //读取消息
            while(true){
                TextMessage textMessage = (TextMessage)messageConsumer.receive(10000);
                if(textMessage != null){
                    System.out.println("Accept msg : "+textMessage.getText());
                }else{
                    break;
                }
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

启动消费者


4.jpg 5.jpg

集成Spring,生产者部分
pom.xml

<!--ActiveMq-->
    <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-all</artifactId>
      <version>5.8.0</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-jms</artifactId>
      <version>4.3.11.RELEASE</version>
    </dependency>

Spring配置文件

<?xml version="1.0" encoding="UTF-8"?>
        <!-- 查找最新的schemaLocation 访问 http://www.springframework.org/schema/ -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:amq="http://activemq.apache.org/schema/core"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-4.0.xsd
        http://www.springframework.org/schema/jms
        http://www.springframework.org/schema/jms/spring-jms-4.0.xsd
        http://activemq.apache.org/schema/core
        http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd">
<!--以上增加activemq的命名空间-->

<!-- 配置扫描路径 -->
<context:component-scan base-package="com.xxx">
    <context:exclude-filter type="annotation"
                            expression="org.springframework.stereotype.Controller"/>
</context:component-scan>

<!-- ActiveMQ 连接工厂 -->
<amq:connectionFactory id="amqConnectionFactory"
                       brokerURL="tcp://127.0.0.1:61616" userName="admin" password="admin" />

<!-- Spring Caching连接工厂 -->
<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="connectionFactory"
      class="org.springframework.jms.connection.CachingConnectionFactory">
    <property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
    <property name="sessionCacheSize" value="100"></property>
</bean>

<!-- Spring JmsTemplate 的消息生产者 start-->
<!-- 定义JmsTemplate的Queue类型 -->
<bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
    <constructor-arg ref="connectionFactory"></constructor-arg>
    <!-- 队列模式-->
    <property name="pubSubDomain" value="false"></property>
</bean>

<!-- 定义JmsTemplate的Topic类型 -->
<bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
    <constructor-arg ref="connectionFactory"></constructor-arg>
    <!-- 发布订阅模式-->
    <property name="pubSubDomain" value="true"></property>
</bean>

<!--Spring JmsTemplate 的消息生产者 end-->
</beans>  

QueueSender

@Component("queueSender")
public class QueueSender {

    @Autowired
    @Qualifier("jmsQueueTemplate")
    private JmsTemplate jmsTemplate;

    @Autowired
    private GetResponse getResponse;

    public void send(String queueName,final String message){
        jmsTemplate.send(queueName, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                Message msg = session.createTextMessage(message);
                return msg;
            }
        });
    }
}

TopicSender

@Component("topicSender")
public class TopicSender {

    @Autowired
    @Qualifier("jmsTopicTemplate")
    private JmsTemplate jmsTemplate;

    public void send(String queueName,final String message){
        jmsTemplate.send(queueName, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                Message msg = session.createTextMessage(message);
                return msg;
            }
        });
    }
}

消费者部分
配置文件

<!-- 配置扫描路径 -->
     <context:component-scan base-package="com.dongnaoedu">
        <context:exclude-filter type="annotation" expression="org.springframework.stereotype.Controller"/>
     </context:component-scan>

    <!-- ActiveMQ 连接工厂 -->
    <amq:connectionFactory id="amqConnectionFactory"
                           brokerURL="tcp://127.0.0.1:61616" userName="admin" password="admin"  />

    <!-- Spring Caching连接工厂 -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
        <property name="sessionCacheSize" value="100" />
    </bean>

    <!-- 消息消费者 start-->

    <!-- 定义Queue监听器 -->
    <jms:listener-container destination-type="queue" container-type="default"
                            connection-factory="connectionFactory" acknowledge="auto">
        <jms:listener destination="test.queue" ref="queueReceiver1"></jms:listener>
        <jms:listener destination="test.queue" ref="queueReceiver2"></jms:listener>
    </jms:listener-container>


    <!-- 定义Topic监听器 -->
    <jms:listener-container destination-type="topic" container-type="default"
                            connection-factory="connectionFactory" acknowledge="auto">
        <jms:listener destination="test.topic" ref="topicReceiver1"></jms:listener>
        <jms:listener destination="test.topic" ref="topicReceiver2"></jms:listener>
    </jms:listener-container>

    <!-- 消息消费者 end -->

    <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
        <constructor-arg ref="connectionFactory"></constructor-arg>
        <!-- 队列模式-->
        <property name="pubSubDomain" value="false"></property>
    </bean>

</beans>

QueueReceiver1

@Component
public class QueueReceiver1 implements MessageListener {
    public void onMessage(Message message) {
        try {
            String textMsg = ((TextMessage)message).getText();
            System.out.println("QueueReceiver1 accept msg : "+textMsg);
        } catch (JMSException e) {
            e.printStackTrace();
        }

    }
}

QueueReceiver2

@Component
public class QueueReceiver2 implements MessageListener {

    public void onMessage(Message message) {
        try {
            String textMsg = ((TextMessage)message).getText();
            System.out.println("QueueReceiver2 accept msg : "+textMsg);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

TopicReceiver1

@Component
public class TopicReceiver1 implements MessageListener {


    public void onMessage(Message message) {
        try {
            String textMsg = ((TextMessage)message).getText();
            System.out.println("TopicReceiver1 accept msg : "+textMsg);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
    
}

TopicReceiver2

@Component
public class TopicReceiver2 implements MessageListener {
    public void onMessage(Message message) {
        try {
            String textMsg = ((TextMessage)message).getText();
            System.out.println("TopicReceiver2 accept msg : "+textMsg);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

}

启动Producer,发送Queue消息,启动Consumer,可看到点对点模式,有两个消费者,一条消息,只有一个消费者能接收到消息,是轮询方式。


6.jpg
7.jpg

主题订阅模式,可看到有两个消费者,一条消息,两个消费者都能接收到消息。


8.jpg
9.jpg

相关文章

  • ActiveMQ(二)ActiveMQ集群部署与应用

    本文主要讲解ActiveMQ的集群部署及主从切换演示,单机版部署及入门请移步ActiveMQ(一)单机部署与应用,...

  • 入门及ActiveMQ

    消息中间件的定义: 没有标准定义,一般认为,采用消息传送机制/****消息队列** **的中间件技术,进行数据交流...

  • ActiveMQ从入门到精通(一)

    这是关于消息中间件ActiveMQ的一个系列专题文章,将涵盖JMS、ActiveMQ的初步入门及API详细使用、两...

  • 消息中间件之ActiveMQ学习笔记

    目录 JMS介绍ActiveMQ简介及安装ActiveMQ的实例ActiveMQ配置介绍ActiveMQ的部署模式...

  • java消息服务之ActiveMQ入门(0x01)

    java消息服务之ActiveMQ入门(0x02) ActiveMQ能做什么 大多数情况下ActiveMQ被用于做...

  • java消息服务之ActiveMQ入门(0x02)

    java消息服务之ActiveMQ入门(0x01) 概要 ActiveMQ 的 request-response通...

  • ActiveMQ学习

    MQ入门总结(三)ActiveMQ的用法和实现

  • ActiveMQ入门

    ActiveMQ安装 点对点模式 发送消息 接收消息 第一种方式: 第二种方式: 订阅模式 发送消息 消息接收 S...

  • 【ActiveMQ】入门

    1. ActiveMQ 是什么 ActiveMQ 是 Apache 出品,能力强劲的开源消息总线,虽然现在没那么流...

  • ActiveMQ入门

    JMS是什么 JMS是SUN提出是为了统一MOM系统接口的规范,他包含点对点,以及发布订阅两种消息模型,提供可靠消...

网友评论

      本文标题:入门及ActiveMQ

      本文链接:https://www.haomeiwen.com/subject/lbxfrktx.html