美文网首页
nsq数据持久化

nsq数据持久化

作者: fake_smile_boy | 来源:发表于2018-12-10 10:07 被阅读0次

diskqueue结构体

type diskQueue struct {
    // 64bit atomic vars need to be first for proper alignment on 32bit platforms

    // run-time state (also persisted to disk)
    // 数据文件相关信息
    readPos      int64 // 记录数据文件读的位置
    writePos     int64 // 记录数据文件写的位置
    readFileNum  int64 // 当前读文件的编号
    writeFileNum int64 // 当前写文件的编号
    depth        int64 // 队列中消息的数量

    sync.RWMutex

    // instantiation time metadata
    // 元数据相关
    name            string        // 元数据名称
    dataPath        string        // 元数据的数据目录
    maxBytesPerFile int64         // 每个文件的最大字节数,默认100M // currently this cannot change once created
    minMsgSize      int32         // 一条消息的最小长度
    maxMsgSize      int32         // 一条消息的最大长度
    syncEvery       int64         // 当写入的消息达到syncEvery时则执行sync操作 // number of writes per fsync
    syncTimeout     time.Duration // 每隔syncTimeout时间执行同步一次          // duration of time per fsync
    exitFlag        int32         // 队列退出标识。比如当删除队列时会将该队列标记为1,阻止其他线程操作该队列
    needSync        bool          // 是否需要同步

    // keeps track of the position where we have read
    // (but not yet sent over readChan)
    // 读操作是为了投递消息给客户端,如果投递失败则继续使用当前的读取位置再次尝试投递消息
    nextReadPos     int64 // 记录正在投递的消息的位置
    nextReadFileNum int64 // 记录正在投递的消息的文件编号

    readFile  *os.File      // 读文件句柄
    writeFile *os.File      // 写文件句柄
    reader    *bufio.Reader // 读文件操作的缓存区
    writeBuf  bytes.Buffer  // 写文件操作的缓存区

    // exposed via ReadChan()
    readChan chan []byte // 获取消息的channel

    // internal channels
    writeChan         chan []byte // 写入消息的channel
    writeResponseChan chan error  // 返回写入消息的状态
    emptyChan         chan int    // 清空消息的channel
    emptyResponseChan chan error  // 返回清空队列的状态
    exitChan          chan int    // 队列退出的channel
    exitSyncChan      chan int    // 队列退出的同步channel,确保ioLoop先退出

    logger Logger
}

nsq在创建topic和channel时会创建一个diskqueue,它负责像磁盘读写文件。
当memoryMsgChan内存队列写满了,就会向diskqueue写入消息。
当memoryMsgChan读取空了,就会从diskqueue读取消息。

func (t *Topic) put(m *Message) error {
    select {
    case t.memoryMsgChan <- m:
    default:
        b := bufferPoolGet()
        // 将消息写入diskqueue
        err := writeMessageToBackend(b, m, t.backend)
        bufferPoolPut(b)
        t.ctx.nsqd.SetHealth(err)
        if err != nil {
            t.ctx.nsqd.logf(LOG_ERROR,
                "TOPIC(%s) ERROR: failed to write message to backend - %s",
                t.name, err)
            return err
        }
    }
    return nil
}

func (t *Topic) messagePump() {
   ...//参见上文代码
    for {
        //从memoryMsgChan及DiskQueue.ReadChan中取消息
        select {
            case msg = <-memoryMsgChan:
            case buf = <- t.backend.ReadChan():
                msg, _ = decodeMessage(buf)
            case <-t.exitChan:
               return
        }
     ... //将msg复制N份,发送到topic下的N个Channel中
    }
}

生成一个diskQueue时会做两件事情

  1. d.retrieveMetaData()加载元数据信息

  2. d.ioLoop()读写磁盘文件

func New(name string, dataPath string, maxBytesPerFile int64,
    minMsgSize int32, maxMsgSize int32,
    syncEvery int64, syncTimeout time.Duration, logf AppLogFunc) Interface {
    d := diskQueue{
        name:              name,
        dataPath:          dataPath,
        maxBytesPerFile:   maxBytesPerFile,
        minMsgSize:        minMsgSize,
        maxMsgSize:        maxMsgSize,
        readChan:          make(chan []byte),
        writeChan:         make(chan []byte),
        writeResponseChan: make(chan error),
        emptyChan:         make(chan int),
        emptyResponseChan: make(chan error),
        exitChan:          make(chan int),
        exitSyncChan:      make(chan int),
        syncEvery:         syncEvery,
        syncTimeout:       syncTimeout,
        logf:              logf,
    }

    // no need to lock here, nothing else could possibly be touching this instance
    err := d.retrieveMetaData()
    if err != nil && !os.IsNotExist(err) {
        d.logf(ERROR, "DISKQUEUE(%s) failed to retrieveMetaData - %s", d.name, err)
    }

    go d.ioLoop()
    return &d
}

retrieveMetaData:从元数据文件中恢复队列的状态。如果元数据文件不存在返回err,如果存在则加载元数据文件中的内容
top1.diskqueue.meta.dat 元数据文件内容格式只有三行("%d\n%d,%d\n%d,%d\n")
2 # 队列中消息的数量
0,0 # 读文件的编号,偏移位置
0,76 # 写文件的编号,偏移位置

func (d *diskQueue) retrieveMetaData() error {
    var f *os.File
    var err error

    fileName := d.metaDataFileName()
    f, err = os.OpenFile(fileName, os.O_RDONLY, 0600)
    if err != nil {
        return err
    }
    defer f.Close()

    var depth int64
    _, err = fmt.Fscanf(f, "%d\n%d,%d\n%d,%d\n",
        &depth,
        &d.readFileNum, &d.readPos,
        &d.writeFileNum, &d.writePos)
    if err != nil {
        return err
    }
    atomic.StoreInt64(&d.depth, depth)
    d.nextReadFileNum = d.readFileNum
    d.nextReadPos = d.readPos

    return nil
}

负责读写文件的循环

func (d *diskQueue) ioLoop() {
    var dataRead []byte
    var err error
    var count int64 // 计数器变量
    var r chan []byte

    syncTicker := time.NewTicker(d.syncTimeout)

    for {
        // dont sync all the time :)
        // count计数器打到d.syncEvery的数量时,设置d.needSync为true则执行同步操作
        if count == d.syncEvery {
            d.needSync = true
        }

        // needSync变量控制是否需要同步,同步完成后该变量置为false
        if d.needSync {
            // 定时调用sync函数将内存中的消息刷新到磁盘
            err = d.sync()
            if err != nil {
                d.logf("ERROR: diskqueue(%s) failed to sync - %s", d.name, err)
            }
            count = 0
        }

        // 检测当前是否有数据需要被读取
        // 条件成立:执行d.readOne()并将结果放入dataRead中,然后设置r为d.readChan
        // 条件不成立:将r设置为nil
        if (d.readFileNum < d.writeFileNum) || (d.readPos < d.writePos) {
            if d.nextReadPos == d.readPos {
                dataRead, err = d.readOne()
                if err != nil {
                    d.logf("ERROR: reading from diskqueue(%s) at %d of %s - %s",
                        d.name, d.readPos, d.fileName(d.readFileNum), err)
                    d.handleReadError()
                    continue
                }
            }
            r = d.readChan
        } else {
            r = nil
        }

        select {
        // the Go channel spec dictates that nil channel operations (read or write)
        // in a select are skipped, we set r to d.readChan only when there is data to read
        // 在注释中作者写了这是一个Golang的特性
        // 如果r不为空,则会将dataRead送入go channel。进入d.readChan的消息通过ReadChan函数向外暴露,最终被Topic/Channel的消息循环读取。
        // 而如果r为空,则这个分支会被跳过。这个特性的使用统一了select的逻辑,简化了当数据为空时的判断。
        case r <- dataRead:
            // 消息投递
            count++
            // moveForward sets needSync flag if a file is removed
            // 消息投递成功后的操作
            d.moveForward()
        case <-d.emptyChan:
            // 执行清空操作时,文件全被删除,count计数器重置为0
            d.emptyResponseChan <- d.deleteAllFiles()
            count = 0
        case dataWrite := <-d.writeChan:
            // 消息写入则count计数器自增
            count++
            d.writeResponseChan <- d.writeOne(dataWrite)
        case <-syncTicker.C:
            // 每隔syncTimeout时间同步一次
            if count == 0 {
                // avoid sync when there's no activity
                continue
            }
            d.needSync = true
        case <-d.exitChan:
            // 退出ioLook
            goto exit
        }
    }

exit:
    d.logf("DISKQUEUE(%s): closing ... ioLoop", d.name)
    syncTicker.Stop()
    d.exitSyncChan <- 1
}

总结一下diskQueue
1、外部goroutine会在memoryMsgChan写满了或读空了的时候,对diskQueue的writeChan或者readChan写入消息。ioLoop则监听writeChan和readChan进行磁盘的写读操作。
2、在对磁盘文件的读写时,需要记录文件编号和偏移位置。启动diskQueue时会从元数据文件恢复队列数据,关闭时会将最新的读取位置记录到元数据文件。
3、syncTimeout和syncEvery可以设置内存数据同步到磁盘的频率,syncTimeout是指每隔syncTimeout秒调用一次d.sync(),syncEvery是指每当写入syncEvery个消息后调用一次d.sync()。

相关文章

  • nsq数据持久化

    diskqueue结构体 nsq在创建topic和channel时会创建一个diskqueue,它负责像磁盘读写文...

  • Docker学习(13) 卷与持久化数据

    Docker学习(13) 卷与持久化数据 卷与持久化数据——简介 数据主要分为两种:持久化和非持久化。 持久化:就...

  • iOS本地数据持久化

    iOS本地数据持久化 iOS本地数据持久化

  • Redis-2 数据持久化及持久化配置

    一、数据持久化 开启持久化功能后,重启redis,数据会自动通过持久化文件恢复!! 1、redis持久化 – 两种...

  • 面试相关

    数据持久化 什么是持久化狭义的理解: “持久化”仅仅指把域对象永久保存到数据库中;广义的理解,“持久化”包括和数据...

  • GeekBand iOS开发高级进阶学习笔记(第四周)

    简易数据存储 数据持久化分为本体持久化和云端持久化本体持久化可以存在本地文件或数据库。云端可以存在iCloud,存...

  • iOS数据持久化

    Title: iOS数据持久化 ##数据持久化概念 数据持久化就是将内存中的数据模型转换为存储模型,以及将存储模型...

  • redis与memcache区别

    1、持久化 redis是支持持久化存储,宕机重启数据不会丢失,memcache重启后数据丢失 redis持久化的方...

  • iOS本地数据持久化

    转载自:CocoaChina - iOS本地数据持久化 本文内容:iOS本地数据持久化的几种类型iOS本地数据持久...

  • Android数据持久化的设计

    title: Android数据持久化 版 本 历 史 1. 数据持久化简介 1.1 数据持久化的需求 在网络异常...

网友评论

      本文标题:nsq数据持久化

      本文链接:https://www.haomeiwen.com/subject/wxzcxqtx.html