美文网首页
入门及ActiveMQ

入门及ActiveMQ

作者: 庙人 | 来源:发表于2020-07-28 14:37 被阅读0次
    消息中间件的定义:

    没有标准定义,一般认为,采用消息传送机制/****消息队列** **的中间件技术,进行数据交流,用在分布式系统的集成

    为什么要用消息中间件?

    解决分布式系统之间消息的传递。
    电商场景:用户下单减库存,调用物流系统,系统扩充后服务化和业务拆分。系统交互,y一般用RPC(远程过程调用)。如果系统扩充到有几十个接口,消息中间件来解决问题。

    和RPC有何区别?

    使用区分标准:1、系统之间的依赖程度 2、量(业务量,数据量,访问量)

    消息中间件有些什么使用场景?

    1、 异步处理
    用户注册(50ms),还需发送邮件(50ms)和短信(50ms)
    串行:(150ms)用户注册—》发送邮件----》发送短信
    并行(100ms):用户注册—》发送邮件
    |----》发送短信

    消息中间件(56ms):
    用户注册(50ms)—》(6ms)消息中间件《-----发送邮件
    《-----发送短信

    2、 应用的解耦
    订单系统---》库存系统(强耦合)
    消息中间件:订单系统---》消息中间件《----库存系统(解耦)

    3、 流量的削峰
    用户请求-----》秒杀应用
    应用的前端加入消息队列
    用户请求-----》消息队列《----秒杀应用

    4、 日志处理
    错误日志---》消息队列《----日志处理
    用户行为日志--》消息队列(kafka)《-----日志的存储或流式处理

    5、纯粹的消息通信

    常见消息中间件比较
    1.jpg

    kafka****和RabbitMQ****的比较

    1、 RabbitMq比kafka成熟,在可用性上,稳定性上,可靠性上,RabbitMq超过kafka
    2、 Kafka设计的初衷就是处理日志的,可以看做是一个日志系统,针对性很强,所以它并没有具备一个成熟MQ应该具备的特性
    3、 Kafka的性能(吞吐量、tps)比RabbitMq要强

    什么是JMS规范

    本质是API,Java平台消息中间件的规范,java应用程序之间进行消息交换。并且通过提供标准的产生、发送、接收消息的接口简化企业应用的开发。

    JMS对象模型包含如下几个要素:

    1)连接工厂:创建一个JMs连接
    2)JMS连接:客户端和服务器之间的一个连接。
    3)JMS会话:客户和服务器会话的状态,建立在连接之上的
    4)JMS目的:消息队列
    5)JMS生产者:消息的生成
    6)JMS消费者:接收消息
    7)Broker:消息中间件的实例(ActiveMq)

    JMS规范中的点对点模式:

    队列,一个消息只有一个消费者(即使有多个接受者监听队列),消费者是要向队列应答成功

    JMS规范中的主题模式(发布订阅):

    发布到Topic的消息会被当前主题所有的订阅者消费

    JMS规范中的消息类型

    TextMessage,MapMessage,ObjectMessage,BytesMessage,StreamMessage

    代码:不配置Spring
    pom.xml

    <dependency>
                <groupId>org.apache.activemq</groupId>
                <artifactId>activemq-all</artifactId>
                <version>5.8.0</version>
            </dependency>
    

    Producer

    public class JmsProducer {
    
        //默认连接用户名
        private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
        //默认连接密码
        private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
        //默认连接地址
        private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
        //发送的消息数量
        private static final int SENDNUM = 10;
    
        public static void main(String[] args) {
            ConnectionFactory connectionFactory;
            Connection connection = null;
            Session session;
            Destination destination;
            MessageProducer messageProducer;
    
            connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEURL);
    
            try {
                connection = connectionFactory.createConnection();
                connection.start();
                /*
                createSession参数取值
                * 1、为true表示启用事务
                * 2、消息的确认模式
                * AUTO_ACKNOWLEDGE  自动签收
                * CLIENT_ACKNOWLEDGE 客户端自行调用acknowledge方法签收
                * DUPS_OK_ACKNOWLEDGE 不是必须签收,消费可能会重复发送
                * 在第二次重新传送消息的时候,消息
                   头的JmsDelivered会被置为true标示当前消息已经传送过一次,
                   客户端需要进行消息的重复处理控制。
                * */
                session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);
                destination = session.createQueue("HelloWAM");
                messageProducer = session.createProducer(destination);
                for(int i=0;i<SENDNUM;i++){
                    String msg = "发送消息"+i+" "+System.currentTimeMillis();
                    TextMessage message = session.createTextMessage(msg);
                    System.out.println("发送消息:"+msg);
                    messageProducer.send(message);
                }
                session.commit();
            } catch (JMSException e) {
                e.printStackTrace();
            }finally {
                if(connection!=null){
                    try {
                        connection.close();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
    
    

    启动生产者后台发送10条消息到队列


    2.png 3.jpg

    Consumer

    public class JmsConsumer {
    
        private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//默认连接用户名
        private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认连接密码
        private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;//默认连接地址
    
        public static void main(String[] args) {
            ConnectionFactory connectionFactory;//连接工厂
            Connection connection = null;//连接
    
            Session session;//会话 接受或者发送消息的线程
            Destination destination;//消息的目的地
    
            MessageConsumer messageConsumer;//消息的消费者
    
            //实例化连接工厂
            connectionFactory = new ActiveMQConnectionFactory(JmsConsumer.USERNAME,
                    JmsConsumer.PASSWORD, JmsConsumer.BROKEURL);
    
            try {
                //通过连接工厂获取连接
                connection = connectionFactory.createConnection();
                //启动连接
                connection.start();
                //创建session
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                //创建一个连接HelloWorld的消息队列
                destination = session.createQueue("HelloWAM");
    
                //创建消息消费者
                messageConsumer = session.createConsumer(destination);
    
                //读取消息
                while(true){
                    TextMessage textMessage = (TextMessage)messageConsumer.receive(10000);
                    if(textMessage != null){
                        System.out.println("Accept msg : "+textMessage.getText());
                    }else{
                        break;
                    }
                }
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
    

    启动消费者


    4.jpg 5.jpg

    集成Spring,生产者部分
    pom.xml

    <!--ActiveMq-->
        <dependency>
          <groupId>org.apache.activemq</groupId>
          <artifactId>activemq-all</artifactId>
          <version>5.8.0</version>
        </dependency>
        <dependency>
          <groupId>org.springframework</groupId>
          <artifactId>spring-jms</artifactId>
          <version>4.3.11.RELEASE</version>
        </dependency>
    

    Spring配置文件

    <?xml version="1.0" encoding="UTF-8"?>
            <!-- 查找最新的schemaLocation 访问 http://www.springframework.org/schema/ -->
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:context="http://www.springframework.org/schema/context"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:amq="http://activemq.apache.org/schema/core"
           xmlns:jms="http://www.springframework.org/schema/jms"
           xsi:schemaLocation="http://www.springframework.org/schema/beans
            http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
            http://www.springframework.org/schema/context
            http://www.springframework.org/schema/context/spring-context-4.0.xsd
            http://www.springframework.org/schema/jms
            http://www.springframework.org/schema/jms/spring-jms-4.0.xsd
            http://activemq.apache.org/schema/core
            http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd">
    <!--以上增加activemq的命名空间-->
    
    <!-- 配置扫描路径 -->
    <context:component-scan base-package="com.xxx">
        <context:exclude-filter type="annotation"
                                expression="org.springframework.stereotype.Controller"/>
    </context:component-scan>
    
    <!-- ActiveMQ 连接工厂 -->
    <amq:connectionFactory id="amqConnectionFactory"
                           brokerURL="tcp://127.0.0.1:61616" userName="admin" password="admin" />
    
    <!-- Spring Caching连接工厂 -->
    <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
    <bean id="connectionFactory"
          class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
        <property name="sessionCacheSize" value="100"></property>
    </bean>
    
    <!-- Spring JmsTemplate 的消息生产者 start-->
    <!-- 定义JmsTemplate的Queue类型 -->
    <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
        <constructor-arg ref="connectionFactory"></constructor-arg>
        <!-- 队列模式-->
        <property name="pubSubDomain" value="false"></property>
    </bean>
    
    <!-- 定义JmsTemplate的Topic类型 -->
    <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
        <constructor-arg ref="connectionFactory"></constructor-arg>
        <!-- 发布订阅模式-->
        <property name="pubSubDomain" value="true"></property>
    </bean>
    
    <!--Spring JmsTemplate 的消息生产者 end-->
    </beans>  
    

    QueueSender

    @Component("queueSender")
    public class QueueSender {
    
        @Autowired
        @Qualifier("jmsQueueTemplate")
        private JmsTemplate jmsTemplate;
    
        @Autowired
        private GetResponse getResponse;
    
        public void send(String queueName,final String message){
            jmsTemplate.send(queueName, new MessageCreator() {
                public Message createMessage(Session session) throws JMSException {
                    Message msg = session.createTextMessage(message);
                    return msg;
                }
            });
        }
    }
    

    TopicSender

    @Component("topicSender")
    public class TopicSender {
    
        @Autowired
        @Qualifier("jmsTopicTemplate")
        private JmsTemplate jmsTemplate;
    
        public void send(String queueName,final String message){
            jmsTemplate.send(queueName, new MessageCreator() {
                public Message createMessage(Session session) throws JMSException {
                    Message msg = session.createTextMessage(message);
                    return msg;
                }
            });
        }
    }
    

    消费者部分
    配置文件

    <!-- 配置扫描路径 -->
         <context:component-scan base-package="com.dongnaoedu">
            <context:exclude-filter type="annotation" expression="org.springframework.stereotype.Controller"/>
         </context:component-scan>
    
        <!-- ActiveMQ 连接工厂 -->
        <amq:connectionFactory id="amqConnectionFactory"
                               brokerURL="tcp://127.0.0.1:61616" userName="admin" password="admin"  />
    
        <!-- Spring Caching连接工厂 -->
        <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
            <property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
            <property name="sessionCacheSize" value="100" />
        </bean>
    
        <!-- 消息消费者 start-->
    
        <!-- 定义Queue监听器 -->
        <jms:listener-container destination-type="queue" container-type="default"
                                connection-factory="connectionFactory" acknowledge="auto">
            <jms:listener destination="test.queue" ref="queueReceiver1"></jms:listener>
            <jms:listener destination="test.queue" ref="queueReceiver2"></jms:listener>
        </jms:listener-container>
    
    
        <!-- 定义Topic监听器 -->
        <jms:listener-container destination-type="topic" container-type="default"
                                connection-factory="connectionFactory" acknowledge="auto">
            <jms:listener destination="test.topic" ref="topicReceiver1"></jms:listener>
            <jms:listener destination="test.topic" ref="topicReceiver2"></jms:listener>
        </jms:listener-container>
    
        <!-- 消息消费者 end -->
    
        <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
            <constructor-arg ref="connectionFactory"></constructor-arg>
            <!-- 队列模式-->
            <property name="pubSubDomain" value="false"></property>
        </bean>
    
    </beans>
    

    QueueReceiver1

    @Component
    public class QueueReceiver1 implements MessageListener {
        public void onMessage(Message message) {
            try {
                String textMsg = ((TextMessage)message).getText();
                System.out.println("QueueReceiver1 accept msg : "+textMsg);
            } catch (JMSException e) {
                e.printStackTrace();
            }
    
        }
    }
    

    QueueReceiver2

    @Component
    public class QueueReceiver2 implements MessageListener {
    
        public void onMessage(Message message) {
            try {
                String textMsg = ((TextMessage)message).getText();
                System.out.println("QueueReceiver2 accept msg : "+textMsg);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
    

    TopicReceiver1

    @Component
    public class TopicReceiver1 implements MessageListener {
    
    
        public void onMessage(Message message) {
            try {
                String textMsg = ((TextMessage)message).getText();
                System.out.println("TopicReceiver1 accept msg : "+textMsg);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
        
    }
    

    TopicReceiver2

    @Component
    public class TopicReceiver2 implements MessageListener {
        public void onMessage(Message message) {
            try {
                String textMsg = ((TextMessage)message).getText();
                System.out.println("TopicReceiver2 accept msg : "+textMsg);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    
    }
    

    启动Producer,发送Queue消息,启动Consumer,可看到点对点模式,有两个消费者,一条消息,只有一个消费者能接收到消息,是轮询方式。


    6.jpg
    7.jpg

    主题订阅模式,可看到有两个消费者,一条消息,两个消费者都能接收到消息。


    8.jpg
    9.jpg

    相关文章

      网友评论

          本文标题:入门及ActiveMQ

          本文链接:https://www.haomeiwen.com/subject/lbxfrktx.html