p2p理财系统源码(IMT星际源码-20181212)
go-libp2p 之 NewStream 深层阅读笔记
Stream 的 Header 协议
格式
- Version (8 bits)
- Type (8 bits)
- Flags (16 bits)
- StreamID (32 bits)
- Length (32 bits)
当前版本中 vsn 始终等于 0
Type
- 0x0 Data : 用于数据传输,其中 length 代表 payload 的长度
- 0x1 WindowUpdate : 用来改变指定 Stream 的 recvWindow 尺寸,length 标示窗口的增量更新值
- 0x2 Ping : 用作心跳保持长连接或者采集 RTT度量值,在响应中要回复 StreamID 和 Length 属性
- 0x3 GoAway : 用来终止会话,此时 StreamID = 0 而 Length 标示ErrorCode
Flags
- 0x1 SYN : 创建Stream
- 0x2 ACK : 用于确认SYN消息
- 0x4 FIN : 执行 Stream 的半关闭
- 0x8 RST : 立即重置给定的 Stream
从命名为 flags 和 value 的取值可以看出这里的 flag 是可以叠加的
换成二进制 1 = 0001 , 2 = 0010 , 4 = 0100 , 8 = 1000
叠加时可以用 OR 操作
例如 flags = SYN | ACK ,此时 flags = 3,3 没有出现在协议列表里,但是 3 == 1 | 2
NewStream & OpenStream
- 在应用程序中通过 Host 接口的实现类 BasicHost.NewStream 方法在创建一个 stream 通道;
- 通过 Network 接口的实现类 Swarm.NewStream 进一步创建 stream;
- 跟 p 没有长链接时去创建连接,成功后会返回一个 swarm 中定义的 Conn 对象;
- 经过一系列调用请求落到 Transport 接口的 Dial 上,我们通常用 tcp 的实现来写应用;
- 这一步只是为了说明这个连接是在 Transport 中调用的标准库 net.Dialer 来创建的;
- 返回的 Swarm.Conn 包含了 streammux.Conn 接口的实现,OpenStream 定义在这个接口中;
- 前面封装的挺复杂,但是从这才算真正的开始,这里会在当前的 Conn 上返回我们想要的 Stream;
- OpenStream 生成了 id = streamID 并且创建了 yamux.Stream 对象,还发送了 windowUpdate 协议;
- newStream 中制定了几个很关键的参数,包括 headerSize = 12byte,初始的流窗口尺寸 initialStreamWindow = 256k ;
- sendWindowUpdate 负责向被连接端发送一个 header=[]byte{0, windowUpdate, SYN, id, delta} 消息,SYN 标示创建一个新的 Stream 并且设置初始数据窗口。
时序图中概括了创建 Stream 的过程,只列出了关键路径,实际代码比这个复杂很多,不太关心此部分的抽象设计故此忽略了对于接口的描述,直接关注实现,而且貌似只有最后一步最重要
关于 streamID 的计算
- yamux/session.go
代码很短,可以看出 id 就是自增的,但为什么每次都是 id = id+2 呢?这就必须要知道 nextStreamID 的初始值了,在 newSession 时指定了 if client { id = 1 } else { id = 2 },这就可以看出每个奇数 id 都代表了 client ,而偶数 id 则代表了 server 端。
func (s *Session) OpenStream() (*Stream, error) {......GET_ID: // Get an ID, and check for stream exhaustion id := atomic.LoadUint32(&s.nextStreamID) if id >= math.MaxUint32-1 { return nil, ErrStreamsExhausted } if !atomic.CompareAndSwapUint32(&s.nextStreamID, id, id+2) { goto GET_ID }......}
为了缩短篇幅,前面省略了 newSession 的逻辑,因为的确没太多看的价值,简单来说是 Transport.NewConn(nc net.Conn, isServer bool) 时指定了一个 isServer 参数,这个参数在后面 newSession 时会用来决定 client == true / false 进而决定 id 的初始值。我们可以认为 id 的初始值一直是 1,因为只有本地的 peerID 是空时,isServer 才是 true,p2p 环境下就当这种情况不存在吧。
关于 delta 的计算
- yamux/stream.go
我们看到在 Type = WindowUpdate 时对 delta 有个描述,它是用来增量更新窗口尺寸的,这个代码有点长,顺着注视来读吧
// sendWindowUpdate potentially sends a window update enabling// further writes to take place. Must be invoked with the lock.func (s *Stream) sendWindowUpdate() error { s.controlHdrLock.Lock() defer s.controlHdrLock.Unlock() // Determine the delta update // 默认 MaxStreamWindowSize == 256k ,可以在创建 yamux 时重新指定大小 max := s.session.config.MaxStreamWindowSize s.recvLock.Lock() // recvBuf 是在 readData 时通过 Grow 函数来重新分配的,此时还没有分配 ,recvWindow 的在 newStream 时给了默认值 256k // 所以这里应该是 delta = (256k - 0) - 256k = 0 delta := (max - uint32(s.recvBuf.Len())) - s.recvWindow // Determine the flags if any flags := s.sendFlags() // Check if we can omit the update if delta < (max/2) && flags == 0 { s.recvLock.Unlock() return nil } // Update our window // 增量更新 recvWindow 窗口,在 readData 后会减小窗口,是不是很熟悉?滑动窗口 s.recvWindow += delta s.recvLock.Unlock() // Send the header s.controlHdr.encode(typeWindowUpdate, flags, s.id, delta) if err := s.session.waitForSendErr(s.controlHdr, nil, s.controlErr, nil); err != nil { return err } return nil}
至此我们创建一个 Stream 并且通知对端打开一个窗口准备接收数据,本地也设定了初始的窗口大小,接下来看看对端是如何处理我们发过去的 header 的。
recvLoop
时序图没有画出 accept 的逻辑, handlerIncoming 的调用是异步的,并不会阻塞在此处。如果对 accept 和 handlerIncoming 之前的调用逻辑感兴趣,可参考 《go-libp2p-host Connect 源码分析》 章节,此处不再深入,仅概括了关键方法的调用过程,知道 handlerIncoming 之后,会沿着这个路径调用到 recvLoop() 即可
- yamux/session.go
// Ensure that the index of the handler (typeData/typeWindowUpdate/etc) matches the message type// 每种 type 都有配置指定的 handlervar ( handlers = []func(*Session, header) error{ typeData: (*Session).handleStreamMessage, typeWindowUpdate: (*Session).handleStreamMessage, typePing: (*Session).handlePing, typeGoAway: (*Session).handleGoAway, })// recvLoop continues to receive data until a fatal error is encounteredfunc (s *Session) recvLoop() error { defer close(s.recvDoneCh) // 这里创建了 12字节的 header buffer // 这就说明在 stream 上收到每个请求开头的 12 个字节都是一个 header hdr := header(make([]byte, headerSize)) for { // Read the header if _, err := io.ReadFull(s.reader, hdr); err != nil { if err != io.EOF && !strings.Contains(err.Error(), "closed") && !strings.Contains(err.Error(), "reset by peer") { s.logger.Printf("[ERR] yamux: Failed to read header: %v", err) } return err } // 上面去读 header 并且进行了检查,是不是符合规则,我们再回忆一下 header 的格式 // |<- vsn(1) ->|<- type(1) ->|<- flags(2) ->|<- streamID(4) ->|<- length(4) ->| // Verify the version if hdr.Version() != protoVersion { s.logger.Printf("[ERR] yamux: Invalid protocol version: %d", hdr.Version()) return ErrInvalidVersion } // 校验 type 的范围 mt := hdr.MsgType() if mt < typeData || mt > typeGoAway { return ErrInvalidMsgType } //我们要观察的是 typeWindowUpdate ,这个在 handlers 中的序号是 1,所以这个类型的处理函数是 (*Session).handleStreamMessage if err := handlers[mt](s, hdr); err != nil { return err } }}
只要 BasicHost.NewHost 成功之后,本地就会启动 recvLoop,所有从外面来的 stream packet 都会通过 recvLoop 调度
每个 packect 都会按照 header 中的 type 属性分配给不同的 handler 进行处理。
handleStreamMessage
...... typeData: (*Session).handleStreamMessage, typeWindowUpdate: (*Session).handleStreamMessage,......
我们最关心的两个 type 都是由 handleStreamMessage 来处理的,所以直接去读这个方法的代码即可。阅读前先来了解一下 Stream 的状态迁移,因为是两端进行 TCP 连接,我们就用 FROM 和 TO 来代表这两端,前面的部分讲的都是 FROM 端的逻辑,已经说到 FROM 向 TO 发送了 SYN 报文, 此时 TO 再回复一个 ACK 报文即可成功开启一个 Stream , stream 这几了一系列状态来控制交互,具体如下表:
StateValuestreamInit0streamSYNSent1streamSYNReceived2streamEstablished3streamLocalClose4streamRemoteClose5streamClosed6streamReset7
创建一个 Stream 时,FROM 和 TO 会按照下图来进行状态迁移
上图并不是完整的状态迁移图,只画到成功创建连接并没有描述异常时的各种 Reset / Close 过程,但是已经足够协助我们搞清楚 handleStreamMessage 的逻辑了
// ================// yamux/session.go// ================// handleStreamMessage handles either a data or window update framefunc (s *Session) handleStreamMessage(hdr header) error { // Check for a new stream creation id := hdr.StreamID() flags := hdr.Flags() // TO 端第一次收到 FROM 端消息时,这个条件会成立 if flags&flagSYN == flagSYN { // 下文单独介绍这个方法的作用 if err := s.incomingStream(id); err != nil { return err } } // Get the stream s.streamLock.Lock() stream := s.streams[id] s.streamLock.Unlock() // If we do not have a stream, likely we sent a RST if stream == nil { // Drain any data on the wire if hdr.MsgType() == typeData && hdr.Length() > 0 { s.logger.Printf("[WARN] yamux: Discarding data for stream: %d", id) if _, err := io.CopyN(ioutil.Discard, s.reader, int64(hdr.Length())); err != nil { s.logger.Printf("[ERR] yamux: Failed to discard data: %v", err) return nil } } else { s.logger.Printf("[WARN] yamux: frame for missing stream: %v", hdr) } return nil } // Check if this is a window update if hdr.MsgType() == typeWindowUpdate { // FROM 端的 state 是在这个方法中变成 streamEstablished 的 if err := stream.incrSendWindow(hdr, flags); err != nil { if sendErr := s.sendNoWait(s.goAway(goAwayProtoErr)); sendErr != nil { s.logger.Printf("[WARN] yamux: failed to send go away: %v", sendErr) } return err } return nil } // Read the new data if err := stream.readData(hdr, flags, s.reader); err != nil { if sendErr := s.sendNoWait(s.goAway(goAwayProtoErr)); sendErr != nil { s.logger.Printf("[WARN] yamux: failed to send go away: %v", sendErr) } return err } return nil}
incomingStream这个方法只有 FROM 向 TO 发送 SYN 消息时才会被 TO 端调用,他要做的是在本地创建一个 stream 对象并设置状态为 streamSYNReceived,然后通过 acceptCh 通道去调用 sendWindowUpdate 方法来回复 ACK 消息给 FROM 端
// ================// yamux/session.go// ================// incomingStream is used to create a new incoming streamfunc (s *Session) incomingStream(id uint32) error { ...... // Allocate a new stream //创建一个 `stream` 对象并设置状态为 `streamSYNReceived` stream := newStream(s, id, streamSYNReceived) ...... // Check if we've exceeded the backlog select { // 就是这里把 stream 对象扔进 acceptCh 通道, // 然后由 AcceptStream() 去做后续处理,包括寻找用户指定的 handler 等, // AcceptStream 是随 Swarm.Listen 启动的, // 在 Connect 那篇中文档中可以看到 case s.acceptCh <- stream: return nil default: // Backlog exceeded! RST the stream s.logger.Printf("[WARN] yamux: backlog exceeded, forcing connection reset") delete(s.streams, id) stream.sendHdr.encode(typeWindowUpdate, flagRST, id, 0) return s.sendNoWait(stream.sendHdr) }}
在上文 FROM 端第一次调用 sendWindowUpdate 时有一个很重要的方法没有深入去说,那就是 Stream.sendFlags() 这个方法
func (s *Stream) sendFlags() uint16 { s.stateLock.Lock() defer s.stateLock.Unlock() var flags uint16 switch s.state { case streamInit: flags |= flagSYN s.state = streamSYNSent case streamSYNReceived: flags |= flagACK s.state = streamEstablished } return flags}
可以看出,FROM 和 TO 都会调用这个方法,这里是改变状态并根据不同的状态返回不同的 header.flags 的逻辑
本文要关注的四个状态都出现在这里了,看过迁移图我们已经知道从 streamSYNReceived 迁移到 streamEstablished 是 TO 端的迁移逻辑,此时还没有发送 ACK 消息,走完 windowUpdate 逻辑后就完成了 ACK 的发送,所以我们还要找一下 ACK 消息是谁来处理的,其实前面看 handlerStreamMessage 时已经在方法中注视了 stream.incrSendWindow(hdr, flags) 的用途
// FROM 端的 state 是在这个方法中变成 streamEstablished 的
if err := stream.incrSendWindow(hdr, flags); err != nil {
......
// incrSendWindow updates the size of our send windowfunc (s *Stream) incrSendWindow(hdr header, flags uint16) error { if err := s.processFlags(flags); err != nil { return err } // Increase window, unblock a sender atomic.AddUint32(&s.sendWindow, hdr.Length()) asyncNotify(s.sendNotifyCh) return nil}// processFlags is used to update the state of the stream// based on set flags, if any. Lock must be heldfunc (s *Stream) processFlags(flags uint16) error { ...... if flags&flagACK == flagACK { if s.state == streamSYNSent { s.state = streamEstablished } s.session.establishStream(s.id) } ......}
这下本文关注的全部状态都已经找到了,FROM 端收到 TO 端发来的 type = updateWindow , flags = ACK 的消息后,在 processFlags 方法中将本地的 state 变成 streamEstablished ,至此 FROM 和 TO 成功的创建了 Stream 。
原文链接: https://www.jianshu.com/p/14781d900501
-
阿里国际站(“死守”阿里巴巴国际站)
2024-05-26
-
淮海经济区(淮海经济区三个主要经济城市数据,徐州规模大,济宁扎实临沂活跃)
2024-05-26
-
吴江市(12个版聚焦吴江经验,全国分享)
2024-05-26
-
书法学习(掌握深入临帖的关键,学书法便可豁然开朗)
2024-05-26
-
布政(清代此省份是个特例,不仅有两个布政使,而且还有两个省会)
2024-05-26
-
519是什么意思(你知道中国旅游日为什么定在“5·19”吗?)
2024-05-26