1 Message Properties(消息属性)
1.1 可访问属性
属性名称 | 类型 | 默认值 | 描述 |
---|---|---|---|
JMSDestination |
javax.jms.Destination |
由生产者设定 | 生产者使用的目的地。 |
JMSReplyTo |
javax.jms.Destination |
null |
用户自定义。 |
JMSType |
String |
"" |
用户自定义。 |
JMSDeliveryMode |
int |
DeliveryMode.PERSISTENT |
指示是否应保留消息。 |
JMSPriority |
int |
4 |
来自的价值0-9 。 |
JMSMessageID |
String |
unique |
消息的唯一标识符。 |
JMSTimestamp |
long |
消息发送时间 | 时间(以毫秒为单位)。 |
JMSCorrelationID |
String |
null |
用户自定义。 |
JMSExpiration |
long |
0 |
消息过期的时间(以毫秒为单位)。的值0 意味着永远不会过期。 |
JMSRedelivered |
boolean |
false |
true 如果消息正在重新发送给使用者,则通过 |
1.2 JMS定义属性
属性名称 | 类型 | 默认值 | 描述 |
---|---|---|---|
JMSXDeliveryCount |
int |
0 |
尝试发送消息的次数。 |
JMSXGroupID |
String |
null |
消息组的标识。 |
JMSXGroupSeq |
int |
0 |
消息的序列号。 |
JMSXProducerTXID |
String |
null |
交易识别码。 |
1.3 ActiveMQ定义属性
属性名称 | 类型 | 默认值 | 描述 |
---|---|---|---|
JMSActiveMQBrokerInTime |
long |
0 |
消息到达代理的时间戳(毫秒)。 |
JMSActiveMQBrokerOutTime |
long |
0 |
消息离开代理的时间戳(以毫秒为单位)。 |
2 Advisory Message(通知消息)
Advisory Message
实现了ActiveMQ
的broker
上各种操作的记录跟踪和通知。
使用这个功能,你可以实时的知道broker上
- 创建或销毁了连接。
- 添加或删除了生存者或消费者。
- 添加或删除了主题或队列。
- 有消息发送和接收。
- 什么时候有慢消费者。
- 什么时候有快生产者。
- 什么时候什么消息被丢弃。
- 什么时候broker被添加到集群(主从或是网络连接)。
这个机制是ActiveMQ对JMS协议的重要补充,也是基于JMS实现的ActiveMQ的可管理性的一部分。多个ActiveMQ的相互协调和互操作的基础设置。
开启通知消息后,可以订阅前缀为ActiveMQ.Advisory.
的Topic。具体的通知消息请参考官网ActiveMQ。示例如下:
Destination advisoryDestination = AdvisorySupport.getProducerAdvisoryTopic(destination)
MessageConsumer consumer = session.createConsumer(advisoryDestination);
consumer.setMessageListener(this);
public void onMessage(Message msg){
if(msg instanceof ActiveMQMessage) {
try {
ActiveMQMessage aMsg = (ActiveMQMessage)msg;
ProducerInfo prod = (ProducerInfo) aMsg.getDataStructure();
}
catch(JMSException e) {
log.error("Failed to process message: " + msg);
}
}
}
说明:
ActiveMQ.Advisory.Producer.Queue
数据结构为ProducerInfo,其中包含Producer的参数信息。ActiveMQ.Advisory.Consumer.Queue
数据结构为ConsumerInfo,其中包含Consumer的参数信息。
2.1 打开通知消息
<destinationPolicy>
<policyMap><policyEntries>
<policyEntry topic=">" advisoryForConsumed="true" />
</policyEntries></policyMap>
</destinationPolicy>
2.2 禁用通知消息
将broker
上的advisorySupport
设置false以后,所有的Advisory都不可用。
<broker advisorySupport="false">
3 Delay and schedule Message Delivery (延迟和定时消息传递)
ActiveMQ支持延时投递和定时投递。只需要把几个描述消息定时调度的方式参数作为属性添加到消息,broker
端的调度器就会按照我们想要的行为去处理消息。当然需要再xml broker中配置schedulerSupport属性为true。有如下属性:
-
AMQ_SCHEDULED_DELAY: 延迟投递的时间。
-
AMQ_SCHEDULED_PERIOD: 重复投递的时间间隔。
-
AMQ_SCHEDULED_REPEAT:重复投递次数。
-
AMQ_SCHEDULED_CRON: Cron表述。
示例:
TextMessage textMessage = session.createTextMessage("message" + i);
long delay = 10 * 1000;
long period= 5 * 1000;
int repeat = 3;
textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);
textMessage.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);
messageProducer.send(textMessage);
使用CRON表达式,每个小时发送一次:
textMessage.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON,"0 * * * *");
CRON表达式的优先级高于另外三个参数,如果在设置了延时时间,也有repeat和period参数,则会在每次CRON执行的时候,重复投递repeat次,每次间隔period,就是说设置的是叠加效果。
4 Message Transformation(消息转换)
从4.2版本起,ActiveMQ提供了一个Message Transformer
接口用于进行消息转换:
- 在将消息通过
provider
发送到消息总线之前对其进行充实或转换。 - 从消息总线接收分发给
consumer
之前,对消息进行丰富或转换。
例如你可以通过XStreamMessageTransformer将ObjectMessage转换成XML格式的TextMessage,可以通过setTransformer方法将其设置在以下类之一:
- ActiveMQConnectionFactory
- ActiveMQConnection。
- ActiveMQMessageConsumer
- ActiveMQMessageProducer
网友评论