美文网首页
activemq集群zookeeper,消息过期

activemq集群zookeeper,消息过期

作者: 周六放风筝 | 来源:发表于2018-07-14 08:53 被阅读119次

    准备环境: jdk 1.8 activemq5.14.3(这里5.8以上版本都可) zookeeper3.3.6 centos7或win10

    这里我只做一台 zookeeper的 如果用zk集群 可以英文逗号隔开

    话不多说 直接上配置文件 apache-activemq-5.14.3\conf\activemq.xml

    first of all :
    配置文件中配置了和zk集群 ,请详看配置文件

    secondly:
    配置文件中 配置了queue 30秒没有处理的消息自动丢弃
    topic 过期没有配置

    
    <beans
      xmlns="http://www.springframework.org/schema/beans"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
      http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
    
        <!-- Allows us to use system properties as variables in this configuration file -->
        <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
            <property name="locations">
                <value>file:${activemq.conf}/credentials.properties</value>
            </property>
        </bean>
    
       <!-- Allows accessing the server log -->
        <bean id="logQuery" class="io.fabric8.insight.log.log4j.Log4jLogQuery"
              lazy-init="false" scope="singleton"
              init-method="start" destroy-method="stop">
        </bean>
    
        <!--
            The <broker> element is used to configure the ActiveMQ broker.
        -->
        <broker xmlns="http://activemq.apache.org/schema/core" brokerName="mybroker" dataDirectory="${activemq.data}">
    
            <destinationPolicy>
                <policyMap>
                  <policyEntries>
                    <!--步骤一 配置queue消息超时 丢弃到死信队列 -->
                    <!-- gcInactiveDestinations="true"启用清理功能 -->
                    <!-- inactiveTimoutBeforeGC="30000" Topic或Queue超时时间  单位毫秒-->
                    <policyEntry queue=">" gcInactiveDestinations="true" inactiveTimoutBeforeGC="30000">  
                        <!--步骤二  设置死信策略-->
                     <deadLetterStrategy>  
                        <!-- 非持久消息保存到死信队列  processNonPersistent="true"-->
                        <!-- 过期消息不保存到死信队列   processExpired="false" -->
                        <!-- 过期时间   expiration="30000" 毫秒-->
                        <sharedDeadLetterStrategy processExpired="false" expiration="30000" />  
                    </deadLetterStrategy>  
                  </policyEntry>  
    
                    <policyEntry topic=">" >
                    
                      <pendingMessageLimitStrategy>
                        <constantPendingMessageLimitStrategy limit="1000"/>
                      </pendingMessageLimitStrategy>
                      <!-- 定时丢弃死信队列 -->
                      <!-- <deadLetterStrategy>  
                        <sharedDeadLetterStrategy processExpired="true" expiration="30000"/>  
                      </deadLetterStrategy>  -->
                    </policyEntry>
                  </policyEntries>
                </policyMap>
            </destinationPolicy>
    
    
            <managementContext>
                <managementContext createConnector="false"/>
            </managementContext>
    
          
           <persistenceAdapter>
            <!--  kahaDB directory="${activemq.data}/kahadb"/>-->
              <!-- LevelDB -->
            <!--一旦Broker崩溃,重建索引的速度会非常慢
             但是由于其重建索引时间过长,而且索引文件占用磁盘空间过大,所以已经不推荐使用。 -->
             <!-- 参数说明 -->
             <!-- 
             replicas:集群activemq 数量  如果是三台机器这里写3 ,当集群中一台机器挂掉之后 mq会判断当前有几台可用,并满足公式 N/2>2 才会正常提供服务 
              bind:这里是mq提供的地址 端口自定义  三台机器里面的配置要一致,tips:不一致我还没有试过
              zkAddress:zk服务提供地址  zk集群的话 多台 用 英文逗号隔开
              zkPassword:zk 的acl认证时候的密码
              hostname:可理解为本机机器名称。 三台机器都不一致。
              zkPath:zk上的存储节点位置,自定义 三台须一致
            -->
              <replicatedLevelDB
                directory="${activemq.data}/leveldb"
                replicas="1"
                bind="tcp://0.0.0.0:61619"
                zkAddress="127.0.0.1:2181"
                zkPassword=""
    
                hostname="localhost"
                zkPath="/activemq/leveldb-stores"
                /> 
            </persistenceAdapter>
    
             
              <systemUsage>
                <systemUsage>
                    <memoryUsage>
                        <memoryUsage percentOfJvmHeap="70" />
                    </memoryUsage>
                    <storeUsage>
                        <storeUsage limit="100 gb"/>
                    </storeUsage>
                   <!--  
                   一旦ActiveMQ服务节点存储的消息达到了memoryUsage的限制,NON_PERSISTENT Message就会被转储到 temp store区域。虽然我们说过NON_PERSISTENT Message不进行持久化存储,但是ActiveMQ为了防止“数据洪峰”出现时NON_PERSISTENT Message写入到磁盘的临时区域——tempessage大量堆积致使内存耗尽的情况出现,还是会将NON_PERSISTENT Message写入到磁盘的临时区域——temp store。这个子标记就是为了设置这个temp store区域的“可用磁盘空间限制”。 
                    -->
                    <tempUsage>
                        <tempUsage limit="50 gb"/>
                    </tempUsage>
                </systemUsage>
            </systemUsage>
            <!-- 如果有默认的,activemq启动 这里的端口都需要改动  -->
            <transportConnectors>
                <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
                <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
                <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
                <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
                <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
                <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            </transportConnectors>
    
            <!-- destroy the spring context on shutdown to stop jetty -->
            <shutdownHooks>
                <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
            </shutdownHooks>
            <!-- 步骤三  配置插件防止消息堆积 配置 插件 -->
            <plugins>  
                <!--timeStampingBrokerPlugin 它使用代理时间戳更新消息上的JMS客户端时间戳 -->
                <!-- 86,400,000ms = 1 day  futureOnly=true  :实时更新设置的时间 默认false  60秒 -->
                  <!--ttlCeiling:当不为零时,此值(以毫秒为单位)限制到期时间  -->
                  <!--zeroExpirationOverride: 当不为零时,此值(以毫秒为单位)将覆盖尚未设置到期日的消息的到期时间  -->
                <timeStampingBrokerPlugin ttlCeiling="30000"      zeroExpirationOverride="30000" />  
                <!-- 丢弃所有死信队列 插件    <discardingDLQBrokerPlugin dropAll="true" dropTemporaryTopics="true" dropTemporaryQueues="true" /> -->
                <!-- 丢死指定死信队列   <discardingDLQBrokerPlugin dropOnly="MY.ORDER.QUEUE" reportInterval="1000" />-->
               <discardingDLQBrokerPlugin dropAll="true" dropTemporaryTopics="true" dropTemporaryQueues="true" />
             </plugins> 
            
    
        </broker>
    
        <!--
            Enable web consoles, REST and Ajax APIs and demos
            The web consoles requires by default login, you can disable this in the jetty.xml file
    
            Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
        -->
        <import resource="jetty.xml"/>
    
    </beans>
    
    

    tips:levelDB官方已经不推荐 O(∩_∩)O

    相关文章

      网友评论

          本文标题:activemq集群zookeeper,消息过期

          本文链接:https://www.haomeiwen.com/subject/tvycpftx.html