美文网首页码农的世界网络编程魔法Golang
go-libp2p 之 NewStream 深层阅读笔记

go-libp2p 之 NewStream 深层阅读笔记

作者: cc14514 | 来源:发表于2018-12-04 20:14 被阅读124次

Stream 的 Header 协议

格式

  • Version (8 bits)
  • Type (8 bits)
  • Flags (16 bits)
  • StreamID (32 bits)
  • Length (32 bits)

当前版本中 vsn 始终等于 0

Type

  • 0x0 Data : 用于数据传输,其中 length 代表 payload 的长度
  • 0x1 WindowUpdate : 用来改变指定 Stream 的 recvWindow 尺寸,length 标示窗口的增量更新值
  • 0x2 Ping : 用作心跳保持长连接或者采集 RTT度量值,在响应中要回复 StreamID 和 Length 属性
  • 0x3 GoAway : 用来终止会话,此时 StreamID = 0 而 Length 标示ErrorCode

Flags

  • 0x1 SYN : 创建Stream
  • 0x2 ACK : 用于确认SYN消息
  • 0x4 FIN : 执行 Stream 的半关闭
  • 0x8 RST : 立即重置给定的 Stream

从命名为 flags 和 value 的取值可以看出这里的 flag 是可以叠加的
换成二进制 1 = 0001 , 2 = 0010 , 4 = 0100 , 8 = 1000
叠加时可以用 OR 操作
例如 flags = SYN | ACK ,此时 flags = 3,3 没有出现在协议列表里,但是 3 == 1 | 2

NewStream & OpenStream

Host.NewStream
  1. 在应用程序中通过 Host 接口的实现类 BasicHost.NewStream 方法在创建一个 stream 通道;
  2. 通过 Network 接口的实现类 Swarm.NewStream 进一步创建 stream;
  3. 跟 p 没有长链接时去创建连接,成功后会返回一个 swarm 中定义的 Conn 对象;
  4. 经过一系列调用请求落到 Transport 接口的 Dial 上,我们通常用 tcp 的实现来写应用;
  5. 这一步只是为了说明这个连接是在 Transport 中调用的标准库 net.Dialer 来创建的;
  6. 返回的 Swarm.Conn 包含了 streammux.Conn 接口的实现,OpenStream 定义在这个接口中;
  7. 前面封装的挺复杂,但是从这才算真正的开始,这里会在当前的 Conn 上返回我们想要的 Stream;
  8. OpenStream 生成了 id = streamID 并且创建了 yamux.Stream 对象,还发送了 windowUpdate 协议;
  9. newStream 中制定了几个很关键的参数,包括 headerSize = 12byte,初始的流窗口尺寸 initialStreamWindow = 256k ;
  10. sendWindowUpdate 负责向被连接端发送一个 header=[]byte{0, windowUpdate, SYN, id, delta} 消息,SYN 标示创建一个新的 Stream 并且设置初始数据窗口。

时序图中概括了创建 Stream 的过程,只列出了关键路径,实际代码比这个复杂很多,不太关心此部分的抽象设计故此忽略了对于接口的描述,直接关注实现,而且貌似只有最后一步最重要

关于 streamID 的计算

  • yamux/session.go

代码很短,可以看出 id 就是自增的,但为什么每次都是 id = id+2 呢?这就必须要知道 nextStreamID 的初始值了,在 newSession 时指定了 if client { id = 1 } else { id = 2 },这就可以看出每个奇数 id 都代表了 client ,而偶数 id 则代表了 server 端。

func (s *Session) OpenStream() (*Stream, error) {
......
GET_ID:
    // Get an ID, and check for stream exhaustion
    id := atomic.LoadUint32(&s.nextStreamID)
    if id >= math.MaxUint32-1 {
        return nil, ErrStreamsExhausted
    }
    if !atomic.CompareAndSwapUint32(&s.nextStreamID, id, id+2) {
        goto GET_ID
    }
......
}

为了缩短篇幅,前面省略了 newSession 的逻辑,因为的确没太多看的价值,简单来说是 Transport.NewConn(nc net.Conn, isServer bool) 时指定了一个 isServer 参数,这个参数在后面 newSession 时会用来决定 client == true / false 进而决定 id 的初始值。我们可以认为 id 的初始值一直是 1,因为只有本地的 peerID 是空时,isServer 才是 true,p2p 环境下就当这种情况不存在吧。

关于 delta 的计算

  • yamux/stream.go

我们看到在 Type = WindowUpdate 时对 delta 有个描述,它是用来增量更新窗口尺寸的,这个代码有点长,顺着注视来读吧

// sendWindowUpdate potentially sends a window update enabling
// further writes to take place. Must be invoked with the lock.
func (s *Stream) sendWindowUpdate() error {
    s.controlHdrLock.Lock()
    defer s.controlHdrLock.Unlock()

    // Determine the delta update
    // 默认 MaxStreamWindowSize == 256k ,可以在创建 yamux 时重新指定大小
    max := s.session.config.MaxStreamWindowSize
    s.recvLock.Lock()
    // recvBuf 是在 readData 时通过 Grow 函数来重新分配的,此时还没有分配 ,recvWindow 的在 newStream 时给了默认值 256k
    // 所以这里应该是 delta = (256k - 0) - 256k = 0 
    delta := (max - uint32(s.recvBuf.Len())) - s.recvWindow

    // Determine the flags if any
    flags := s.sendFlags()

    // Check if we can omit the update
    if delta < (max/2) && flags == 0 {
        s.recvLock.Unlock()
        return nil
    }

    // Update our window
    // 增量更新 recvWindow 窗口,在 readData 后会减小窗口,是不是很熟悉?滑动窗口
    s.recvWindow += delta
    s.recvLock.Unlock()

    // Send the header
    s.controlHdr.encode(typeWindowUpdate, flags, s.id, delta)
    if err := s.session.waitForSendErr(s.controlHdr, nil, s.controlErr, nil); err != nil {
        return err
    }
    return nil
}

至此我们创建一个 Stream 并且通知对端打开一个窗口准备接收数据,本地也设定了初始的窗口大小,接下来看看对端是如何处理我们发过去的 header 的。

recvLoop

libp2p Listen

时序图没有画出 accept 的逻辑, handlerIncoming 的调用是异步的,并不会阻塞在此处。如果对 accepthandlerIncoming 之前的调用逻辑感兴趣,可参考 《go-libp2p-host Connect 源码分析》 章节,此处不再深入,仅概括了关键方法的调用过程,知道 handlerIncoming 之后,会沿着这个路径调用到 recvLoop() 即可

  • yamux/session.go
// Ensure that the index of the handler (typeData/typeWindowUpdate/etc) matches the message type
// 每种 type 都有配置指定的 handler
var (
    handlers = []func(*Session, header) error{
        typeData:         (*Session).handleStreamMessage,
        typeWindowUpdate: (*Session).handleStreamMessage,
        typePing:         (*Session).handlePing,
        typeGoAway:       (*Session).handleGoAway,
    }
)

// recvLoop continues to receive data until a fatal error is encountered
func (s *Session) recvLoop() error {
    defer close(s.recvDoneCh)
    // 这里创建了 12字节的 header buffer
    // 这就说明在 stream 上收到每个请求开头的 12 个字节都是一个 header
    hdr := header(make([]byte, headerSize))
    for {
        // Read the header
        if _, err := io.ReadFull(s.reader, hdr); err != nil {
            if err != io.EOF && !strings.Contains(err.Error(), "closed") && !strings.Contains(err.Error(), "reset by peer") {
                s.logger.Printf("[ERR] yamux: Failed to read header: %v", err)
            }
            return err
        }
        // 上面去读 header 并且进行了检查,是不是符合规则,我们再回忆一下 header 的格式
        // |<- vsn(1) ->|<- type(1) ->|<- flags(2) ->|<- streamID(4) ->|<- length(4) ->|
        // Verify the version
        if hdr.Version() != protoVersion {
            s.logger.Printf("[ERR] yamux: Invalid protocol version: %d", hdr.Version())
            return ErrInvalidVersion
        }
        // 校验 type 的范围
        mt := hdr.MsgType()
        if mt < typeData || mt > typeGoAway {
            return ErrInvalidMsgType
        }
        //我们要观察的是 typeWindowUpdate ,这个在 handlers 中的序号是 1,所以这个类型的处理函数是 (*Session).handleStreamMessage
        if err := handlers[mt](s, hdr); err != nil {
            return err
        }
    }
}

只要 BasicHost.NewHost 成功之后,本地就会启动 recvLoop,所有从外面来的 stream packet 都会通过 recvLoop 调度

每个 packect 都会按照 header 中的 type 属性分配给不同的 handler 进行处理。

handleStreamMessage

......
        typeData:         (*Session).handleStreamMessage,
        typeWindowUpdate: (*Session).handleStreamMessage,
......

我们最关心的两个 type 都是由 handleStreamMessage 来处理的,所以直接去读这个方法的代码即可。阅读前先来了解一下 Stream 的状态迁移,因为是两端进行 TCP 连接,我们就用 FROMTO 来代表这两端,前面的部分讲的都是 FROM 端的逻辑,已经说到 FROMTO 发送了 SYN 报文, 此时 TO 再回复一个 ACK 报文即可成功开启一个 Streamstream 这几了一系列状态来控制交互,具体如下表:

State Value
streamInit 0
streamSYNSent 1
streamSYNReceived 2
streamEstablished 3
streamLocalClose 4
streamRemoteClose 5
streamClosed 6
streamReset 7

创建一个 Stream 时,FROMTO 会按照下图来进行状态迁移

state

上图并不是完整的状态迁移图,只画到成功创建连接并没有描述异常时的各种 Reset / Close 过程,但是已经足够协助我们搞清楚 handleStreamMessage 的逻辑了

// ================
// yamux/session.go
// ================
// handleStreamMessage handles either a data or window update frame
func (s *Session) handleStreamMessage(hdr header) error {
    // Check for a new stream creation
    id := hdr.StreamID()
    flags := hdr.Flags()
    // TO 端第一次收到 FROM 端消息时,这个条件会成立
    if flags&flagSYN == flagSYN {
        // 下文单独介绍这个方法的作用
        if err := s.incomingStream(id); err != nil {
            return err
        }
    }

    // Get the stream
    s.streamLock.Lock()
    stream := s.streams[id]
    s.streamLock.Unlock()

    // If we do not have a stream, likely we sent a RST
    if stream == nil {
        // Drain any data on the wire
        if hdr.MsgType() == typeData && hdr.Length() > 0 {
            s.logger.Printf("[WARN] yamux: Discarding data for stream: %d", id)
            if _, err := io.CopyN(ioutil.Discard, s.reader, int64(hdr.Length())); err != nil {
                s.logger.Printf("[ERR] yamux: Failed to discard data: %v", err)
                return nil
            }
        } else {
            s.logger.Printf("[WARN] yamux: frame for missing stream: %v", hdr)
        }
        return nil
    }

    // Check if this is a window update
    if hdr.MsgType() == typeWindowUpdate {
            // FROM 端的 state 是在这个方法中变成 streamEstablished 的
        if err := stream.incrSendWindow(hdr, flags); err != nil {
            if sendErr := s.sendNoWait(s.goAway(goAwayProtoErr)); sendErr != nil {
                s.logger.Printf("[WARN] yamux: failed to send go away: %v", sendErr)
            }
            return err
        }
        return nil
    }

    // Read the new data
    if err := stream.readData(hdr, flags, s.reader); err != nil {
        if sendErr := s.sendNoWait(s.goAway(goAwayProtoErr)); sendErr != nil {
            s.logger.Printf("[WARN] yamux: failed to send go away: %v", sendErr)
        }
        return err
    }
    return nil
}

incomingStream这个方法只有 FROMTO 发送 SYN 消息时才会被 TO 端调用,他要做的是在本地创建一个 stream 对象并设置状态为 streamSYNReceived,然后通过 acceptCh 通道去调用 sendWindowUpdate 方法来回复 ACK 消息给 FROM

// ================
// yamux/session.go
// ================
// incomingStream is used to create a new incoming stream
func (s *Session) incomingStream(id uint32) error {
    ......
    // Allocate a new stream
    //创建一个 `stream` 对象并设置状态为 `streamSYNReceived`
    stream := newStream(s, id, streamSYNReceived)
    ......
    // Check if we've exceeded the backlog
    select {
    // 就是这里把 stream 对象扔进 acceptCh 通道,
    // 然后由 AcceptStream() 去做后续处理,包括寻找用户指定的 handler 等,
    // AcceptStream 是随 Swarm.Listen 启动的,
    // 在 Connect 那篇中文档中可以看到
    case s.acceptCh <- stream:
        return nil
    default:
        // Backlog exceeded! RST the stream
        s.logger.Printf("[WARN] yamux: backlog exceeded, forcing connection reset")
        delete(s.streams, id)
        stream.sendHdr.encode(typeWindowUpdate, flagRST, id, 0)
        return s.sendNoWait(stream.sendHdr)
    }
}

在上文 FROM 端第一次调用 sendWindowUpdate 时有一个很重要的方法没有深入去说,那就是 Stream.sendFlags() 这个方法

func (s *Stream) sendFlags() uint16 {
    s.stateLock.Lock()
    defer s.stateLock.Unlock()
    var flags uint16
    switch s.state {
    case streamInit:
        flags |= flagSYN
        s.state = streamSYNSent
    case streamSYNReceived:
        flags |= flagACK
        s.state = streamEstablished
    }
    return flags
}

可以看出,FROMTO 都会调用这个方法,这里是改变状态并根据不同的状态返回不同的 header.flags 的逻辑

本文要关注的四个状态都出现在这里了,看过迁移图我们已经知道从 streamSYNReceived 迁移到 streamEstablishedTO 端的迁移逻辑,此时还没有发送 ACK 消息,走完 windowUpdate 逻辑后就完成了 ACK 的发送,所以我们还要找一下 ACK 消息是谁来处理的,其实前面看 handlerStreamMessage 时已经在方法中注视了 stream.incrSendWindow(hdr, flags) 的用途

// FROM 端的 state 是在这个方法中变成 streamEstablished 的
if err := stream.incrSendWindow(hdr, flags); err != nil {
......

// incrSendWindow updates the size of our send window
func (s *Stream) incrSendWindow(hdr header, flags uint16) error {
    if err := s.processFlags(flags); err != nil {
        return err
    }

    // Increase window, unblock a sender
    atomic.AddUint32(&s.sendWindow, hdr.Length())
    asyncNotify(s.sendNotifyCh)
    return nil
}

// processFlags is used to update the state of the stream
// based on set flags, if any. Lock must be held
func (s *Stream) processFlags(flags uint16) error {
    ......
    if flags&flagACK == flagACK {
        if s.state == streamSYNSent {
            s.state = streamEstablished
        }
        s.session.establishStream(s.id)
    }
    ......
}

这下本文关注的全部状态都已经找到了,FROM 端收到 TO 端发来的 type = updateWindowflags = ACK 的消息后,在 processFlags 方法中将本地的 state 变成 streamEstablished ,至此 FROMTO 成功的创建了 Stream 。

相关文章

网友评论

    本文标题:go-libp2p 之 NewStream 深层阅读笔记

    本文链接:https://www.haomeiwen.com/subject/nmwrcqtx.html