美文网首页
JMS如何保证消息的可靠性?

JMS如何保证消息的可靠性?

作者: 蝌蚪1573 | 来源:发表于2021-09-21 10:56 被阅读0次

    JMS如何保证消息的可靠性?

    JMS是通过持久化(Persistent)、事务(Transaction)、 签收来(acknowledge)保证的

    image.png

    1、持久化

    首先消息的持久化是在消息的生产者中设置的,由消息的生产者告诉消息中间件是否进行消息的持久化,如果生产者端没有设置,默认是持久化的
    通过session创建出来的生产者生产的Queue消息为持久性

    • 持久化:当服务器宕机,消息依然存在;当服务器再次重启,消息能够被消费者继续消费。
    • 非持久化:当服务器宕机,消息不会被保留。

    设置通过session创建出来的生产者生产的Queue消息为持久性
    messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);//非持久化 messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);//持久化

    生产者代码实现:

    public class JmsProducer {
    
       public static final String ACTIVEMQ_URL="tcp://192.168.137.199:61616";
       public static final String QUEUE_ANME="queue01";
    
       public static void main(String[] args) throws JMSException {
           //1.创建连接工厂
           ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("admin","admin",ACTIVEMQ_URL);
           //2.通过连接工厂,获得连接connection并启动访问
           Connection connection = activeMQConnectionFactory.createConnection();
           connection.start();
           //3.创建会话session
           // 两个参数:事务/签收
           Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
           //4.创建目的地(队列或主题topic)
    //        Destination destination = session.createQueue(QUEUE_ANME);
           Queue queue = session.createQueue(QUEUE_ANME);
           //5.创建消息的生产者
           MessageProducer messageProducer = session.createProducer(queue);
    //        messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);//非持久化
    //        messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);//持久化
           //6 通过使用messageProducer生产3条消息发送到MQ的队列里面
           for (int i = 1; i <=3; i++) {
               //7创建消息
    //            TextMessage textMessage = session.createTextMessage("msg--" + i);
               TextMessage textMessage = session.createTextMessage("textMessage msg--" + i);
               //8.通过messageProducer发送给mq
               messageProducer.send(textMessage);
           }
           //9.关闭资源
           messageProducer.close();
           session.close();
           connection.close();
           System.out.println("MQ消息发布成功");
    
       }
    }
    

    消费者代码实现:

    public class JmsConsumer {
        public static final String ACTIVEMQ_URL="tcp://192.168.137.199:61616";
        public static final String QUEUE_ANME="queue01";
    
        public static void main(String[] args) throws JMSException, IOException {
            System.out.println("*****我是2号消费者");
    
            //1.创建连接工厂
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("admin","admin",ACTIVEMQ_URL);
            //2.通过连接工厂,获得连接connection并启动访问
            Connection connection = activeMQConnectionFactory.createConnection();
            connection.start();
            //3.创建会话session
            // 两个参数:事务/签收
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //4.创建目的地(队列或主题topic)
    //        Destination destination = session.createQueue(QUEUE_ANME);
            Queue queue = session.createQueue(QUEUE_ANME);
            //5.创建消费者
            MessageConsumer messageConsumer = session.createConsumer(queue);
    
            while (true){
                TextMessage message = (TextMessage) messageConsumer.receive(20000L);
                if(null != message){
                    System.out.println("TX消费者接收到消息:"+message.getText());
    //                message.acknowledge();
                }else {
                    break;
                }
            }
    
    //        System.in.read();//保证控制台不关闭
            messageConsumer.close();
      //      session.commit();
            session.close();
            connection.close();
    
        }
    }
    

    持久化消息是队列的默认传递模式,此模式保证这些消息只被传送一次和成功使用一次。对于这些消息,可靠性是优先考虑的因素。

    可靠性的另一个重要方面是确保持久性消息传送至目标后,消息服务在向消费者传送它们之前不会丢失这些消息。

    2、事务

    消息生产者偏重于事务,开启事务后生产者生产的消息,只有在session执行commit后才会被提交到服务器,不然此次提交记录无效。事务开启的意义在于,如果对于多条必须同批次传输的消息,可以使用事务,如果一条传输失败,可以将事务回滚,再次传输,保证数据的完整性。

    如果开启事务,事务优先级>签收;

    • 在事务性会话中,当一个事务被成功提交则消息被自动签收。
    • 如果事务回滚,则消息会被再次传送
    • 对于消费者而言,如果开启事务,createSession设置为 ture,消费消息需要commit,否则会出现多次消费的情况
    //这边的第一个参数true表示事务开启
    Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
    //事务如果是true:  需要commit
    session.commit();
    

    3、签收

    消费者偏重于签收
    签收主要针对于消费者,而且一般在非事务的情况下生效,因为开启事务后,签收与否取决于事务的提交或回滚,与签收设置的方式无关。

    签收方式有4种,但平时常用的有两种:

    1. 自动签收
    Session.AUTO_ACKNOWLEDGE;//自动签收
    
    1. 手动签收
    Session.CLIENT_ACKNOWLEDGE; //手动签收
    message.acknowledge();  // 手动签收需要ack
    

    相关文章

      网友评论

          本文标题:JMS如何保证消息的可靠性?

          本文链接:https://www.haomeiwen.com/subject/ohpbgltx.html