启动流程
- 利用 svc 创建一个常驻进程
- 读取配置,优先级:命令行、配置文件、默认配置
- 实例化 nsqd,初始化日志、http client,数据文件锁等
- 读取元数据,就是数组的 topics 并且包含里面的 channels,每个都有是否暂停的标志位
- 协程启动 tcp server, http[s] server , queueScanLoop, lookupLoop,statsdLoop
- 等待信号执行关闭 tcp server 和 http & https server,持久化元数据(topic & channel & version,存储格式 json)
TCP Server
- 功能:
- 消费者:客户端初始化,开始 messagePump 到订阅的 channel
- 生产者:客户端初始化,推消息,创建 topic,topic 内部 messagePump 发往 channel
-
生产流程:读取协议标识 V2 (4个字节),等待 identify、pub|mpub|dpub 写消息
-
消费流程:读取协议标识 V2 (4个字节),等待 identify、sub、rdy 命令(标识、订阅,准备)。监听内存队列和磁盘队列把数据传输给客户端
-
命令:
- IDENTIFY: 客户端连接上后,标识自己,同时得到服务端的信息
- FIN: 消息顺利处理完了,里面会 in-flight 的消息清理掉,rdy 加1, 还有其他统计数据
- RDY: 流控 客户端告知我有能力处理多少个消息,服务器端就浪多少呗
- REQ: 消息重新进队列
- PUB, MPUB, DPUB: 写消息, 优先写内存队列,写不进写 backend 队列,使用 select 模式,新的 topic 会 messagePump,很简单从内存和磁盘读,循环 channel 发送 msg,每个客户端的 messagePump 获取
- NOP:空指令什么也不干
- TOUCH: 重置消息超时时间, pop queue 放到最后去,受到 max msg timeout 限制
- SUB:订阅命令
- CLS: 关闭
# 消息格式,用于磁盘存储
[x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x]...
| (int64) || || (hex string encoded in ASCII) || (binary)
| 8-byte || || 16-byte || N-byte
------------------------------------------------------------------------------------------...
nanosecond timestamp ^^ message ID message body
(uint16)
2-byte
attempts
HTTP Server
- /ping 健康检查
- /info 打印一些配置信息,主机名,版本号,启动时间
- /pub & /mpub 生产消息,写内存,如果超配置数量写磁盘,没有消费者连接写磁盘
- /stats 统计信息,很详细
- /topic/create 创建 topic,内部会从 lookupd 同步 channel
- /topic/delete 删除 topic,未消费的消息存入磁盘,循环 channels,未消费的消息存入磁盘
- /topic/empty 清空 topic,内存和磁盘都清空
- /topic/pause & /topic/unpaus 中断或开启 topic,内部其实就是把两个管道架空,重新存储下元数据
- /channel/* channel 的创建,删除,清空,中断,开启。和 topic 类似就是换个结构体
- /config/:opt 取得配置信息,动态修改一些配置,nslookup、日志级别等
- /debug/pprof pprof debug 信息
queueScanLoop
- 功能:将把在 InFlightQueue 和 DeferredQueue 过期的消息重新放入队列
- 算法:用的是 redis 概率过期算法,维护一个队列扫描池,抽取部分 channel(默认25%) 进行扫描
- 有超 25% 的 channel 是超时脏的,就没有休息的接着干呗
- 注意:Message 对象的 pri 字段存的是过期时间
- 注意:Pqueue ( priority 优先级队列)堆排序,分别用作延迟消息和消息的at least once机制。
lookupLoop
- 功能:连接所有的 nsqlookupd 并同步 topic channel 信息的
- 细节:创建连接,发送 V1,发送 Identify 指令,同步当前的 topic 和 channel,定期 ping 维护状态
- 可动态配置 nsqlookupd,当 topic & channel 有变动时及时通知变更
statsdLoop
- 功能:实时通过 udp 发送统计信息
网友评论