美文网首页
IMT星际源码 - 20181212

IMT星际源码 - 20181212

作者: IMTOfficial | 来源:发表于2018-12-12 16:45 被阅读0次
    星际源码20181030.jpeg

    go-libp2p 之 NewStream 深层阅读笔记

    Stream 的 Header 协议

    格式

    • Version (8 bits)
    • Type (8 bits)
    • Flags (16 bits)
    • StreamID (32 bits)
    • Length (32 bits)

    当前版本中 vsn 始终等于 0

    Type

    • 0x0 Data : 用于数据传输,其中 length 代表 payload 的长度
    • 0x1 WindowUpdate : 用来改变指定 Stream 的 recvWindow 尺寸,length 标示窗口的增量更新值
    • 0x2 Ping : 用作心跳保持长连接或者采集 RTT度量值,在响应中要回复 StreamID 和 Length 属性
    • 0x3 GoAway : 用来终止会话,此时 StreamID = 0 而 Length 标示ErrorCode

    Flags

    • 0x1 SYN : 创建Stream
    • 0x2 ACK : 用于确认SYN消息
    • 0x4 FIN : 执行 Stream 的半关闭
    • 0x8 RST : 立即重置给定的 Stream

    从命名为 flags 和 value 的取值可以看出这里的 flag 是可以叠加的
    换成二进制 1 = 0001 , 2 = 0010 , 4 = 0100 , 8 = 1000
    叠加时可以用 OR操作
    例如 flags = SYN | ACK,此时 flags = 3,3 没有出现在协议列表里,但是 3 == 1 | 2

    NewStream & OpenStream

    1.jpg
    1. 在应用程序中通过 Host 接口的实现类 BasicHost.NewStream 方法在创建一个 stream 通道;
    2. 通过 Network 接口的实现类 Swarm.NewStream 进一步创建 stream;
    3. 跟 p 没有长链接时去创建连接,成功后会返回一个 swarm 中定义的 Conn 对象;
    4. 经过一系列调用请求落到 Transport 接口的 Dial 上,我们通常用 tcp 的实现来写应用;
    5. 这一步只是为了说明这个连接是在 Transport 中调用的标准库 net.Dialer 来创建的;
    6. 返回的 Swarm.Conn 包含了 streammux.Conn 接口的实现,OpenStream 定义在这个接口中;
    7. 前面封装的挺复杂,但是从这才算真正的开始,这里会在当前的 Conn 上返回我们想要的 Stream;
    8. OpenStream 生成了 id = streamID并且创建了 yamux.Stream 对象,还发送了 windowUpdate 协议;
    9. newStream 中制定了几个很关键的参数,包括 headerSize = 12byte,初始的流窗口尺寸 initialStreamWindow = 256k ;
    10. sendWindowUpdate 负责向被连接端发送一个 header=[]byte{0, windowUpdate, SYN, id, delta}消息,SYN 标示创建一个新的 Stream 并且设置初始数据窗口。

    时序图中概括了创建 Stream 的过程,只列出了关键路径,实际代码比这个复杂很多,不太关心此部分的抽象设计故此忽略了对于接口的描述,直接关注实现,而且貌似只有最后一步最重要

    关于 streamID 的计算

    • yamux/session.go

    代码很短,可以看出 id 就是自增的,但为什么每次都是 id = id+2呢?这就必须要知道 nextStreamID的初始值了,在 newSession 时指定了 if client { id = 1 } else { id = 2 },这就可以看出每个奇数 id 都代表了 client ,而偶数 id 则代表了 server 端。

    func (s *Session) OpenStream() (*Stream, error) {
    ......
    GET_ID:
        // Get an ID, and check for stream exhaustion
        id := atomic.LoadUint32(&s.nextStreamID)
        if id >= math.MaxUint32-1 {
            return nil, ErrStreamsExhausted
        }
        if !atomic.CompareAndSwapUint32(&s.nextStreamID, id, id+2) {
            goto GET_ID
        }
    ......
    }
    
    

    为了缩短篇幅,前面省略了 newSession 的逻辑,因为的确没太多看的价值,简单来说是 Transport.NewConn(nc net.Conn, isServer bool)时指定了一个 isServer参数,这个参数在后面 newSession 时会用来决定 client == true / false进而决定 id 的初始值。我们可以认为 id 的初始值一直是 1,因为只有本地的 peerID 是空时,isServer 才是 true,p2p 环境下就当这种情况不存在吧。

    关于 delta 的计算

    • yamux/stream.go

    我们看到在 Type = WindowUpdate 时对 delta 有个描述,它是用来增量更新窗口尺寸的,这个代码有点长,顺着注视来读吧

    // sendWindowUpdate potentially sends a window update enabling
    // further writes to take place. Must be invoked with the lock.
    func (s *Stream) sendWindowUpdate() error {
        s.controlHdrLock.Lock()
        defer s.controlHdrLock.Unlock()
    
        // Determine the delta update
        // 默认 MaxStreamWindowSize == 256k ,可以在创建 yamux 时重新指定大小
        max := s.session.config.MaxStreamWindowSize
        s.recvLock.Lock()
        // recvBuf 是在 readData 时通过 Grow 函数来重新分配的,此时还没有分配 ,recvWindow 的在 newStream 时给了默认值 256k
        // 所以这里应该是 delta = (256k - 0) - 256k = 0 
        delta := (max - uint32(s.recvBuf.Len())) - s.recvWindow
    
        // Determine the flags if any
        flags := s.sendFlags()
    
        // Check if we can omit the update
        if delta < (max/2) && flags == 0 {
            s.recvLock.Unlock()
            return nil
        }
    
        // Update our window
        // 增量更新 recvWindow 窗口,在 readData 后会减小窗口,是不是很熟悉?滑动窗口
        s.recvWindow += delta
        s.recvLock.Unlock()
    
        // Send the header
        s.controlHdr.encode(typeWindowUpdate, flags, s.id, delta)
        if err := s.session.waitForSendErr(s.controlHdr, nil, s.controlErr, nil); err != nil {
            return err
        }
        return nil
    }
    
    

    至此我们创建一个 Stream 并且通知对端打开一个窗口准备接收数据,本地也设定了初始的窗口大小,接下来看看对端是如何处理我们发过去的 header 的。

    recvLoop

    libp2p Listen.jpg

    时序图没有画出 accept 的逻辑, handlerIncoming的调用是异步的,并不会阻塞在此处。如果对 accepthandlerIncoming之前的调用逻辑感兴趣,可参考 《go-libp2p-host Connect 源码分析》章节,此处不再深入,仅概括了关键方法的调用过程,知道 handlerIncoming之后,会沿着这个路径调用到 recvLoop()即可

    • yamux/session.go
    // Ensure that the index of the handler (typeData/typeWindowUpdate/etc) matches the message type
    // 每种 type 都有配置指定的 handler
    var (
        handlers = []func(*Session, header) error{
            typeData:         (*Session).handleStreamMessage,
            typeWindowUpdate: (*Session).handleStreamMessage,
            typePing:         (*Session).handlePing,
            typeGoAway:       (*Session).handleGoAway,
        }
    )
    
    // recvLoop continues to receive data until a fatal error is encountered
    func (s *Session) recvLoop() error {
        defer close(s.recvDoneCh)
        // 这里创建了 12字节的 header buffer
        // 这就说明在 stream 上收到每个请求开头的 12 个字节都是一个 header
        hdr := header(make([]byte, headerSize))
        for {
            // Read the header
            if _, err := io.ReadFull(s.reader, hdr); err != nil {
                if err != io.EOF && !strings.Contains(err.Error(), "closed") && !strings.Contains(err.Error(), "reset by peer") {
                    s.logger.Printf("[ERR] yamux: Failed to read header: %v", err)
                }
                return err
            }
            // 上面去读 header 并且进行了检查,是不是符合规则,我们再回忆一下 header 的格式
            // |<- vsn(1) ->|<- type(1) ->|<- flags(2) ->|<- streamID(4) ->|<- length(4) ->|
            // Verify the version
            if hdr.Version() != protoVersion {
                s.logger.Printf("[ERR] yamux: Invalid protocol version: %d", hdr.Version())
                return ErrInvalidVersion
            }
            // 校验 type 的范围
            mt := hdr.MsgType()
            if mt < typeData || mt > typeGoAway {
                return ErrInvalidMsgType
            }
            //我们要观察的是 typeWindowUpdate ,这个在 handlers 中的序号是 1,所以这个类型的处理函数是 (*Session).handleStreamMessage
            if err := handlers[mt](s, hdr); err != nil {
                return err
            }
        }
    }
    
    

    只要 BasicHost.NewHost成功之后,本地就会启动 recvLoop,所有从外面来的 stream packet 都会通过 recvLoop调度

    每个 packect 都会按照 header中的 type属性分配给不同的 handler进行处理。

    handleStreamMessage

    ......
            typeData:         (*Session).handleStreamMessage,
            typeWindowUpdate: (*Session).handleStreamMessage,
    ......
    
    

    我们最关心的两个 type都是由 handleStreamMessage来处理的,所以直接去读这个方法的代码即可。阅读前先来了解一下 Stream的状态迁移,因为是两端进行 TCP连接,我们就用 FROMTO来代表这两端,前面的部分讲的都是 FROM端的逻辑,已经说到 FROMTO发送了 SYN报文, 此时 TO再回复一个 ACK报文即可成功开启一个 Streamstream的一系列状态来控制交互,具体如下表:

    State Value
    streamInit 0
    streamSYNSent 1
    streamSYNReceived 2
    streamEstablished 3
    streamLocalClose 4
    streamRemoteClose 5
    streamClosed 6
    streamReset 7

    创建一个 Stream时,FROMTO会按照下图来进行状态迁移

    state.jpg

    上图并不是完整的状态迁移图,只画到成功创建连接并没有描述异常时的各种 Reset / Close过程,但是已经足够协助我们搞清楚 handleStreamMessage的逻辑了

    // ================
    // yamux/session.go
    // ================
    // handleStreamMessage handles either a data or window update frame
    func (s *Session) handleStreamMessage(hdr header) error {
        // Check for a new stream creation
        id := hdr.StreamID()
        flags := hdr.Flags()
        // TO 端第一次收到 FROM 端消息时,这个条件会成立
        if flags&flagSYN == flagSYN {
            // 下文单独介绍这个方法的作用
            if err := s.incomingStream(id); err != nil {
                return err
            }
        }
    
        // Get the stream
        s.streamLock.Lock()
        stream := s.streams[id]
        s.streamLock.Unlock()
    
        // If we do not have a stream, likely we sent a RST
        if stream == nil {
            // Drain any data on the wire
            if hdr.MsgType() == typeData && hdr.Length() > 0 {
                s.logger.Printf("[WARN] yamux: Discarding data for stream: %d", id)
                if _, err := io.CopyN(ioutil.Discard, s.reader, int64(hdr.Length())); err != nil {
                    s.logger.Printf("[ERR] yamux: Failed to discard data: %v", err)
                    return nil
                }
            } else {
                s.logger.Printf("[WARN] yamux: frame for missing stream: %v", hdr)
            }
            return nil
        }
    
        // Check if this is a window update
        if hdr.MsgType() == typeWindowUpdate {
                // FROM 端的 state 是在这个方法中变成 streamEstablished 的
            if err := stream.incrSendWindow(hdr, flags); err != nil {
                if sendErr := s.sendNoWait(s.goAway(goAwayProtoErr)); sendErr != nil {
                    s.logger.Printf("[WARN] yamux: failed to send go away: %v", sendErr)
                }
                return err
            }
            return nil
        }
    
        // Read the new data
        if err := stream.readData(hdr, flags, s.reader); err != nil {
            if sendErr := s.sendNoWait(s.goAway(goAwayProtoErr)); sendErr != nil {
                s.logger.Printf("[WARN] yamux: failed to send go away: %v", sendErr)
            }
            return err
        }
        return nil
    }
    
    

    incomingStream这个方法只有 FROMTO发送 SYN消息时才会被 TO端调用,他要做的是在本地创建一个 stream对象并设置状态为 streamSYNReceived,然后通过 acceptCh通道去调用 sendWindowUpdate方法来回复 ACK消息给 FROM

    // ================
    // yamux/session.go
    // ================
    // incomingStream is used to create a new incoming stream
    func (s *Session) incomingStream(id uint32) error {
        ......
        // Allocate a new stream
        //创建一个 `stream` 对象并设置状态为 `streamSYNReceived`
        stream := newStream(s, id, streamSYNReceived)
        ......
        // Check if we've exceeded the backlog
        select {
        // 就是这里把 stream 对象扔进 acceptCh 通道,
        // 然后由 AcceptStream() 去做后续处理,包括寻找用户指定的 handler 等,
        // AcceptStream 是随 Swarm.Listen 启动的,
        // 在 Connect 那篇中文档中可以看到
        case s.acceptCh <- stream:
            return nil
        default:
            // Backlog exceeded! RST the stream
            s.logger.Printf("[WARN] yamux: backlog exceeded, forcing connection reset")
            delete(s.streams, id)
            stream.sendHdr.encode(typeWindowUpdate, flagRST, id, 0)
            return s.sendNoWait(stream.sendHdr)
        }
    }
    
    

    在上文 FROM 端第一次调用 sendWindowUpdate时有一个很重要的方法没有深入去说,那就是 Stream.sendFlags()这个方法

    func (s *Stream) sendFlags() uint16 {
        s.stateLock.Lock()
        defer s.stateLock.Unlock()
        var flags uint16
        switch s.state {
        case streamInit:
            flags |= flagSYN
            s.state = streamSYNSent
        case streamSYNReceived:
            flags |= flagACK
            s.state = streamEstablished
        }
        return flags
    }
    
    

    可以看出,FROMTO都会调用这个方法,这里是改变状态并根据不同的状态返回不同的 header.flags 的逻辑

    本文要关注的四个状态都出现在这里了,看过迁移图我们已经知道从 streamSYNReceived迁移到 streamEstablishedTO端的迁移逻辑,此时还没有发送 ACK 消息,走完 windowUpdate逻辑后就完成了 ACK的发送,所以我们还要找一下 ACK消息是谁来处理的,其实前面看 handlerStreamMessage时已经在方法中注视了 stream.incrSendWindow(hdr, flags)的用途

    // FROM 端的 state 是在这个方法中变成 streamEstablished 的
    if err := stream.incrSendWindow(hdr, flags); err != nil {
    ......

    // incrSendWindow updates the size of our send window
    func (s *Stream) incrSendWindow(hdr header, flags uint16) error {
        if err := s.processFlags(flags); err != nil {
            return err
        }
    
        // Increase window, unblock a sender
        atomic.AddUint32(&s.sendWindow, hdr.Length())
        asyncNotify(s.sendNotifyCh)
        return nil
    }
    
    // processFlags is used to update the state of the stream
    // based on set flags, if any. Lock must be held
    func (s *Stream) processFlags(flags uint16) error {
        ......
        if flags&flagACK == flagACK {
            if s.state == streamSYNSent {
                s.state = streamEstablished
            }
            s.session.establishStream(s.id)
        }
        ......
    }
    
    

    这下本文关注的全部状态都已经找到了,FROM端收到 TO端发来的 type = updateWindowflags = ACK的消息后,在 processFlags方法中将本地的 state变成 streamEstablished,至此 FROMTO成功的创建了 Stream 。

    原文链接:https://www.jianshu.com/p/14781d900501

    相关文章

      网友评论

          本文标题:IMT星际源码 - 20181212

          本文链接:https://www.haomeiwen.com/subject/vsrphqtx.html