美文网首页
NSQD 源码分析

NSQD 源码分析

作者: 威廉姆韦德惠达王 | 来源:发表于2019-07-28 22:38 被阅读0次

    启动流程

    1. 利用 svc 创建一个常驻进程
    2. 读取配置,优先级:命令行、配置文件、默认配置
    3. 实例化 nsqd,初始化日志、http client,数据文件锁等
    4. 读取元数据,就是数组的 topics 并且包含里面的 channels,每个都有是否暂停的标志位
    5. 协程启动 tcp server, http[s] server , queueScanLoop, lookupLoop,statsdLoop
    6. 等待信号执行关闭 tcp server 和 http & https server,持久化元数据(topic & channel & version,存储格式 json)

    TCP Server

    • 功能:
    1. 消费者:客户端初始化,开始 messagePump 到订阅的 channel
    2. 生产者:客户端初始化,推消息,创建 topic,topic 内部 messagePump 发往 channel
    • 生产流程:读取协议标识 V2 (4个字节),等待 identify、pub|mpub|dpub 写消息

    • 消费流程:读取协议标识 V2 (4个字节),等待 identify、sub、rdy 命令(标识、订阅,准备)。监听内存队列和磁盘队列把数据传输给客户端

    • 命令:

    1. IDENTIFY: 客户端连接上后,标识自己,同时得到服务端的信息
    2. FIN: 消息顺利处理完了,里面会 in-flight 的消息清理掉,rdy 加1, 还有其他统计数据
    3. RDY: 流控 客户端告知我有能力处理多少个消息,服务器端就浪多少呗
    4. REQ: 消息重新进队列
    5. PUB, MPUB, DPUB: 写消息, 优先写内存队列,写不进写 backend 队列,使用 select 模式,新的 topic 会 messagePump,很简单从内存和磁盘读,循环 channel 发送 msg,每个客户端的 messagePump 获取
    6. NOP:空指令什么也不干
    7. TOUCH: 重置消息超时时间, pop queue 放到最后去,受到 max msg timeout 限制
    8. SUB:订阅命令
    9. CLS: 关闭
    # 消息格式,用于磁盘存储
    [x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x]...
    |       (int64)        ||    ||      (hex string encoded in ASCII)           || (binary)
    |       8-byte         ||    ||                 16-byte                      || N-byte
    ------------------------------------------------------------------------------------------...
    nanosecond timestamp      ^^                   message ID                       message body                        
                           (uint16)
                            2-byte
                           attempts
    

    HTTP Server

    1. /ping 健康检查
    2. /info 打印一些配置信息,主机名,版本号,启动时间
    3. /pub & /mpub 生产消息,写内存,如果超配置数量写磁盘,没有消费者连接写磁盘
    4. /stats 统计信息,很详细
    5. /topic/create 创建 topic,内部会从 lookupd 同步 channel
    6. /topic/delete 删除 topic,未消费的消息存入磁盘,循环 channels,未消费的消息存入磁盘
    7. /topic/empty 清空 topic,内存和磁盘都清空
    8. /topic/pause & /topic/unpaus 中断或开启 topic,内部其实就是把两个管道架空,重新存储下元数据
    9. /channel/* channel 的创建,删除,清空,中断,开启。和 topic 类似就是换个结构体
    10. /config/:opt 取得配置信息,动态修改一些配置,nslookup、日志级别等
    11. /debug/pprof pprof debug 信息

    queueScanLoop

    • 功能:将把在 InFlightQueue 和 DeferredQueue 过期的消息重新放入队列
    • 算法:用的是 redis 概率过期算法,维护一个队列扫描池,抽取部分 channel(默认25%) 进行扫描
    • 有超 25% 的 channel 是超时脏的,就没有休息的接着干呗
    • 注意:Message 对象的 pri 字段存的是过期时间
    • 注意:Pqueue ( priority 优先级队列)堆排序,分别用作延迟消息和消息的at least once机制。

    lookupLoop

    • 功能:连接所有的 nsqlookupd 并同步 topic channel 信息的
    • 细节:创建连接,发送 V1,发送 Identify 指令,同步当前的 topic 和 channel,定期 ping 维护状态
    • 可动态配置 nsqlookupd,当 topic & channel 有变动时及时通知变更

    statsdLoop

    • 功能:实时通过 udp 发送统计信息

    相关文章

      网友评论

          本文标题:NSQD 源码分析

          本文链接:https://www.haomeiwen.com/subject/cermrctx.html