6 保证消息不丢失

作者: 笑Skr人啊 | 来源:发表于2019-12-26 18:04 被阅读0次

    1 消息持久化

    1.1 exchange持久化

    /*
     * 声明消息队列,且为可持久化的
     *
     * EXCHANGE_NAME: 交换机名称(name)
     * direct: 交换机类型(type)
     * true: 是否持久化(durable)
     * false: 是否自动删除(autoDelete)
     * null: 其他参数(arguments)
     */
     channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);
    

    1.2 queue持久化

    /*
     * 声明消息队列,且为可持久化的
     *
     * QUEUE_NAME: 队列名称(name)
     * true: 是否持久化(durable)
     * false: 是否为排他性队列(exclusive)
        1: 只对首次声明它的连接(Connection)可见
            1.1 首次声明: 因为另外一个连接无法声明一个同样的排他性队列
            1.2 只区别连接(Connection)而不是通道(Channel),从同一个连接创建的不同的通道可以同时访问某一个排他性的队列
    
        2: 会在其连接断开的时候自动删除
            2.1 无论队列是否被声明成持久性的(Durable =true),只要调用连接的Close方法或者客户端程序退出,RabbitMQ都会删除这个队列
            2.2 注意这里是连接断开的时候,而不是通道断开。这个其实前一点保持一致,只区别连接而非通道。
    
     * false: 是否自动删除(autoDelete)
     * null: 其他参数(arguments)
     */
    channel.queueDeclare(QUEUE_NAME, true, false,  false,null);
    

    1.3 消息持久化

    // 消息持久化
    AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
    // 1:不持久化 2:持久化
    builder.deliveryMode(2);
    AMQP.BasicProperties properties = builder.build();
    /*
     * EXCHANGE_NAME: 交换机名称
     * "" : 路由key
     * properties: 基本属性
     * message: 消息体
     */
    channel.basicPublish(EXCHANGE_NAME, "", properties, message.getBytes());
    

    2 消费者ACK确认机制

    2.1 自动确认

    // 自动确认
    channel.basicConsume(QUEUE_NAME, true, consumer);
    
    只要消息从队列中获取,无论消费者获取到消息后是否成功消息,都认为是消息已经成功消费。
    

    2.2 手动确认

    // 手动确认
    channel.basicConsume(QUEUE_NAME, false, consumer);
    
    // 向服务端确认消息已被消费
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    
    

    3 生产者AMQP事务机制(生产环境禁止使用)

    
    // 开启事务
    try {
        String message = "I am simple_queue!";
        // 开启事务
        channel.txSelect();
        // 往队列中发出一条消息,使用rabbitmq默认交换机
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        // 提交事务
        channel.txCommit();
    } catch (Exception e) {
        e.printStackTrace();
        // 事务回滚
        channel.txRollback();
    }
    

    4 生产者确认

    try {
        // 生产者通过调用channel.confirmSelect方法(即Confirm.Select命令)将信道设置为confirm模式
        channel.confirmSelect();
        // 消息被投递到所有匹配的队列之后,RabbitMQ就会发送一个确认(Basic.Ack)给生产者(包含消息的唯一ID)
        channel.basicPublish("exchange","routingkey",MessageProperties.PERSISTENT_TEXT_PLAIN,"hello".getBytes());
    
        // 
        if (!channel.waitForConfirms()){
            System.out.println("message failed!");
            // do something
        }
    }catch (Exception e){
        e.printStackTrace();
    }
    

    相关文章

      网友评论

        本文标题:6 保证消息不丢失

        本文链接:https://www.haomeiwen.com/subject/fkeloctx.html