美文网首页
ActiveMQ持久化方案

ActiveMQ持久化方案

作者: 爱健身的兔子 | 来源:发表于2020-12-30 14:21 被阅读0次

    1 ActiveMQ持久化模型

    1.1 PTP

    Queue的存储是很简单的,就是一个FIFO的Queue

    1.2 PUB/SUB

    对于持久化订阅主题,每一个消费者将获得一个消息的复制。

    2 ActiveMQ持久化流程

    当ActiveMQ接收到PERSISTENT Message消息后就需要借助持久化方案来完成PERSISTENT Message的存储。这个介质可以是磁盘文件系统、可以是ActiveMQ的内置数据库,还可以是某种外部提供的关系型数据库。

    注意:

    • 如上图2.1的步骤所示,所有PERSISTENT Message都要执行持久化存储操作,持久化存储操作方案的性能直接影响着整个MQ服务端的PERSISTENT Message吞吐性能。另外NON_PERSISTENT Message虽然不会进行持久化存储,但是NON_PERSISTENT Message也不是永远都只存在与内存区域。

    • Topic模式的工作队列在没有任何活动订阅者的情况下也会对PERSISTENT Message进行持久化存储。

    • 在客户端启动事务的情况下,只要使用了send方法,PERSISTENT Message就会被发送到服务端,就会进行持久化存储操作,如果没有做事务的commit只是说这些事务中的消息不会进行确认操作,不会分发到某个指定的具体队列中。

    • 如上图中被标示为2.2的操作步骤所示,在ActiveMQ设置的持久化方案完成某条消息的持久化后,会在ActiveMQ服务节点的内部发出一个“完成”信号。这是为了告诉ActiveMQ服务节点自己,是否可以进行下一步操作。但是为了加快ActiveMQ服务节点内部的处理效率,这个过程可以设置为“异步”。

    • ActiveMQ服务端只有在收到消费者端某一条消息或某一组消息的ACK标示后,会进行删除这一条或者这一组消息的操作,并空闲出相应的存储空间。

    3 ActiveMQ内存介绍

    ActiveMQ的内存配置在activemq.xml中,如下所示:

    <systemUsage>
        <systemUsage>
            <memoryUsage>
                <memoryUsage percentOfJvmHeap="70" />
            </memoryUsage>
            <storeUsage>
                <storeUsage limit="100 gb"/>
            </storeUsage>
            <tempUsage>
                <tempUsage limit="50 gb"/>
            </tempUsage>
        </systemUsage>
    </systemUsage>
    
    • systemUsage:该标记用于设置整个ActiveMQ节点在进程级别的各种“容量”的设置情况。其中可设置的属性包括:sendFailIfNoSpaceAfterTimeout,当ActiveMQ收到一条消息时,如果ActiveMQ这时已经没有多余“容量”了,那么就会等待一段时间(这里设置的毫秒数),如果超过这个等待时间ActiveMQ仍然没有可用的容量,那么就拒绝接收这条消息并在消息的发送端抛出javax.jms.ResourceAllocationException异常;sendFailIfNoSpace,当ActiveMQ收到一条消息时,如果ActiveMQ这时已经没有多余“容量”了,就直接拒绝这条消息(不用等待一段时间),并在消息的发送端抛出javax.jms.ResourceAllocationException异常。

    • memoryUsage:该子标记设置整个ActiveMQ节点的“可用内存限制”。这个值不能超过设置的JVM maxmemory的值。其中的percentOfJvmHeap属性表示使用“百分数值”进行设置。除了这个属性以外,还可以使用limit属性进行固定容量授权,例如:limit=”1000 mb”。这些内存容量将供所有队列使用。

    • storeUsage:该标记设置整个ActiveMQ节点,用于存储“持久化消息”的“可用磁盘空间”。该子标记的limit属性必须要进行设置。如果使用数据库存储方案,这个属性就不会起作用了。

    • tempUsage:在ActiveMQ 5.X+ 版本中,一旦ActiveMQ服务节点存储的消息达到了memoryUsage的限制,NON_PERSISTENT Message就会被转储到 temp store区域。虽然NON_PERSISTENT Message不进行持久化存储,但是ActiveMQ为了防止“数据洪峰”出现时NON_PERSISTENT Message大量堆积致使内存耗尽的情况出现,还是会将NON_PERSISTENT Message写入到磁盘的临时区域——temp store。这个子标记就是为了设置这个temp store区域的“可用磁盘空间限制”。

    注意

    storeUsage和tempUsage并不是“最大可用空间”,而是一个阀值。

    4 ActiveMQ持久化方案

    ActiveMQ提供了一个插件式的消息存储,主要实现了如下几种:

    1. AMQ消息存储-基于文件的存储,以前默认的存储方式

    2. KahaDB消息存储-提供了容量的提升和恢复能力,现在的默认方式

    3. JDBC消息存储-消息基于JDBC存储

    4. Memory消息存储-基于内存的消息存储

    5. LevelDB消息存储,新推出的高效存储器,但官网不推荐用。

    5 KahaDB消息存储器

    KahaDB主要元素包括:一个内存Metadata Cache用来在内存中检索消息的存储位置、若干用于记录消息内容的Data log文件、一个在磁盘上检索消息存储位置的Metadata Store、还有一个用于在系统异常关闭后恢复Btree结构的redo文件。如下图所示:

    • db-XX.log:存储了每一条持久化消息的真正内容。这些Data log文件统一采用db-.log的格式进行命名,并且每个Data log文件默认的大小都是32M。当一个Data log文件中的所有消息全部被成功消息后,这个Data log文件会在Metadata Cache中被标记为删除,并在下个checkpoint周期进行删除操作。 虽然Data log文件占用的32M的磁盘空间,但是这些磁盘空间并没有全部使用,因为为了加快写文件的性能,Data log文件采用顺序写的方式进行操作,为了保证文件使用的扇区在物理上是连续的,所以Data log文件需要预占这些扇区*。

    • Metadata Cache:为了更快的找到某个具体消息在Data log文件中的具体位置。消息的位置索引采用BTree的结构被存储在内存中,这个内存区的大小是可以设置的。

    • Metadata Store内存中没有被处理的消息索引会以一定的周期(或者一定的数量规模)为依据,同步(checkpoint)到Metadata Store中。当然redo文件也会被更新,以便在ActiveMQ服务节点在重启后对Metadata Cache进行恢复。最后,消息同步(checkpoint)依据,可以在ActiveMQ的主配置文件中进行设置。

    5.1 kahaDB的配置

    由于在ActiveMQ V5.4+的版本中,KahaDB是默认的持久化存储方案。所以即使您不配置任何的KahaDB参数信息,ActiveMQ也会启动KahaDB。在activemq的安装目录下的:conf/activemq.xml中有如下配置:(默认配置)

    ......
    <broker xmlns="http://activemq.apache.org/schema/core"  brokerName="localhost" dataDirectory="${activemq.data}">
        ......
        <persistenceAdapter>
            <kahaDB directory="${activemq.data}/kahadb"/>
        </persistenceAdapter>
        ......
    </broker>
    ......
    
    5.2 kahaDB的配置属性

    以下表格展示了KahaDB中所有的配置选项和其含义(加“*”部分为重要的配置选项):

    property name default value Comments
    *directory activemq-data 消息文件和日志的存储目录
    *indexWriteBatchSize 1000 当Metadata cache区域和Metadata store区域不同的索引数量达到这个值后,Metadata cache将会发起checkpoint同步
    *indexCacheSize 10000 内存中,索引的页大小。超过这个大小Metadata cache将会发起checkpoint同步
    *enableIndexWriteAsync false 索引是否异步写到消息文件中,将以不要设置为true
    *journalMaxFileLength 32mb 一个消息文件的大小
    *enableJournalDiskSyncs true 如果为true,保证使用同步写入的方式持久化消息到journal文件中
    *cleanupInterval 30000 清除(清除或归档)不再使用的db-*.log文件的时间周期(毫秒)。
    *checkpointInterval 5000 写入索引信息到metadata store中的时间周期(毫秒)
    ignoreMissingJournalfiles false 是否忽略丢失的journal文件。如果为false,当丢失了journal文件时,broker启动时会抛异常并关闭
    checkForCorruptJournalFiles false 检查消息文件是否损坏,true,检查发现损坏会尝试修复
    checksumJournalFiles false 产生一个checksum,以便能够检测journal文件是否损坏。
    • 5.4版本之后有效的属性:
    property name default value Comments
    *archiveDataLogs false 当为true时,归档的消息文件被移到directoryArchive,而不是直接删除
    *directoryArchive null 存储被归档的消息文件目录
    databaseLockedWaitDelay 10000 在使用负载时,等待获得文件锁的延迟时间,单位ms
    maxAsyncJobs 10000 等待写入journal文件的任务队列的最大数量。应该大于或等于最大并发producer的数量。配合并行存储转发属性使用。
    concurrentStoreAndDispatchTopics false 如果为true,转发消息的时候同时提交事务
    concurrentStoreAndDispatchQueues true 如果为true,转发Topic消息的时候同时存储消息的message store中
    • 5.6版本之后有效的属性:
    property name default value Comments
    archiveCorruptedIndex false 是否归档错误的索引到Archive文件夹下
    • 5.10版本之后有效的属性:
    property name default value Comments
    IndexDirectory 单独设置KahaDB中,db.data文件的存储位置。如果不进行设置,db.data文件的存储位置还是将以directory属性设置的值为准

    6 JDBC消息存储器

    ActiveMQ支持使用JDBC来持久化消息,我们只需要配置JDBC驱动即可,至于表结构activemq会自动帮我们建好表结构。

    6.1 JDBC消息存储器配置
    1. 拷贝mysql的驱动包到activeMQ的 \lib\optional\ 目录下。

    2. 在activemq.xml配置文件里配置数据源

     <bean id="mysql_ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
            <property name="driverClassName">
                <value>com.mysql.jdbc.Driver</value>
            </property>
            <property name="url">
                <value>jdbc:mysql://localhost:3306/activemq</value>
            </property>
            <property name="username">
                <value>root</value>
            </property>
            <property name="password">
                <value>123456</value>
            </property>
        </bean>
    
    1. 修改broker的persistenceAdapter持久化方式
     <persistenceAdapter>
                <jdbcPersistenceAdapter dataDirectory="${activemq.base}/data" dataSource="#mysql_ds"/>
      </persistenceAdapter>
    
    6.2 JDBC表结构
    *************************** 1\. row ***************************
           Table: activemq_acks
    Create Table: CREATE TABLE `activemq_acks` (
      `CONTAINER` varchar(250) NOT NULL,
      `SUB_DEST` varchar(250) DEFAULT NULL,
      `CLIENT_ID` varchar(250) NOT NULL,
      `SUB_NAME` varchar(250) NOT NULL,
      `SELECTOR` varchar(250) DEFAULT NULL,
      `LAST_ACKED_ID` bigint(20) DEFAULT NULL,
      `PRIORITY` bigint(20) NOT NULL DEFAULT '5',
      `XID` varchar(250) DEFAULT NULL,
      PRIMARY KEY (`CONTAINER`,`CLIENT_ID`,`SUB_NAME`,`PRIORITY`),
      KEY `ACTIVEMQ_ACKS_XIDX` (`XID`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8
    
    *************************** 1\. row ***************************
           Table: activemq_msgs
    Create Table: CREATE TABLE `activemq_msgs` (
      `ID` bigint(20) NOT NULL,
      `CONTAINER` varchar(250) NOT NULL,
      `MSGID_PROD` varchar(250) DEFAULT NULL,
      `MSGID_SEQ` bigint(20) DEFAULT NULL,
      `EXPIRATION` bigint(20) DEFAULT NULL,
      `MSG` longblob,
      `PRIORITY` bigint(20) DEFAULT NULL,
      `XID` varchar(250) DEFAULT NULL,
      PRIMARY KEY (`ID`),
      KEY `ACTIVEMQ_MSGS_MIDX` (`MSGID_PROD`,`MSGID_SEQ`),
      KEY `ACTIVEMQ_MSGS_CIDX` (`CONTAINER`),
      KEY `ACTIVEMQ_MSGS_EIDX` (`EXPIRATION`),
      KEY `ACTIVEMQ_MSGS_PIDX` (`PRIORITY`),
      KEY `ACTIVEMQ_MSGS_XIDX` (`XID`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8
    
    *************************** 1\. row ***************************
           Table: activemq_lock
    Create Table: CREATE TABLE `activemq_lock` (
      `ID` bigint(20) NOT NULL,
      `TIME` bigint(20) DEFAULT NULL,
      `BROKER_NAME` varchar(250) DEFAULT NULL,
      PRIMARY KEY (`ID`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8
    

    解释上面三张表:

    • ACTIVEMQ_MSGS:存储queue和topic的数据。

      • ID:自增主键。

      • CONTAINER:所属类型以及所在队列或者主题。

      • MSGID_PROD:生产者ID。

      • MSGID_SEQ:在同一批次消息中的序号(MSGID_PROD+MSGID_SEQ=JMS的MessageID)。

      • EXPIRATION:过期时间(从1970-01-01到现在的毫秒值)。

      • MSG:序列化之后的消息。

      • PRIORITY:优先级(0-9,值越大优先级越高)。

      • XID:暂时不知道这个有什么用。

    • ACTIVEMQ_ACKS:存储持久订阅的信息和最后一个持久订阅接收的消息ID。

      • CONTAINER:与上面消息表的CONTAINER一样。

      • SUB_DEST:子目的地,与CONTAINER一样。

      • CLIENT_ID: 链接的客户端ID,也就是我们程序:connection.setClientID("cc1"); 产生的ID。

      • SUB_NAME:持久订阅者的名称.也就是我们程序: session.createDurableSubscriber(destination, "C11"); 产生的名称

      • SELECTOR:消息选择器,consumer可以选择自己想要的消息。

      • LAST_ACKED_ID:最后一次确认ID,这个字段存的该该订阅者最后一次收到的消息的ID。

      • XID:暂时不知道这个有什么用。

    • ACTIVEMQ_LOCK:确保在某一时刻,只允许一个broker实例来访问数据库。

      • XID:自增的主键。

      • TIME:日期。

      • BROKER_NAME:占用数据库的brokerName。

    注意:

    当消费者消费了对了的消息的时候,队列里面的数据就会被删除。持久化topic的消息不管是否被消费,是否有消费者,产生的数据永远都存在,且只存储一条。所以持久化的topic大量数据后可能导致性能下降。这里就像公总号一样,消费者消费完后,消息还会保留。

    6.3 JDBC Message Store with ActiveMQ Journal====优化版的JDBC存储

    这种方式克服了JDBC Store的不足,使用快速的缓存写入技术,大大提高了性能。JDBC 配合其自带的 high performance journal。根据官方说法,它内置的高性能journal的工作类似于在缓存层工作,消息会优先写入到journal,后台的定时任务会每隔一段时间间隔同步到数据库。

    <persistenceFactory>
                <journalPersistenceAdapterFactory
                     journalLogFiles="4"
                     journalLogFileSize="32768" 
                     useJournal="true" 
                     useQuickJournal="true" 
                     dataSource="#mysql_ds" 
                     dataDirectory="activemq-data"/>
            </persistenceFactory>
    

    JDBC Store和JDBC Message Store with ActiveMQ Journal的区别

    1. JDBC with journal的性能优于jdbc。

    2. JDBC用于master/slave模式的数据库分享。

    3. JDBC with journal不能用于master/slave模式。

    4. 一般情况下,推荐使用jdbc with journal。

    7 Memory消息存储

    内存消息存储主要是存储所有的持久化的消息在内存中。这里没有动态的缓存存在,所以你必须注意设置你的broker所在的JVM和内存限制。这种方式的持久化消息只在当前JVM内有效,当重启JVM之后会丢失持久化的消息。配置如下:只需要将 persistent 属性设为false即可。

    <broker xmlns="http://activemq.apache.org/schema/core" persistent="false" brokerName="brokerName" dataDirectory="${activemq.data}">
    

    ActiveMQ的持久消息存储方案_u014066037的博客-CSDN博客

    相关文章

      网友评论

          本文标题:ActiveMQ持久化方案

          本文链接:https://www.haomeiwen.com/subject/zvrhoktx.html