美文网首页
RabbitMQ消息确认及消息持久化

RabbitMQ消息确认及消息持久化

作者: 裂开的汤圆 | 来源:发表于2020-11-21 20:51 被阅读0次

RabbitMQ消息确认(消费者端消息确认)

// 该代码针对消费者
autoAck = true;
channel.basicConsume(QUEUE_NAME, autoAck , deliverCallback, consumerTag -> { });

当我们设置autoAck参数为true时,RabbitMQ一旦向消费者发送了一条消息,便立即将其标记为删除,而不会管消费者是否正确处理该消息。当消费者在处理该消息途中挂掉,那么当前处理的消息便会丢失,并且还会丢失所有发送给该消费者且未处理的消息。

为确保消息不丢失,RabbitMQ支持消息确认。消费者消费结束后返回确认,以告知RabbitMQ已经接受并处理了该消息,RabbitMQ在收到确认信息后,可以自由的删除它。

如果RabbitMQ发送一条消息给消费者后,没有收到确认信息。RabbitMQ会有两种处理方案:

第一种,如果消费者在不发送确认的情况下死亡(其通道已关闭,连接已关闭或TCP链接丢失),RabbitMQ会将该消息重新排队。

第二种,消费者挂掉,但通道未关闭,RabbitMQ收不到消息超时。RabbitMQ会重新发送消息,直至通道关闭或成功收到确认消息为止。

需要注意的是,RabbitMQ无法释放任何未确认的消息,这将导致RabbitMQ消耗越来越多的内存。可以通过以下命令调试该错误。

# linux
sudo rabbitmqctl list_queues name message_ready messages_unacknowledged

# window
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

RabbitMQ的消息确认(生产者端消息确认,事务)

public class Send {
    private static final String QUEUE_NAME = "simple_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        String msgString = "hello tx message";

        try{
            channel.txSelect();    // 启动事务
            channel.basicPublish("", QUEUE_NAME, null, msgString.getBytes());
            channel.txCommit();    // 事务提交
        }catch (Exception e){
            channel.txRollback();    // 回滚
        }
    }
}

类似于mysql中的事务,当出现异常时,会执行回滚,该消息不会发送。这种方式会极大的影响消息队列吞吐量,

RabbitMQ消息确认Confirm(同步)

单条消息confirm

public class send {
    private static final String QUEUE_NAME = "confirm";
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 将channel设置为confirm模式
        channel.confirmSelect();
        
        // 单条消息发送
        String msgString = "hello confirm message";
        channel.basicPublish("", QUEUE_NAME, null, msgString.getBytes());

        if(!channel.waitForConfirms()){
            System.out.println("message send failed");
        }else{
            System.out.println("message send ok");
        }

        channel.close();
        connection.close();
    }
}

批量消息confirm

public class send {
    private static final String QUEUE_NAME = "confirm";
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 将channel设置为confirm模式
        channel.confirmSelect();
        
        // 批量消息发送
        String msgString = "hello confirm message";
        for(int i = 0; i < 20; i++){
            channel.basicPublish("", QUEUE_NAME, null, msgString.getBytes());
        }

        if(!channel.waitForConfirms()){
            System.out.println("message send failed");
        }else{
            System.out.println("message send ok");
        }

        channel.close();
        connection.close();
    }
}

RabbitMQ消息确认Confirm(异步)

public class AsynConfirm {

    private static String QUEUE_NAME = "confirm_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection conn = ConnectionUtils.getConnection();
        Channel channel = conn.createChannel();

        // 创建map用于记录发送的消息
        ConcurrentNavigableMap<Long, String> map = new ConcurrentSkipListMap<>();
        // 启用confirm
        channel.confirmSelect();
        // 消息成功发送后的回调函数
        ConfirmCallback successCallBack = (sequenceNumber, multiple) -> {
            if(multiple){
                ConcurrentNavigableMap<Long, String> confirmed = map.headMap(sequenceNumber, true);
                confirmed.clear();
            }else{
                map.remove(sequenceNumber);
            }
        };
        // 消息失败后的回调函数
        ConfirmCallback errorCallBack = (sequenceNumber, multiple) -> {
            String body = map.get(sequenceNumber);
            System.err.format(
                    "Message with body %s has been nack-ed. Sequence number: %d, multiple: %b%n",
                    body, sequenceNumber, multiple
            );
        };

        // 添加监听器
        channel.addConfirmListener(successCallBack, errorCallBack);

        // 消息发送
        while(true){
            String msg = "hello confirm";
            // 消息的ID
            long no = channel.getNextPublishSeqNo();
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
            map.put(no, msg);
        }

    }
}

RabbitMQ消息持久化

上面提到的消息确认机制,可以防止消费者意外死亡时不丢失消息。但如果RabbitMQ服务器意外死亡了,该如何保证消息队列中的消息不丢失。

这时候就需要将消息队列中的消息持久化到本地,当RabbitMQ意外挂掉后再重启,会从本地读取数据到消息队列中(内存)。

开启队列持久化,需要同时修改消费者及生产者队列声明代码

// 开启消息队列持久化
boolean durable = true;
channel.queueDeclare("queue_name", durable, false, false, null);

注意:
该方法并不能完全保证不会丢失消息。尽管它告诉RabbitMQ将消息保存到磁盘,但是RabbitMQ接受消息到将其写入磁盘,途中仍有一小段时间。另外,RabbitMQ不会对每条消息都执行异步写入磁盘的操作,但是对于简单队列而言,这已经绰绰有余了。

相关文章

网友评论

      本文标题:RabbitMQ消息确认及消息持久化

      本文链接:https://www.haomeiwen.com/subject/icjccktx.html