说明
消息拓展类,源码中大部分字段的set方法可以参考 MessageDecoder.decode(java.nio.ByteBuffer, boolean, boolean, boolean)的实现
字段
private int queueId;//队列id
private int storeSize;//整个存储空间,包含TOTALSIZE,MAGICCODE,BODYCRC等
private long queueOffset;//逻辑队列长度
private int sysFlag;//参考MessageSysFlag类
private long bornTimestamp;//消息产生的时间
private SocketAddress bornHost;//消息产生的来源host
private long storeTimestamp;//存放消息的时间(不是写入mappedFile的时间)
private SocketAddress storeHost;//存放消息的host
private String msgId;//消息id
private long commitLogOffset;//物理偏移
private int bodyCRC;//CRC验证
private int reconsumeTimes;//消费次数(又称为重试的次数)
private long preparedTransactionOffset;//这个不知道有什么用,似乎不是0就是commitLogOffset,事务?
这里不展开调用的逻辑,说明:
preparedTransactionOffset暂时不知道有什么用,看代码没看明白
方法
大部分get,set不讲
parseTopicFilterType方法,判断tag是MULTI_TAG还是SINGLE_TAG
/**
* 看是 MULTI_TAG 还是 SINGLE_TAG
*/
public static TopicFilterType parseTopicFilterType(final int sysFlag) {
if ((sysFlag & MessageSysFlag.MULTI_TAGS_FLAG) == MessageSysFlag.MULTI_TAGS_FLAG) {
return TopicFilterType.MULTI_TAG;
}
return TopicFilterType.SINGLE_TAG;
}
socketAddress2ByteBuffer方法,用于将socket地址写入byteBuffer
public static TopicFilterType parseTopicFilterType(final int sysFlag) {
if ((sysFlag & MessageSysFlag.MULTI_TAGS_FLAG) == MessageSysFlag.MULTI_TAGS_FLAG) {
return TopicFilterType.MULTI_TAG;
}
return TopicFilterType.SINGLE_TAG;
}
public static ByteBuffer socketAddress2ByteBuffer(final SocketAddress socketAddress, final ByteBuffer byteBuffer) {
InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress;
byteBuffer.put(inetSocketAddress.getAddress().getAddress(), 0, 4);
byteBuffer.putInt(inetSocketAddress.getPort());
byteBuffer.flip();
return byteBuffer;
}
问题
sysFlag,reconsumeTimes,preparedTransactionOffset在具体保存逻辑中的用处
涉及tag,重试,最后preparedTransactionOffset还不知道是干吗的
refer
http://www.jianshu.com/p/790d6bc4a1c1 reconsumeTimes意义
网友评论