美文网首页
消息中间件-activemq消息机制和持久化介绍

消息中间件-activemq消息机制和持久化介绍

作者: Lemonrel | 来源:发表于2019-12-28 00:34 被阅读0次

    我们知道activemq的使用方式非常简单有如下几个步骤:
    1.创建连接工厂
    2.创建连接
    3.创建会话
    4.创建目的地
    5.创建生产者或消费者
    6.生产或消费消息
    7.关闭生产或消费者、关闭会话、关闭连接
    这一节我们针对他的消息传播机制和持久化方式做一个简单的学习。在会用的同时我们也需要理解一些基本的概念,这样才不至于在出错后无从下手。
    1.activemq服务器工作模型
    我们先看一下消息发送的时序图:



    ConnectionFactory 对象创建一个连接工厂,消息的发送和接受服务均由此进行;
    ConnectionFactory 创建一个活动Connection作为当前使用的连接;
    Session 是一个用于生成和使用消息的单线程上下文,它用于创建发送的生产者和接收消息的消费者,并为所发送的消息定义发送顺序。会话通过大量确认选项或通过事务来支持可靠传送。
    户端使用 MessageProducer 向指定的物理目标发送消息,生产者可指定一个默认传送模式(持久性消息与非持久性消息)、优先级和有效期值,以控制生产者向物理目标发送的所有消息;
    消费者可以支持同步或异步消息接收。异步使用可通过向消费者注册 MessageListener 来实现。当会话线程调用 MessageListener 对象的 onMessage 方法时,客户端将使用消息。

    2.ActiveMQ消息传送模型
    ActiveMQ 支持两种消息传送模型:PTP(即点对点模型)和Pub/Sub(即发布 /订阅模型),前面我们已经讲过,在此就不赘述。

    3.消息选择器
    ActiveMQ提供了一种机制,使用它,消息服务可根据消息选择器中的标准来执行消息过滤。生产者可在消息中放入应用程序特有的属性,而消费者可使用基于这些属性的选择标准来表明对消息是否感兴趣。
    消息选择器是根据 header 和 properties 允许客户端选择性的制定需要接收的消息,消息选择器是无法利用 消息主体(Body)进行过滤的。无论你的消息主题是什么类型, 文本、或者对象、或者键值对。下面我们讲一下消息选择器的语法以及使用规范:
    可接收的类型包括:byte,int,double,boolean,String;
    属性标识符定义:
    1)变量名与java定义一样;
    2)要么在heads中定义 要么在 properties中定义,如果在sender中是在heads中定义而receiver中却从properties中寻找的话,找不到的情况下他是不会自动去heads中寻找的,而是会返回null;
    3)根据不同类型的变量选择不同的方法:
    message.setIntProperty("test",14);
    4)那么在接收端可以对该变量进行拦截:
    session.createConsumer(destination,"test > 14");
    属性标志符是区分大小写的;
    拦截器中的部分表示方式:
    1)可以是条件表达式
    2)可以是算术表达式
    3)可以是比较运算和逻辑运算组成的表达式

    支持 () 左右括号;
    支持逻辑运算的优先顺序表达式 例如: NOT , AND , OR;
    比较运算符有: = , > , >= , < , <= , <> (not equal);
    eg:

    标识符是null
    "prop_name IS NULL"
    标识符非空 not null
    "prop_name IS NOT NULL"
    "age BETWEEN 15 AND 19" is equivalent to "age >= 15 AND age <= 19"
    "Country NOT IN (' UK', 'US', 'France') "
    

    代码很简单,只需要在Sender端做如下改写:

    TextMessage message = session.createTextMessage();
    message.setIntProperty("test",14);
    message.setText("test");
    

    Receiver端:

    consumer = session.createConsumer(destination,"test > 14");
    

    对发送端的特定字符做一个判断符合条件即被拦截

    4.消息确认机制
    jms消息只有在被确认之后才认为成功消费了这条消息。消息的成功消费通常包括三个步骤:
    (1)client接收消息
    (2)client处理消息
    (3)消息被确认(也就是client给一个确认消息)
    在事务性会话中当一个事务被提交的时候,确认自动发生,和应答模式没关系,这个值可以随便写。(这里多提一句异步消息接收中不能使用事务性会话)。
    在非事务性会话中消息何时被确认取决于创建的session中设置的消息应答模式(acknowledge model)该参数有三个值:
    1)Session.AUTO_ACKNOWLEDGE:当client端成功的从receive方法或从onMessage(Message message) 方法返回的时候,会话自动确认client收到消息。
    2)Session.CLIENT_ACKNOWLEDGE: 客户单通过调用acknowledge方法来确认客户端收到消息。但需要注意在这种应答模式下,确认是在会话层上进行的,确认一个被消费的消息将自动确认所有已消费的其他消息。比如一个消费者已经消费了10条消息,然后确认了第5条消息被消费,则这10条都被确认消费了。、
    acknowledge()通知方法是在Message对象上,同步接收,调用acknowledge()方法进行确认如下所示:

    consumer = session.createConsumer(queue);  
    Message message = consumer.receive();  
    message.acknowledge(); 
    

    异步接受,调用acknowledge()方法进行确认:

    consumer.setMessageListener(new MessageListener() {
        @Override
        public void onMessage(Message message) {
            TextMessage textMessage = (TextMessage) message;
            try {
                String value = textMessage.getText();
                System.out.println("value: " + value);
                message.acknowledge(); //消息消费确认通知
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    });
    

    3)Session.DUPS_ACKNOWLEDGE:不是必须签收,消息可能会重复发送。在第二次重新传送消息的时候,消息头的JmsDelivered会被置为true标示当前消息已经传送过一次,客户端需要进行消息的重复处理控制。

    1. 持久化消息
      JMS 支持以下两种消息提交模式:
      5.1 ERSISTENT 持久消息
      是activemq默认的传送方式,此方式下的消息在配合activemq.xml中配置的消息存储方式,会被存储在特定的地方,直到有消费者将消息消费或者消息过期进入DLQ队列,消息生命周期才会结束。此模式下可以保证消息只会被成功传送一次和成功使用一次,消息具有可靠性。在消息传递到目标消费者,在消费者没有成功应答前,消息不会丢失。所以很自然的,需要一个地方来持久性存储。如果消息消费者在进行消费过程发生失败,则消息会被再次投递。
      DeliveryMode.PERSISTENT 指示JMS provider持久保存消息,以保证消息不会因为JMS provider的失败而丢失。 消息持久化在硬盘中,ActiveMQ持久化有三种方式:AMQ、KahaDB、JDBC。
      AMQ
      AMQ是一种文件存储形式,它具有写入速度快和容易恢复的特点。消息存储在一个个文件中,文件的默认大小为32M,如果一条消息的大小超过了32M,那么这个值必须设置大一点。当一个存储文件中的消息已经全部被消费,那么这个文件将被标识为可删除,在下一个清除阶段,这个文件被删除。AMQ适用于ActiveMQ5.3之前的版本。
      KahaDB
      KahaDB是基于文件的本地数据库储存形式,虽然没有AMQ的速度快,但是它具有强扩展性,恢复的时间比AMQ短,从5.4版本之后KahaDB做为默认的持久化方式。
      JDBC
      可以将消息存储到数据库中,例如:Mysql、SQL Server、Oracle、DB2。
      具体使用方式大家下去查一下,限于篇幅在此就不做太详细的介绍。
      5.2 NON_PERSISTENT 非持久消息
      非持久的消息适用于不重要的,可以接受消息丢失的哪一类消息,这种消息只会被投递一次,消息不会在持久性存储中存储,也不会保证消息丢失后的重新投递。
      DeliveryMode.NON_PERSISTENT 不要求JMS provider持久保存消息,消息存放在内存中,读写速度快,在JMS服务停止后消息会消失,没有持久化到硬盘。

    2. ActiveMQ消息过期设置
      允许消息过期 。默认情况下,消息永不会过期。如果消息在特定周期内失去意义,那么可以设置过期时间。
      有两种方法设置消息的过期时间,时间单位为毫秒:
      1)使用 setTimeToLive 方法为所有的消息设置过期时间;
      2)使用 send 方法为每一条消息设置过期时间。
      消息过期时间,send 方法中的 timeToLive 值加上发送时刻的 GMT 时间值。如果 timeToLive 值等于零,则 JMSExpiration 被设为零,表示该消息永不过期。如果发送后,在消息过期时间之后消息还没有被发送到目的地,则该消息被清除。
      文章来源于网络。
      感谢大家阅读,欢迎大家私信讨论。给大家推荐一个Java技术交流群:473984645里面会分享一些资深架构师录制的视频资料:有Spring,MyBatis,Netty源码分析,高并发、高性能、分布式、微服务架构的原理,JVM性能优化、分布式架构等这些成为架构师必备的知识体系。还能领取免费的学习资源,目前受益良多!
      推荐大家阅读:
      Java高级架构学习资料分享+架构师成长之路​
      个人整理了更多资料以PDF文件的形式分享给大家,需要查阅的程序员朋友可以来免费领取。还有我的学习笔记PDF文件也免费分享给有需要朋友!

    相关文章

      网友评论

          本文标题:消息中间件-activemq消息机制和持久化介绍

          本文链接:https://www.haomeiwen.com/subject/vcvjoctx.html