美文网首页
ActiveMQ之Message

ActiveMQ之Message

作者: 爱健身的兔子 | 来源:发表于2021-01-11 09:59 被阅读0次

1 Message Properties(消息属性)

1.1 可访问属性
属性名称 类型 默认值 描述
JMSDestination javax.jms.Destination 由生产者设定 生产者使用的目的地。
JMSReplyTo javax.jms.Destination null 用户自定义。
JMSType String "" 用户自定义。
JMSDeliveryMode int DeliveryMode.PERSISTENT 指示是否应保留消息。
JMSPriority int 4 来自的价值0-9
JMSMessageID String unique 消息的唯一标识符。
JMSTimestamp long 消息发送时间 时间(以毫秒为单位)。
JMSCorrelationID String null 用户自定义。
JMSExpiration long 0 消息过期的时间(以毫秒为单位)。的值0意味着永远不会过期。
JMSRedelivered boolean false true如果消息正在重新发送给使用者,则通过
1.2 JMS定义属性
属性名称 类型 默认值 描述
JMSXDeliveryCount int 0 尝试发送消息的次数。
JMSXGroupID String null 消息组的标识。
JMSXGroupSeq int 0 消息的序列号。
JMSXProducerTXID String null 交易识别码。
1.3 ActiveMQ定义属性
属性名称 类型 默认值 描述
JMSActiveMQBrokerInTime long 0 消息到达代理的时间戳(毫秒)。
JMSActiveMQBrokerOutTime long 0 消息离开代理的时间戳(以毫秒为单位)。

2 Advisory Message(通知消息)

Advisory Message实现了ActiveMQbroker上各种操作的记录跟踪和通知。

使用这个功能,你可以实时的知道broker上

  1. 创建或销毁了连接。
  2. 添加或删除了生存者或消费者。
  3. 添加或删除了主题或队列。
  4. 有消息发送和接收。
  5. 什么时候有慢消费者。
  6. 什么时候有快生产者。
  7. 什么时候什么消息被丢弃。
  8. 什么时候broker被添加到集群(主从或是网络连接)。

这个机制是ActiveMQ对JMS协议的重要补充,也是基于JMS实现的ActiveMQ的可管理性的一部分。多个ActiveMQ的相互协调和互操作的基础设置。

开启通知消息后,可以订阅前缀为ActiveMQ.Advisory.的Topic。具体的通知消息请参考官网ActiveMQ。示例如下:

Destination advisoryDestination = AdvisorySupport.getProducerAdvisoryTopic(destination)
MessageConsumer consumer = session.createConsumer(advisoryDestination);
consumer.setMessageListener(this);

public void onMessage(Message msg){
    if(msg instanceof ActiveMQMessage) {
        try {
             ActiveMQMessage aMsg = (ActiveMQMessage)msg;
             ProducerInfo prod = (ProducerInfo) aMsg.getDataStructure();
        } 
        catch(JMSException e) {
            log.error("Failed to process message: " + msg);
        }
    }
}

说明

ActiveMQ.Advisory.Producer.Queue数据结构为ProducerInfo,其中包含Producer的参数信息。ActiveMQ.Advisory.Consumer.Queue数据结构为ConsumerInfo,其中包含Consumer的参数信息。

2.1 打开通知消息
<destinationPolicy>
   <policyMap><policyEntries> 
      <policyEntry topic=">" advisoryForConsumed="true" />
   </policyEntries></policyMap>
</destinationPolicy>
2.2 禁用通知消息

broker上的advisorySupport设置false以后,所有的Advisory都不可用。

<broker advisorySupport="false">

3 Delay and schedule Message Delivery (延迟和定时消息传递)

ActiveMQ支持延时投递和定时投递。只需要把几个描述消息定时调度的方式参数作为属性添加到消息,broker端的调度器就会按照我们想要的行为去处理消息。当然需要再xml broker中配置schedulerSupport属性为true。有如下属性:

  • AMQ_SCHEDULED_DELAY: 延迟投递的时间。

  • AMQ_SCHEDULED_PERIOD: 重复投递的时间间隔。

  • AMQ_SCHEDULED_REPEAT:重复投递次数。

  • AMQ_SCHEDULED_CRON: Cron表述。

示例:

TextMessage textMessage = session.createTextMessage("message" + i);
            long delay = 10 * 1000;
            long period= 5 * 1000;
            int repeat = 3;
            textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
            textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);
            textMessage.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);
            messageProducer.send(textMessage);

使用CRON表达式,每个小时发送一次:

textMessage.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON,"0 * * * *");

CRON表达式的优先级高于另外三个参数,如果在设置了延时时间,也有repeat和period参数,则会在每次CRON执行的时候,重复投递repeat次,每次间隔period,就是说设置的是叠加效果。

4 Message Transformation(消息转换)

从4.2版本起,ActiveMQ提供了一个Message Transformer接口用于进行消息转换:

  • 在将消息通过provider发送到消息总线之前对其进行充实或转换。
  • 从消息总线接收分发给consumer之前,对消息进行丰富或转换。

例如你可以通过XStreamMessageTransformer将ObjectMessage转换成XML格式的TextMessage,可以通过setTransformer方法将其设置在以下类之一:

  • ActiveMQConnectionFactory
  • ActiveMQConnection。
  • ActiveMQMessageConsumer
  • ActiveMQMessageProducer

ActiveMQ

相关文章

网友评论

      本文标题:ActiveMQ之Message

      本文链接:https://www.haomeiwen.com/subject/goiraktx.html