美文网首页
RabbitMQ入门5-消息确认模式和幂等性

RabbitMQ入门5-消息确认模式和幂等性

作者: hi李昊天 | 来源:发表于2020-06-20 20:34 被阅读0次

    1.消息确认模式

    在RabbitMQ中,消息确认主要有生产者发生确认和消费者接收确认

    1.1生产者发送确认

    生产者发送消息到RabbitMQ服务器,如果RabbitMQ服务器收到消息,则会给生产者一个应答,用于告诉生产者该消息已经成功到达RabbitMQ服务器中

    1.2消费者接收确认

    用于确认消费者是否成功消费了该条消息
    消息确认实现方式有两种

    1. 通过事务的方式
    2. confirm确认机制,因为事务模式比较消耗性能,在实际工作中用的也不多

    2.生产者发送确认

    2.1 开启confirm模式

    当Channel.Confirm(noWait bool)参数设置为false时,broker会返回一个confirm.ok表示同意发送者将当前channel信道设置为confirm模式。
    其他代码和transaction模式类似,只是没有Channel.TxCommit()和Channel.TxRollback()。

    err = channel.Confirm(false)
    

    2.2 以confirm模式发送消息

    package main
    
    import (
        "fmt"
        "github.com/streadway/amqp"
        "rmq/db/rmq"
    )
    
    var (
        channel *amqp.Channel
        err     error
        queue   amqp.Queue
        conn    *amqp.Connection
    )
    
    func main() {
        conn, err = rmq.GetConn()
    
        defer conn.Close()
    
        channel, err = conn.Channel()
    
        if err != nil {
            fmt.Printf("error: %s \n", err.Error())
            return
        }
    
        defer channel.Close()
    
        err = channel.Confirm(false)
    
        if err != nil {
            fmt.Printf("error: %s \n", err.Error())
            return
        }
    
        queue, err = channel.QueueDeclare("confirm:message", false, false, false, false, nil)
        if err != nil {
            fmt.Printf("error: %s \n", err.Error())
            return
        }
    
        confirms := channel.NotifyPublish(make(chan amqp.Confirmation, 1))
    
        defer confirmOne(confirms)
        err = channel.Publish("", queue.Name, false, false, amqp.Publishing{
            ContentType: "text/plain",
            Body:        []byte("confirm message"),
        })
    
        if err != nil {
            fmt.Printf("error: %s \n", err.Error())
            return
        }
    
        fmt.Println("消息发送成功")
    
    }
    
    func confirmOne(confirms <-chan amqp.Confirmation) {
        if confirmed := <-confirms; confirmed.Ack {
            fmt.Printf("confirmed delivery with delivery tag: %d", confirmed.DeliveryTag)
        } else {
            fmt.Printf("confirmed delivery of delivery tag: %d", confirmed.DeliveryTag)
        }
    }
    
    

    消息拒绝

    _ = d.Nack(false, false) // 手动拒绝消息 可以拒绝多条消息 第二个参数设置为true 将再次放入队列中
    _ = d.Reject(true) // 手动拒绝消息 只能拒绝一条消息 为true 将再次放入队列中
    _ = d.Ack(false) // 手动确认
    

    1.简介

    消息幂等性其实就是保证同一个消息不被消费者重复消费两次
    当消费者消费完之后,通常会发送一个ack应答确认消息给生产者
    但是这中间有可能因为网络中断等原因,导致生产者未能收到确认消息,有此这条消息将被重复发送给消费者消费,实际上这条消息已经被消费过了,这就是重复消费的问题!!!

    1.1 如何避免重复消费

    • 消息全局ID或者写个唯一标识 (时间戳,uuid等),每次消费消息之前根据消息id去判断该消息是否已被消费过,如果已经消费国,则不处理该消息,否则正常消费,并且进行入库操作(消息全局ID作为数据库表的主键,防止重复)
    • 利用redis的setnx命令,给消息分配一个全局ID,只要消费过该消息,将id message k:v 形式写入redis 消费者开始消费前 先去redis查询有没有消费记录

    1.2 代码演示

    生产者
    channel.Publish("", queue.Name, false, false,
                amqp.Publishing{
                    MessageId:   uuid.NewV4().String(),
                    Timestamp: time.Now(),
                    ContentType: "text/plain",
                    Body:        []byte(fmt.Sprintf("hello---%d", i)),
                })
    
    消费者
    go func() {
            for d := range megs {
                err = db.GetRedis().Get(d.MessageId).Err()
                if err != redis.Nil {
                    // 消息已被消费 忽略
                    logger.Warn("消息已被消费 忽略 %s", d.MessageId)
                    _ = d.Reject(false)
                    continue
                }
    
                logger.Info("messageBody: %s", d.Body)
                logger.Info("messageID: %s", d.MessageId)
                logger.Info("messageID: %s", d.Timestamp.Format("2006-01-02 15:04:05"))
    
                if err := d.Ack(false); err != nil {
                    logger.Error("消息确认失败")
                } else {
                    db.GetRedis().SetNX(d.MessageId, d.Body, time.Hour*2)
                    logger.Warn("设置消息id")
                }
            }
        }()
    

    相关文章

      网友评论

          本文标题:RabbitMQ入门5-消息确认模式和幂等性

          本文链接:https://www.haomeiwen.com/subject/rcigxktx.html