本项目地址:gof 一个支持百万连接的websocket框架
本文提及的内容包含在:conn.go
一、新连接内容接收的流程
1、通过epoll对象获取新连接
我们需要通过epoll对象的wait方法来获取到客户端的连接,在第一次获取的时候,其内容是通过GlobalFd传输的,只有我们获取到连接句柄,并加入到epoll对象中之后,以后才可以接收到该连接自己发送的消息。
2、解析头消息内容
websocket连接在获取到连接之后,其首先发送的一定是头信息,如果没有头信息,我们可以直接关闭socket连接,拒绝客户端的访问。
3、回应客户端头消息
接收到客户端的header头信息之后,我们应该对其进行解析,并判断该header是否有用,并返回相应的数据,从而建立连接,进而通过连接去收发消息。
二、接收句柄消息
我们通过实例化EpollObj对象,可以得到一个Epoll模型,然后我们通过其中的wait方法可以接收句柄消息。
下面我们说一下第一种情况,也就是如果Epoll模型给我们的消息内容是GlobalFd传送过来的,这时候我们的整个接收流程。也就是我们上一章节中提到的,eWait方法中的handle
1、判断是否有新连接加入
// @Author WangKan
// @Description //当wait方法取到内容后,会回调此方法,对fd进行处理
// @Date 2021/2/2 21:39
func (s *Server) handler(fd int, connType ConnStatus) {
switch connType {
case CONN_NEW:
newFd := s.addConn(fd)
//Upgrader to http header
s.handShaker(newFd)
//s.messageChan<-newFd
case CONN_MESSAGE:
Log.Info("接收到描述符为%v的消息", fd)
c, ok := s.conns.Load(fd)
if !ok {
Log.Info("描述符fd 为 %d 的s.conns 不存在!", fd)
return
}
s.receiveFdBytes <- c.(*Conn)
default:
panic("no connType")
}
}
我们只需要判断connType 就可以看到是新连接加入,还是之前的连接再次发送的消息。
如果是新连接的加入,我们首先应该做的处理就是从系统中读取到该连接的句柄,并取出其中的内容,进行解析。也就是 s.addConn方法。
// @Author WangKan
// @Description //如果有新的连接,就取出系统中的fd,添加到当前的conns中。
// @Date 2021/2/2 21:37
func (s *Server) addConn(fd int) (newFd int) {
newFd, _, err := syscall.Accept(fd)
fmt.Printf("系统描述符新建的链接:%+v\n", newFd)
if err != nil {
fmt.Println("accept err: ", err)
return
}
//设置fd为非阻塞
if err := syscall.SetNonblock(newFd, true); err != nil {
os.Exit(1)
}
//把这个链接加入到epoll中
s.ep.eAdd(newFd)
return
}
在该方法中,传入的参数是GlobalFd,这个时候首先我们应该调用syscall.Accept方法找到新加入的句柄,然后将该句柄设置为非阻塞模式,从而添加到epoll模型中去进行管理。
2、读取新连接中的Header
当我们解析到了新的连接句柄之后,我们需要对该句柄中的内容进行解析,首先我们应该获取其中的内容:
// utils.go
// @Author WangKan
// @Description //获取客户端发送的Header头
// @Date 2021/2/24 15:13
// @Param
// @return
func GetHeaderContent(fd int) (string,int) {
for{
var buf [1024]byte
nbytes, _ := syscall.Read(fd, buf[:])
if nbytes > 0 {
return string(buf[:]),nbytes
}
}
}
通过调用syscall.Read()方法,我们可以将连接中的内容取出,取出之后是一个[]byte类型,然后我们转换为string,并将该内容返回给调用方。
3、为Header进行解析
在获取到Header头之后,我们需要解析header头的内容。在本项目中,是先将Header信息变成一个Map,然后遍历该map进行判断的。
具体代码为:
// @Author WangKan
// @Description //将Header头转换为map
// @Date 2021/2/24 15:13
// @Param
// @return
func FormatHeader(buf string,length int)(map[string]string){
var header =make(map[string]string)
str:=""
index:=0
for i:=0;i<length;i++{
if buf[i]==13{
if index == 0 {
arr:=strings.Split(str," ")
fmt.Println(arr)
header["Method"]=arr[0]
str=""
index++
continue
}
if str != ""{
arr:=strings.Split(str,": ")
header[arr[0]]=arr[1]
str=""
}
}else if buf[i]==10{
continue
}else{
str += string(buf[i])
}
}
return header
}
通过上述的代码,我们最终返回给调用方一个字符串类型的map,然后,我们对其中的关键信息进行判断,判断的代码来源于:gorilla/websocket/server.go
func (u *Upgrader) Upgrade(fd int, header map[string]string, s *Server) (*Conn, error) {
const badHandshake = "websocket: the client is not using the websocket protocol: "
if header["Connection"] != "Upgrade" && header["Connection"] != "upgrade" {
return u.returnError(http.StatusBadRequest, badHandshake+"'upgrade' token not found in 'Connection' header")
}
if header["Upgrade"] != "websocket" {
return u.returnError(http.StatusBadRequest, badHandshake+"'websocket' token not found in 'Upgrade' header")
}
if header["Method"] != "GET" {
return u.returnError(http.StatusMethodNotAllowed, badHandshake+"request method is not GET")
}
if header["Sec-WebSocket-Version"] != "13" {
return u.returnError(http.StatusBadRequest, "websocket: unsupported version: 13 not found in 'Sec-Websocket-Version' header")
}
//if _, ok := header["Sec-WebSocket-Extensions"]; ok {
// return u.returnError(http.StatusInternalServerError, "websocket: application specific 'Sec-WebSocket-Extensions' headers are unsupported")
//}
challengeKey := header["Sec-WebSocket-Key"]
if challengeKey == "" {
return u.returnError(http.StatusBadRequest, "websocket: not a websocket handshake: 'Sec-WebSocket-Key' header is missing or blank")
}
c := newConn(fd, s)
// Use larger of hijacked buffer and connection write buffer for header.
wf := []byte{}
wf = append(wf, "HTTP/1.1 101 Switching Protocols\r\nUpgrade: websocket\r\nConnection: Upgrade\r\nSec-WebSocket-Accept: "...)
wf = append(wf, computeAcceptKey(challengeKey)...)
wf = append(wf, "\r\n"...)
wf = append(wf, "\r\n"...)
c.handShake <- Message{
MessageType: -1,
Content: wf,
}
return c, nil
}
其中精简了一部分代码,是由于一些配置项我们还没有用到。
通过这个方法,我们最终返回的是wf,就是需要传送给客户端的头信息。
然后我们通过调用 syscall.Write()方法,就可以将这些内容写入对应的客户端句柄中,系统会将内容自动推送给客户端。
//server.go
n, err := syscall.Write(fd, heade.Content)
网友评论