美文网首页
ActiveMQ(六)消息的持久化订阅

ActiveMQ(六)消息的持久化订阅

作者: 慕容鸿煊 | 来源:发表于2019-03-24 17:22 被阅读0次

分别运行订阅模式和P2P模式,可以发现,P2P模式缺省把消息进行持久化,而topic模式是没有的。
一般topic模式实验:
1、 启动两个消费者,启动一个生产者,发送消息,两个消费者都可以收到。
2、 关闭一个消费者,生产者发送消息,活跃的消费者可以收到消息,启动被关闭的消费者,无法收到消息。
3、 关闭所有消费者,生产者发送消息,在ActiveMQ控制台可以看见消息已被接收,关闭再启动ActiveMQ,启动消费者收不到消息。
如果topic模式下,需要消费者在离线又上线后,不管ActiveMQ是否重启过,都保证可以接受到消息,就需要进行持久化订阅。具体代码参见模块no-spirng包durabletopic。
持久Topic消费者端
需要设置客户端id:connection.setClientID("Mark");
消息的destination变为 Topic
消费者类型变为TopicSubscriber
消费者创建时变为session.createDurableSubscriber(destination,"任意名字,代表订阅名 ");
运行一次消费者,将消费者在ActiveMQ上进行一次注册。然后在ActiveMQ的管理控制台subscribers页面可以看见我们的消费者。
效果:
1、 运行生产者,发布消息,多个消费者可以正常收到。
2、 关闭一个消费者,运行生产者,发布消息后再启动被关闭的消费者,可以收到离线后的消息;
3、 关闭所有消费者,运行生产者,发布消息后,关闭ActiveMQ再启动,启动所有消费者,都可以收到消息。
注意:生产者端无需另外单独配置
消息非持久化
修改messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);来设置消息本身的持久化属性为非持久化。重复上述实验,可以发现,第1,2点保持不变,但是第三点,当关闭ActiveMQ再启动,消费者关闭后再启动,是收不到消息的。
说明,即使进行了持久订阅,但是消息本身如果是不持久化的,ActiveMQ关闭再启动,这些非持久化的消息会丢失,进行持久订阅的消费者也是收不到自身离线期间的消息的。

代码

public class JmsDurableTopicConsumerOther {

    private static final String USERNAME
            = ActiveMQConnection.DEFAULT_USER;//默认连接用户名
    private static final String PASSWORD
            = ActiveMQConnection.DEFAULT_PASSWORD;//默认连接密码
    private static final String BROKEURL
            = ActiveMQConnection.DEFAULT_BROKER_URL;//默认连接地址

    public static void main(String[] args) {
        ConnectionFactory connectionFactory;//连接工厂
        Connection connection = null;//连接

        Session session;//会话 接受或者发送消息的线程

        TopicSubscriber  messageConsumer;//消息的消费者

        //实例化连接工厂
        connectionFactory = new ActiveMQConnectionFactory(JmsDurableTopicConsumerOther.USERNAME,
                JmsDurableTopicConsumerOther.PASSWORD, JmsDurableTopicConsumerOther.BROKEURL);

        try {
            //通过连接工厂获取连接
            connection = connectionFactory.createConnection();
            connection.setClientID("Lison");
            //启动连接
            connection.start();
            //创建session
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic destination = session.createTopic("DurableTopic2");

            //创建消息消费者
            messageConsumer = session.createDurableSubscriber(destination,"xiangxue");
            messageConsumer.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    try {
                        System.out.println("Accept msg : "
                                +((TextMessage)message).getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });


        } catch (JMSException e) {
            e.printStackTrace();
        }

    }
}

相关文章

  • ActiveMQ(六)消息的持久化订阅

    分别运行订阅模式和P2P模式,可以发现,P2P模式缺省把消息进行持久化,而topic模式是没有的。一般topic模...

  • ActiveMQ发送消息原理解析

    本文将对ActiveMQ发送消息的源码进行解析,并分析ActiveMQ持久化消息和非持久化消息的发送策略和消息的存...

  • activeMQ-12消息持久化机制

    首先之前说的事务、消息持久化、签收 这些功能,都是属于activeMQ自身携带的功能,那么是什么支撑的她消息持久化...

  • ActiveMq的发送原理

    持久化消息和非持久化消息的发送策略:消息同步发送和异步发送ActiveMQ支持同步、异步两种发送模式将消息发送到b...

  • ActiveMQ消息数据持久化

    一、 问题提出 场景问题:服务器断电重启,未被消费的消息是否会在重启之后被继续消费?两种选择:非持久性模式/持久性...

  • ActiveMq的消息存储持久化

    转载自 https://www.cnblogs.com/xinhuaxuan/p/6128380.html Act...

  • ActiveMQ的消息持久化实现

    字典里最重要的三个词,就是意志、工作、等待。我将要在这三块基石上建立我成功的金字塔。 —— 巴斯德 消息持久化方式...

  • activeMQ-06消息持久化

    activeMQ消息的持久化:分为两部分说,queue队列部分、topic主题部分; queue队列首先需要设计生...

  • ActiveMQ的非持久化订阅的思考

    项目中用到了大量的client,由于client和AMQ相连,AMQ对大量queue的支持不是很好,所以在项目中用...

  • ActiveMQ(四)消息存储的持久化

    ActiveMQ的另一个问题就是只要是软件就有可能挂掉,挂掉不可怕,怕的是挂掉之后把信息给丢了,怎么办,可以进行消...

网友评论

      本文标题:ActiveMQ(六)消息的持久化订阅

      本文链接:https://www.haomeiwen.com/subject/wwwuvqtx.html