1 ActiveMQ持久化模型
1.1 PTP
Queue的存储是很简单的,就是一个FIFO的Queue
1.2 PUB/SUB
对于持久化订阅主题,每一个消费者将获得一个消息的复制。
2 ActiveMQ持久化流程
当ActiveMQ接收到PERSISTENT Message消息后就需要借助持久化方案来完成PERSISTENT Message的存储。这个介质可以是磁盘文件系统、可以是ActiveMQ的内置数据库,还可以是某种外部提供的关系型数据库。
注意:
-
如上图2.1的步骤所示,所有PERSISTENT Message都要执行持久化存储操作,持久化存储操作方案的性能直接影响着整个MQ服务端的PERSISTENT Message吞吐性能。另外NON_PERSISTENT Message虽然不会进行持久化存储,但是NON_PERSISTENT Message也不是永远都只存在与内存区域。
-
Topic模式的工作队列在没有任何活动订阅者的情况下也会对PERSISTENT Message进行持久化存储。
-
在客户端启动事务的情况下,只要使用了send方法,PERSISTENT Message就会被发送到服务端,就会进行持久化存储操作,如果没有做事务的commit只是说这些事务中的消息不会进行确认操作,不会分发到某个指定的具体队列中。
-
如上图中被标示为2.2的操作步骤所示,在ActiveMQ设置的持久化方案完成某条消息的持久化后,会在ActiveMQ服务节点的内部发出一个“完成”信号。这是为了告诉ActiveMQ服务节点自己,是否可以进行下一步操作。但是为了加快ActiveMQ服务节点内部的处理效率,这个过程可以设置为“异步”。
-
ActiveMQ服务端只有在收到消费者端某一条消息或某一组消息的ACK标示后,会进行删除这一条或者这一组消息的操作,并空闲出相应的存储空间。
3 ActiveMQ内存介绍
ActiveMQ的内存配置在activemq.xml中,如下所示:
<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsage percentOfJvmHeap="70" />
</memoryUsage>
<storeUsage>
<storeUsage limit="100 gb"/>
</storeUsage>
<tempUsage>
<tempUsage limit="50 gb"/>
</tempUsage>
</systemUsage>
</systemUsage>
-
systemUsage:该标记用于设置整个ActiveMQ节点在进程级别的各种“容量”的设置情况。其中可设置的属性包括:
sendFailIfNoSpaceAfterTimeout
,当ActiveMQ收到一条消息时,如果ActiveMQ这时已经没有多余“容量”了,那么就会等待一段时间(这里设置的毫秒数),如果超过这个等待时间ActiveMQ仍然没有可用的容量,那么就拒绝接收这条消息并在消息的发送端抛出javax.jms.ResourceAllocationException异常;sendFailIfNoSpace
,当ActiveMQ收到一条消息时,如果ActiveMQ这时已经没有多余“容量”了,就直接拒绝这条消息(不用等待一段时间),并在消息的发送端抛出javax.jms.ResourceAllocationException异常。 -
memoryUsage:该子标记设置整个ActiveMQ节点的“可用内存限制”。这个值不能超过设置的JVM maxmemory的值。其中的percentOfJvmHeap属性表示使用“百分数值”进行设置。除了这个属性以外,还可以使用
limit
属性进行固定容量授权,例如:limit=”1000 mb”。这些内存容量将供所有队列使用。 -
storeUsage:该标记设置整个ActiveMQ节点,用于存储“持久化消息”的“可用磁盘空间”。该子标记的
limit
属性必须要进行设置。如果使用数据库存储方案,这个属性就不会起作用了。 -
tempUsage:在ActiveMQ 5.X+ 版本中,一旦ActiveMQ服务节点存储的消息达到了memoryUsage的限制,NON_PERSISTENT Message就会被转储到 temp store区域。虽然NON_PERSISTENT Message不进行持久化存储,但是ActiveMQ为了防止“数据洪峰”出现时NON_PERSISTENT Message大量堆积致使内存耗尽的情况出现,还是会将NON_PERSISTENT Message写入到磁盘的临时区域——temp store。这个子标记就是为了设置这个temp store区域的“可用磁盘空间限制”。
注意:
storeUsage和tempUsage并不是“最大可用空间”,而是一个阀值。
4 ActiveMQ持久化方案
ActiveMQ提供了一个插件式的消息存储,主要实现了如下几种:
-
AMQ消息存储-基于文件的存储,以前默认的存储方式
-
KahaDB消息存储-提供了容量的提升和恢复能力,现在的默认方式
-
JDBC消息存储-消息基于JDBC存储
-
Memory消息存储-基于内存的消息存储
-
LevelDB消息存储,新推出的高效存储器,但官网不推荐用。
5 KahaDB消息存储器
KahaDB主要元素包括:一个内存Metadata Cache用来在内存中检索消息的存储位置、若干用于记录消息内容的Data log文件、一个在磁盘上检索消息存储位置的Metadata Store、还有一个用于在系统异常关闭后恢复Btree结构的redo文件。如下图所示:
-
db-XX.log:存储了每一条持久化消息的真正内容。这些Data log文件统一采用db-.log的格式进行命名,并且每个Data log文件默认的大小都是32M。当一个Data log文件中的所有消息全部被成功消息后,这个Data log文件会在Metadata Cache中被标记为删除,并在下个checkpoint周期进行删除操作。 虽然Data log文件占用的32M的磁盘空间,但是这些磁盘空间并没有全部使用,因为为了加快写文件的性能,Data log文件采用顺序写的方式进行操作,为了保证文件使用的扇区在物理上是连续的,所以Data log文件需要预占这些扇区*。
-
Metadata Cache:为了更快的找到某个具体消息在Data log文件中的具体位置。消息的位置索引采用BTree的结构被存储在内存中,这个内存区的大小是可以设置的。
-
Metadata Store:内存中没有被处理的消息索引会以一定的周期(或者一定的数量规模)为依据,同步(checkpoint)到Metadata Store中。当然redo文件也会被更新,以便在ActiveMQ服务节点在重启后对Metadata Cache进行恢复。最后,消息同步(checkpoint)依据,可以在ActiveMQ的主配置文件中进行设置。
5.1 kahaDB的配置
由于在ActiveMQ V5.4+的版本中,KahaDB是默认的持久化存储方案。所以即使您不配置任何的KahaDB参数信息,ActiveMQ也会启动KahaDB。在activemq的安装目录下的:conf/activemq.xml中有如下配置:(默认配置)
......
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}">
......
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>
......
</broker>
......
5.2 kahaDB的配置属性
以下表格展示了KahaDB中所有的配置选项和其含义(加“*”部分为重要的配置选项):
property name | default value | Comments |
---|---|---|
*directory | activemq-data | 消息文件和日志的存储目录 |
*indexWriteBatchSize | 1000 | 当Metadata cache区域和Metadata store区域不同的索引数量达到这个值后,Metadata cache将会发起checkpoint同步 |
*indexCacheSize | 10000 | 内存中,索引的页大小。超过这个大小Metadata cache将会发起checkpoint同步 |
*enableIndexWriteAsync | false | 索引是否异步写到消息文件中,将以不要设置为true |
*journalMaxFileLength | 32mb | 一个消息文件的大小 |
*enableJournalDiskSyncs | true | 如果为true,保证使用同步写入的方式持久化消息到journal文件中 |
*cleanupInterval | 30000 | 清除(清除或归档)不再使用的db-*.log文件的时间周期(毫秒)。 |
*checkpointInterval | 5000 | 写入索引信息到metadata store中的时间周期(毫秒) |
ignoreMissingJournalfiles | false | 是否忽略丢失的journal文件。如果为false,当丢失了journal文件时,broker启动时会抛异常并关闭 |
checkForCorruptJournalFiles | false | 检查消息文件是否损坏,true,检查发现损坏会尝试修复 |
checksumJournalFiles | false | 产生一个checksum,以便能够检测journal文件是否损坏。 |
- 5.4版本之后有效的属性:
property name | default value | Comments |
---|---|---|
*archiveDataLogs | false | 当为true时,归档的消息文件被移到directoryArchive,而不是直接删除 |
*directoryArchive | null | 存储被归档的消息文件目录 |
databaseLockedWaitDelay | 10000 | 在使用负载时,等待获得文件锁的延迟时间,单位ms |
maxAsyncJobs | 10000 | 等待写入journal文件的任务队列的最大数量。应该大于或等于最大并发producer的数量。配合并行存储转发属性使用。 |
concurrentStoreAndDispatchTopics | false | 如果为true,转发消息的时候同时提交事务 |
concurrentStoreAndDispatchQueues | true | 如果为true,转发Topic消息的时候同时存储消息的message store中 |
- 5.6版本之后有效的属性:
property name | default value | Comments |
---|---|---|
archiveCorruptedIndex | false | 是否归档错误的索引到Archive文件夹下 |
- 5.10版本之后有效的属性:
property name | default value | Comments |
---|---|---|
IndexDirectory | 单独设置KahaDB中,db.data文件的存储位置。如果不进行设置,db.data文件的存储位置还是将以directory属性设置的值为准 |
6 JDBC消息存储器
ActiveMQ支持使用JDBC来持久化消息,我们只需要配置JDBC驱动即可,至于表结构activemq会自动帮我们建好表结构。
6.1 JDBC消息存储器配置
-
拷贝mysql的驱动包到activeMQ的 \lib\optional\ 目录下。
-
在activemq.xml配置文件里配置数据源
<bean id="mysql_ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
<property name="driverClassName">
<value>com.mysql.jdbc.Driver</value>
</property>
<property name="url">
<value>jdbc:mysql://localhost:3306/activemq</value>
</property>
<property name="username">
<value>root</value>
</property>
<property name="password">
<value>123456</value>
</property>
</bean>
- 修改broker的persistenceAdapter持久化方式
<persistenceAdapter>
<jdbcPersistenceAdapter dataDirectory="${activemq.base}/data" dataSource="#mysql_ds"/>
</persistenceAdapter>
6.2 JDBC表结构
*************************** 1\. row ***************************
Table: activemq_acks
Create Table: CREATE TABLE `activemq_acks` (
`CONTAINER` varchar(250) NOT NULL,
`SUB_DEST` varchar(250) DEFAULT NULL,
`CLIENT_ID` varchar(250) NOT NULL,
`SUB_NAME` varchar(250) NOT NULL,
`SELECTOR` varchar(250) DEFAULT NULL,
`LAST_ACKED_ID` bigint(20) DEFAULT NULL,
`PRIORITY` bigint(20) NOT NULL DEFAULT '5',
`XID` varchar(250) DEFAULT NULL,
PRIMARY KEY (`CONTAINER`,`CLIENT_ID`,`SUB_NAME`,`PRIORITY`),
KEY `ACTIVEMQ_ACKS_XIDX` (`XID`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
*************************** 1\. row ***************************
Table: activemq_msgs
Create Table: CREATE TABLE `activemq_msgs` (
`ID` bigint(20) NOT NULL,
`CONTAINER` varchar(250) NOT NULL,
`MSGID_PROD` varchar(250) DEFAULT NULL,
`MSGID_SEQ` bigint(20) DEFAULT NULL,
`EXPIRATION` bigint(20) DEFAULT NULL,
`MSG` longblob,
`PRIORITY` bigint(20) DEFAULT NULL,
`XID` varchar(250) DEFAULT NULL,
PRIMARY KEY (`ID`),
KEY `ACTIVEMQ_MSGS_MIDX` (`MSGID_PROD`,`MSGID_SEQ`),
KEY `ACTIVEMQ_MSGS_CIDX` (`CONTAINER`),
KEY `ACTIVEMQ_MSGS_EIDX` (`EXPIRATION`),
KEY `ACTIVEMQ_MSGS_PIDX` (`PRIORITY`),
KEY `ACTIVEMQ_MSGS_XIDX` (`XID`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
*************************** 1\. row ***************************
Table: activemq_lock
Create Table: CREATE TABLE `activemq_lock` (
`ID` bigint(20) NOT NULL,
`TIME` bigint(20) DEFAULT NULL,
`BROKER_NAME` varchar(250) DEFAULT NULL,
PRIMARY KEY (`ID`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
解释上面三张表:
-
ACTIVEMQ_MSGS:存储queue和topic的数据。
-
ID:自增主键。
-
CONTAINER:所属类型以及所在队列或者主题。
-
MSGID_PROD:生产者ID。
-
MSGID_SEQ:在同一批次消息中的序号(MSGID_PROD+MSGID_SEQ=JMS的MessageID)。
-
EXPIRATION:过期时间(从1970-01-01到现在的毫秒值)。
-
MSG:序列化之后的消息。
-
PRIORITY:优先级(0-9,值越大优先级越高)。
-
XID:暂时不知道这个有什么用。
-
-
ACTIVEMQ_ACKS:存储持久订阅的信息和最后一个持久订阅接收的消息ID。
-
CONTAINER:与上面消息表的CONTAINER一样。
-
SUB_DEST:子目的地,与CONTAINER一样。
-
CLIENT_ID: 链接的客户端ID,也就是我们程序:connection.setClientID("cc1"); 产生的ID。
-
SUB_NAME:持久订阅者的名称.也就是我们程序: session.createDurableSubscriber(destination, "C11"); 产生的名称
-
SELECTOR:消息选择器,consumer可以选择自己想要的消息。
-
LAST_ACKED_ID:最后一次确认ID,这个字段存的该该订阅者最后一次收到的消息的ID。
-
XID:暂时不知道这个有什么用。
-
-
ACTIVEMQ_LOCK:确保在某一时刻,只允许一个broker实例来访问数据库。
-
XID:自增的主键。
-
TIME:日期。
-
BROKER_NAME:占用数据库的brokerName。
-
注意:
当消费者消费了对了的消息的时候,队列里面的数据就会被删除。持久化topic的消息不管是否被消费,是否有消费者,产生的数据永远都存在,且只存储一条。所以持久化的topic大量数据后可能导致性能下降。这里就像公总号一样,消费者消费完后,消息还会保留。
6.3 JDBC Message Store with ActiveMQ Journal====优化版的JDBC存储
这种方式克服了JDBC Store的不足,使用快速的缓存写入技术,大大提高了性能。JDBC 配合其自带的 high performance journal。根据官方说法,它内置的高性能journal的工作类似于在缓存层工作,消息会优先写入到journal,后台的定时任务会每隔一段时间间隔同步到数据库。
<persistenceFactory>
<journalPersistenceAdapterFactory
journalLogFiles="4"
journalLogFileSize="32768"
useJournal="true"
useQuickJournal="true"
dataSource="#mysql_ds"
dataDirectory="activemq-data"/>
</persistenceFactory>
JDBC Store和JDBC Message Store with ActiveMQ Journal的区别:
-
JDBC with journal的性能优于jdbc。
-
JDBC用于master/slave模式的数据库分享。
-
JDBC with journal不能用于master/slave模式。
-
一般情况下,推荐使用jdbc with journal。
7 Memory消息存储
内存消息存储主要是存储所有的持久化的消息在内存中。这里没有动态的缓存存在,所以你必须注意设置你的broker所在的JVM和内存限制。这种方式的持久化消息只在当前JVM内有效,当重启JVM之后会丢失持久化的消息。配置如下:只需要将 persistent 属性设为false即可。
<broker xmlns="http://activemq.apache.org/schema/core" persistent="false" brokerName="brokerName" dataDirectory="${activemq.data}">
网友评论