美文网首页
ActiveMQ(四)消息存储的持久化

ActiveMQ(四)消息存储的持久化

作者: 慕容鸿煊 | 来源:发表于2019-03-24 17:12 被阅读0次

    ActiveMQ的另一个问题就是只要是软件就有可能挂掉,挂掉不可怕,怕的是挂掉之后把信息给丢了,怎么办,可以进行消息的持久化,ActiveMQ提供了几种持久化方式:

    1. AMQ消息存储-基于文件的存储方式,它具有写入速度快和容易恢复的特点。消息存储在一个个文件中,文件的默认大小为32M,如果一条消息的大小超过了32M,那么这个值必须设置大一点。当一个存储文件中的消息已经全部被消费,那么这个文件将被标识为可删除,在下一个清除阶段,这个文件被删除。AMQ适用于ActiveMQ5.3之前的版本。
    2. KahaDB消息存储-提供了容量的提升和恢复能力,是现在的默认存储方式;KahaDB是基于文件的本地数据库储存形式,虽然没有AMQ的速度快,但是它具有强扩展性,恢复的时间比AMQ短,从5.4版本之后KahaDB做为默认的持久化方式。
    3. JDBC消息存储-消息基于JDBC存储的;
    4. Memory消息存储-基于内存的消息存储,由于内存不属于持久化范畴。所以内存存储不在讨论范围内。
      KahaDB
      由于KahaDB是默认的持久化存储方案。所以即使你不配置任何的KahaDB参数信息,ActiveMQ也会启动KahaDB。这种情况下,KahaDB文件所在位置是你的ActiveMQ安装路径下的/data/KahaDB子目录。
      关系型数据库存储方案
      从ActiveMQ 4+版本开始,ActiveMQ就支持使用关系型数据库进行持久化存储——通过JDBC实现的数据库连接。可以使用的关系型数据库囊括了目前市面的主流数据库。
      使用JDBC的方式持久化
      1、修改配置文件conf/activemq.xml:
      将其中的这段配置:
      <persistenceAdapter>
      <kahaDB directory="${activemq.base}/data/kahadb"/>
      </persistenceAdapter>
      修改为为:
      <persistenceAdapter>
      <jdbcPersistenceAdapter dataSource="#mysql-ds "/>
      </persistenceAdapter>
      2、然后在</broker>标签后,增加数据源的配置:
      <bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
      <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
      <property name="url" value="jdbc:mysql://localhost:3306/activemq?relaxAutoCommit=true&useUnicode=true&characterEncoding=utf-8&serverTimezone=UTC"/>
      <property name="username" value="root"/>
      <property name="password" value="123456"/>
      <property name="poolPreparedStatements" value="true"/>
      </bean>
      其中?relaxAutoCommit=true必须有,其他的属性根据数据库的配置自行决定。
      3、将mysql-connector-java-5.1.34-bin.jar(版本可以自行选择)放到ActiveMQ的/ lib目录下。
      4、在Mysql数据库中增加在连接字符串中设置的数据库名activemq
      5、运行后,会发现在库中增加了3个表
      activemq_acks:用于存储订阅关系。如果是持久化Topic,订阅者和服务器的订阅关系在这个表保存,主要数据库字段如下:
      container:消息的destination
      sub_dest:如果是使用static集群,这个字段会有集群其他系统的信息
      client_id:每个订阅者都必须有一个唯一的客户端id用以区分
      sub_name:订阅者名称
      selector:选择器,可以选择只消费满足条件的消息。条件可以用自定义属性实现,可支持多属性and和or操作
      last_acked_id:记录消费过的消息的id
      activemq_lock:在集群环境中才有用,只有一个Broker可以获得消息,称为Master Broker,其他的只能作为备份等待Master Broker不可用,才可能成为下一个Master Broker。这个表用于记录哪个Broker是当前的Master Broker。
      activemq_msgs:用于存储消息,Queue和Topic都存储在这个表中。主要的数据库字段如下:
      id:自增的数据库主键
      container:消息的destination
      msgid_prod:消息发送者客户端的主键
      msg_seq:是发送消息的顺序,msgid_prod+msg_seq可以组成jms的messageid
      expiration:消息的过期时间,存储的是从1970-01-01到现在的毫秒数
      msg:消息本体的java序列化对象的二进制数据
      priority:优先级,从0-9,数值越大优先级越高
      消息的持久化订阅
      分别运行订阅模式和P2P模式,可以发现,P2P模式缺省把消息进行持久化,而topic模式是没有的。
      一般topic模式实验:
      1、 启动两个消费者,启动一个生产者,发送消息,两个消费者都可以收到。
      2、 关闭一个消费者,生产者发送消息,活跃的消费者可以收到消息,启动被关闭的消费者,无法收到消息。
      3、 关闭所有消费者,生产者发送消息,在ActiveMQ控制台可以看见消息已被接收,关闭再启动ActiveMQ,启动消费者收不到消息。
      如果topic模式下,需要消费者在离线又上线后,不管ActiveMQ是否重启过,都保证可以接受到消息,就需要进行持久化订阅。具体代码参见模块no-spirng包durabletopic。
      持久Topic消费者端
      需要设置客户端id:connection.setClientID("Mark");
      消息的destination变为 Topic
      消费者类型变为TopicSubscriber
      消费者创建时变为session.createDurableSubscriber(destination,"任意名字,代表订阅名 ");
      运行一次消费者,将消费者在ActiveMQ上进行一次注册。然后在ActiveMQ的管理控制台subscribers页面可以看见我们的消费者。
      效果:
      1、 运行生产者,发布消息,多个消费者可以正常收到。
      2、 关闭一个消费者,运行生产者,发布消息后再启动被关闭的消费者,可以收到离线后的消息;
      3、 关闭所有消费者,运行生产者,发布消息后,关闭ActiveMQ再启动,启动所有消费者,都可以收到消息。
      注意:生产者端无需另外单独配置
      消息非持久化
      修改messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);来设置消息本身的持久化属性为非持久化。重复上述实验,可以发现,第1,2点保持不变,但是第三点,当关闭ActiveMQ再启动,消费者关闭后再启动,是收不到消息的。
      说明,即使进行了持久订阅,但是消息本身如果是不持久化的,ActiveMQ关闭再启动,这些非持久化的消息会丢失,进行持久订阅的消费者也是收不到自身离线期间的消息的。

    相关文章

      网友评论

          本文标题:ActiveMQ(四)消息存储的持久化

          本文链接:https://www.haomeiwen.com/subject/cdwuvqtx.html