美文网首页
nsq数据持久化

nsq数据持久化

作者: fake_smile_boy | 来源:发表于2018-12-10 10:07 被阅读0次

    diskqueue结构体

    type diskQueue struct {
        // 64bit atomic vars need to be first for proper alignment on 32bit platforms
    
        // run-time state (also persisted to disk)
        // 数据文件相关信息
        readPos      int64 // 记录数据文件读的位置
        writePos     int64 // 记录数据文件写的位置
        readFileNum  int64 // 当前读文件的编号
        writeFileNum int64 // 当前写文件的编号
        depth        int64 // 队列中消息的数量
    
        sync.RWMutex
    
        // instantiation time metadata
        // 元数据相关
        name            string        // 元数据名称
        dataPath        string        // 元数据的数据目录
        maxBytesPerFile int64         // 每个文件的最大字节数,默认100M // currently this cannot change once created
        minMsgSize      int32         // 一条消息的最小长度
        maxMsgSize      int32         // 一条消息的最大长度
        syncEvery       int64         // 当写入的消息达到syncEvery时则执行sync操作 // number of writes per fsync
        syncTimeout     time.Duration // 每隔syncTimeout时间执行同步一次          // duration of time per fsync
        exitFlag        int32         // 队列退出标识。比如当删除队列时会将该队列标记为1,阻止其他线程操作该队列
        needSync        bool          // 是否需要同步
    
        // keeps track of the position where we have read
        // (but not yet sent over readChan)
        // 读操作是为了投递消息给客户端,如果投递失败则继续使用当前的读取位置再次尝试投递消息
        nextReadPos     int64 // 记录正在投递的消息的位置
        nextReadFileNum int64 // 记录正在投递的消息的文件编号
    
        readFile  *os.File      // 读文件句柄
        writeFile *os.File      // 写文件句柄
        reader    *bufio.Reader // 读文件操作的缓存区
        writeBuf  bytes.Buffer  // 写文件操作的缓存区
    
        // exposed via ReadChan()
        readChan chan []byte // 获取消息的channel
    
        // internal channels
        writeChan         chan []byte // 写入消息的channel
        writeResponseChan chan error  // 返回写入消息的状态
        emptyChan         chan int    // 清空消息的channel
        emptyResponseChan chan error  // 返回清空队列的状态
        exitChan          chan int    // 队列退出的channel
        exitSyncChan      chan int    // 队列退出的同步channel,确保ioLoop先退出
    
        logger Logger
    }
    

    nsq在创建topic和channel时会创建一个diskqueue,它负责像磁盘读写文件。
    当memoryMsgChan内存队列写满了,就会向diskqueue写入消息。
    当memoryMsgChan读取空了,就会从diskqueue读取消息。

    func (t *Topic) put(m *Message) error {
        select {
        case t.memoryMsgChan <- m:
        default:
            b := bufferPoolGet()
            // 将消息写入diskqueue
            err := writeMessageToBackend(b, m, t.backend)
            bufferPoolPut(b)
            t.ctx.nsqd.SetHealth(err)
            if err != nil {
                t.ctx.nsqd.logf(LOG_ERROR,
                    "TOPIC(%s) ERROR: failed to write message to backend - %s",
                    t.name, err)
                return err
            }
        }
        return nil
    }
    
    func (t *Topic) messagePump() {
       ...//参见上文代码
        for {
            //从memoryMsgChan及DiskQueue.ReadChan中取消息
            select {
                case msg = <-memoryMsgChan:
                case buf = <- t.backend.ReadChan():
                    msg, _ = decodeMessage(buf)
                case <-t.exitChan:
                   return
            }
         ... //将msg复制N份,发送到topic下的N个Channel中
        }
    }
    

    生成一个diskQueue时会做两件事情

    1. d.retrieveMetaData()加载元数据信息

    2. d.ioLoop()读写磁盘文件

    func New(name string, dataPath string, maxBytesPerFile int64,
        minMsgSize int32, maxMsgSize int32,
        syncEvery int64, syncTimeout time.Duration, logf AppLogFunc) Interface {
        d := diskQueue{
            name:              name,
            dataPath:          dataPath,
            maxBytesPerFile:   maxBytesPerFile,
            minMsgSize:        minMsgSize,
            maxMsgSize:        maxMsgSize,
            readChan:          make(chan []byte),
            writeChan:         make(chan []byte),
            writeResponseChan: make(chan error),
            emptyChan:         make(chan int),
            emptyResponseChan: make(chan error),
            exitChan:          make(chan int),
            exitSyncChan:      make(chan int),
            syncEvery:         syncEvery,
            syncTimeout:       syncTimeout,
            logf:              logf,
        }
    
        // no need to lock here, nothing else could possibly be touching this instance
        err := d.retrieveMetaData()
        if err != nil && !os.IsNotExist(err) {
            d.logf(ERROR, "DISKQUEUE(%s) failed to retrieveMetaData - %s", d.name, err)
        }
    
        go d.ioLoop()
        return &d
    }
    

    retrieveMetaData:从元数据文件中恢复队列的状态。如果元数据文件不存在返回err,如果存在则加载元数据文件中的内容
    top1.diskqueue.meta.dat 元数据文件内容格式只有三行("%d\n%d,%d\n%d,%d\n")
    2 # 队列中消息的数量
    0,0 # 读文件的编号,偏移位置
    0,76 # 写文件的编号,偏移位置

    func (d *diskQueue) retrieveMetaData() error {
        var f *os.File
        var err error
    
        fileName := d.metaDataFileName()
        f, err = os.OpenFile(fileName, os.O_RDONLY, 0600)
        if err != nil {
            return err
        }
        defer f.Close()
    
        var depth int64
        _, err = fmt.Fscanf(f, "%d\n%d,%d\n%d,%d\n",
            &depth,
            &d.readFileNum, &d.readPos,
            &d.writeFileNum, &d.writePos)
        if err != nil {
            return err
        }
        atomic.StoreInt64(&d.depth, depth)
        d.nextReadFileNum = d.readFileNum
        d.nextReadPos = d.readPos
    
        return nil
    }
    

    负责读写文件的循环

    func (d *diskQueue) ioLoop() {
        var dataRead []byte
        var err error
        var count int64 // 计数器变量
        var r chan []byte
    
        syncTicker := time.NewTicker(d.syncTimeout)
    
        for {
            // dont sync all the time :)
            // count计数器打到d.syncEvery的数量时,设置d.needSync为true则执行同步操作
            if count == d.syncEvery {
                d.needSync = true
            }
    
            // needSync变量控制是否需要同步,同步完成后该变量置为false
            if d.needSync {
                // 定时调用sync函数将内存中的消息刷新到磁盘
                err = d.sync()
                if err != nil {
                    d.logf("ERROR: diskqueue(%s) failed to sync - %s", d.name, err)
                }
                count = 0
            }
    
            // 检测当前是否有数据需要被读取
            // 条件成立:执行d.readOne()并将结果放入dataRead中,然后设置r为d.readChan
            // 条件不成立:将r设置为nil
            if (d.readFileNum < d.writeFileNum) || (d.readPos < d.writePos) {
                if d.nextReadPos == d.readPos {
                    dataRead, err = d.readOne()
                    if err != nil {
                        d.logf("ERROR: reading from diskqueue(%s) at %d of %s - %s",
                            d.name, d.readPos, d.fileName(d.readFileNum), err)
                        d.handleReadError()
                        continue
                    }
                }
                r = d.readChan
            } else {
                r = nil
            }
    
            select {
            // the Go channel spec dictates that nil channel operations (read or write)
            // in a select are skipped, we set r to d.readChan only when there is data to read
            // 在注释中作者写了这是一个Golang的特性
            // 如果r不为空,则会将dataRead送入go channel。进入d.readChan的消息通过ReadChan函数向外暴露,最终被Topic/Channel的消息循环读取。
            // 而如果r为空,则这个分支会被跳过。这个特性的使用统一了select的逻辑,简化了当数据为空时的判断。
            case r <- dataRead:
                // 消息投递
                count++
                // moveForward sets needSync flag if a file is removed
                // 消息投递成功后的操作
                d.moveForward()
            case <-d.emptyChan:
                // 执行清空操作时,文件全被删除,count计数器重置为0
                d.emptyResponseChan <- d.deleteAllFiles()
                count = 0
            case dataWrite := <-d.writeChan:
                // 消息写入则count计数器自增
                count++
                d.writeResponseChan <- d.writeOne(dataWrite)
            case <-syncTicker.C:
                // 每隔syncTimeout时间同步一次
                if count == 0 {
                    // avoid sync when there's no activity
                    continue
                }
                d.needSync = true
            case <-d.exitChan:
                // 退出ioLook
                goto exit
            }
        }
    
    exit:
        d.logf("DISKQUEUE(%s): closing ... ioLoop", d.name)
        syncTicker.Stop()
        d.exitSyncChan <- 1
    }
    

    总结一下diskQueue
    1、外部goroutine会在memoryMsgChan写满了或读空了的时候,对diskQueue的writeChan或者readChan写入消息。ioLoop则监听writeChan和readChan进行磁盘的写读操作。
    2、在对磁盘文件的读写时,需要记录文件编号和偏移位置。启动diskQueue时会从元数据文件恢复队列数据,关闭时会将最新的读取位置记录到元数据文件。
    3、syncTimeout和syncEvery可以设置内存数据同步到磁盘的频率,syncTimeout是指每隔syncTimeout秒调用一次d.sync(),syncEvery是指每当写入syncEvery个消息后调用一次d.sync()。

    相关文章

      网友评论

          本文标题:nsq数据持久化

          本文链接:https://www.haomeiwen.com/subject/wxzcxqtx.html