理论知识可以参考
网络信息怎么在网线中传播的 (转载自知乎)
Android 网络(一) 概念 TCP/IP Socket Http Restful
脑残式网络编程入门(一):跟着动画来学TCP三次握手和四次挥手
脑残式网络编程入门(二):我们在读写Socket时,究竟在读写什么?
TCP 粘包问题浅析及其解决方案,这个帖子里大家一顿喷粘包这个叫法
我工作五年的时候也不知道 “TCP 粘包”,继续吐槽
一、API
1.服务端通过Listen加Accept
package main
import (
"fmt"
"net"
"os"
"time"
)
func main() {
//通过 ResolveTCPAddr 获取一个 TCPAddr
//ResolveTCPAddr(net, addr string) (*TCPAddr, os.Error)
//net参数是"tcp4"、"tcp6"、"tcp"中的任意一个,
//分别表示 TCP(IPv4-only),TCP(IPv6-only)
//或者 TCP(IPv4,IPv6 的任意一个)
//addr 表示域名或者IP地址,
//例如"www.google.com:80" 或者"127.0.0.1:22".
service := ":7777"
tcpAddr, err := net.ResolveTCPAddr("tcp4", service)
checkError(err)
//ListenTCP(net string, laddr *TCPAddr) (l *TCPListener, err os.Error)
listener, err := net.ListenTCP("tcp", tcpAddr)
checkError(err)
//func (l *TCPListener) Accept() (c Conn, err os.Error)
for {
conn, err := listener.Accept()
if err != nil {
continue
}
daytime := time.Now().String()
// don't care about return value
conn.Write([]byte(daytime))
// we're finished with this client
conn.Close()
}
}
func checkError(err error) {
if err != nil {
fmt.Fprintf(os.Stderr, "Fatal error: %s", err.Error())
os.Exit(1)
}
}
上面的服务跑起来之后,它将会一直在那里等待,直到有新的客户端请求到达。当有新的客户端请求到达并同意接受 Accept 该请求的时候他会反馈当前的时间信息。值得注意的是,在代码中 for 循环里,当有错误发生时,直接 continue而不是退出,是因为在服务器端跑代码的时候,当有错误发生的情况下最好是由服务端记录错误,然后当前连接的客户端直接报错而退出,从而不会影响到当前服务端运行的整个服务。
上面的代码有个缺点,执行的时候是单任务的,不能同时接收多个请求,那么该如何改造以使它支持多并发呢?
...
for {
conn, err := listener.Accept()
if err != nil {
continue
}
go handlerClient(conn)
}
...
func handleClient(conn net.Conn) {
defer conn.Close()
daytime := time.Now().String()
// don't care about return value
conn.Write([]byte(daytime))
// we're finished with this client
}
...
2.客户端直接调用 Dial
package main
import (
"fmt"
"io/ioutil"
"net"
"os"
)
func main() {
if len(os.Args) != 2 {
fmt.Fprintf(os.Stderr, "Usage: %s host:port ", os.Args[0])
os.Exit(1)
}
service := os.Args[1]
tcpAddr, err := net.ResolveTCPAddr("tcp4", service)
checkError(err)
conn, err := net.DialTCP("tcp", nil, tcpAddr)
checkError(err)
_, err = conn.Write([]byte("HEAD / HTTP/1.0\r\n\r\n"))
checkError(err)
result, err := ioutil.ReadAll(conn)
checkError(err)
fmt.Println(string(result))
os.Exit(0)
}
func checkError(err error) {
if err != nil {
fmt.Fprintf(os.Stderr, "Fatal error: %s", err.Error())
os.Exit(1)
}
}
首先程序将用户的输入作为参数 service 传入net.ResolveTCPAddr 获取一个 tcpAddr,然后把 tcpAddr 传入 DialTCP 后创建了一个 TCP连接 conn ,通过 conn 来发送请求信息,最后通过 ioutil.ReadAll 从 conn 中读取全部的文本,也就是服务端响应反馈的信息。
二、实现一个可以接受不同命令的服务端
参考使用 Go 进行 Socket 编程
我们实现一个服务端, 它可以接受下面这些命令:
- ping 探活的命令, 服务端会返回 “pong”
- echo 服务端会返回收到的字符串
- quit 服务端收到这个命令后就会关闭连接
具体的服务端代码如下所示:
package main
import (
"fmt"
"net"
"strings"
)
func connHandler(c net.Conn) {
if c == nil {
return
}
buf := make([]byte, 4096)
for {
cnt, err := c.Read(buf)
if err != nil || cnt == 0 {
c.Close()
break
}
inStr := strings.TrimSpace(string(buf[0:cnt]))
inputs := strings.Split(inStr, " ")
switch inputs[0] {
case "ping":
c.Write([]byte("pong\n"))
case "echo":
echoStr := strings.Join(inputs[1:], " ") + "\n"
c.Write([]byte(echoStr))
case "quit":
c.Close()
break
default:
fmt.Printf("Unsupported command: %s\n", inputs[0])
}
}
fmt.Printf("Connection from %v closed. \n", c.RemoteAddr())
}
func main() {
server, err := net.Listen("tcp", ":1208")
if err != nil {
fmt.Printf("Fail to start server, %s\n", err)
}
fmt.Println("Server Started ...")
for {
conn, err := server.Accept()
if err != nil {
fmt.Printf("Fail to connect, %s\n", err)
break
}
go connHandler(conn)
}
}
客户端的实现
package main
import (
"bufio"
"fmt"
"net"
"os"
"strings"
)
func connHandler(c net.Conn) {
defer c.Close()
reader := bufio.NewReader(os.Stdin)
buf := make([]byte, 1024)
for {
input, _ := reader.ReadString('\n')
input = strings.TrimSpace(input)
if input == "quit" {
return
}
c.Write([]byte(input))
cnt, err := c.Read(buf)
if err != nil {
fmt.Printf("Fail to read data, %s\n", err)
continue
}
fmt.Print(string(buf[0:cnt]))
}
}
func main() {
conn, err := net.Dial("tcp", "localhost:1208")
if err != nil {
fmt.Printf("Fail to connect, %s\n", err)
return
}
connHandler(conn)
}
三、解决golang开发socket服务时粘包半包bug
基础知识可以参考tcp是流的一些思考--拆包和粘包
tcp中有一个negal算法,用途是这样的:通信两端有很多小的数据包要发送,虽然传送的数据很少,但是流程一点没少,也需要tcp的各种确认,校验。这样小的数据包如果很多,会造成网络资源很大的浪费,negal算法做了这样一件事,当来了一个很小的数据包,我不急于发送这个包,而是等来了更多的包,将这些小包组合成大包之后一并发送,不就提高了网络传输的效率的嘛。这个想法收到了很好的效果,但是我们想一下,如果是分属于两个不同页面的包,被合并在了一起,那客户那边如何区分它们呢?
这就是粘包问题。从粘包问题我们更可以看出为什么tcp被称为流协议,因为它就跟水流一样,是没有边界的,没有消息的边界保护机制,所以tcp只有流的概念,没有包的概念。
解决tcp粘包的方法:
客户端会定义一个标示,比如数据的前4位是数据的长度,后面才是数据。那么客户端只需发送 ( 数据长度+数据 ) 的格式数据就可以了,接收方根据包头信息里的数据长度读取buffer.
客户端:
//客户端发送封包
package main
import (
"fmt"
"math/rand"
"net"
"os"
"strconv"
"strings"
"time"
)
func main() {
server := "127.0.0.1:5000"
tcpAddr, err := net.ResolveTCPAddr("tcp4", server)
if err != nil {
fmt.Fprintf(os.Stderr, "Fatal error: %s", err.Error())
os.Exit(1)
}
conn, err := net.DialTCP("tcp", nil, tcpAddr)
if err != nil {
fmt.Fprintf(os.Stderr, "Fatal error: %s", err.Error())
os.Exit(1)
}
defer conn.Close()
for i := 0; i < 50; i++ {
//msg := strconv.Itoa(i)
msg := RandString(i)
msgLen := fmt.Sprintf("%03s", strconv.Itoa(len(msg)))
//fmt.Println(msg, msgLen)
words := "aaaa" + msgLen + msg
//words := append([]byte("aaaa"), []byte(msgLen), []byte(msg))
fmt.Println(len(words), words)
conn.Write([]byte(words))
}
}
/**
*生成随机字符
**/
func RandString(length int) string {
rand.Seed(time.Now().UnixNano())
rs := make([]string, length)
for start := 0; start < length; start++ {
t := rand.Intn(3)
if t == 0 {
rs = append(rs, strconv.Itoa(rand.Intn(10)))
} else if t == 1 {
rs = append(rs, string(rand.Intn(26)+65))
} else {
rs = append(rs, string(rand.Intn(26)+97))
}
}
return strings.Join(rs, "")
}
服务端实例代码:
package main
import (
"fmt"
"io"
"net"
"os"
"strconv"
)
func main() {
netListen, err := net.Listen("tcp", ":5000")
CheckError(err)
defer netListen.Close()
for {
conn, err := netListen.Accept()
if err != nil {
continue
}
go handleConnection(conn)
}
}
func handleConnection(conn net.Conn) {
allbuf := make([]byte, 0)
buffer := make([]byte, 1024)
for {
readLen, err := conn.Read(buffer)
//fmt.Println("readLen: ", readLen, len(allbuf))
if err == io.EOF {
break
}
if err != nil {
fmt.Println("read error")
return
}
if len(allbuf) != 0 {
allbuf = append(allbuf, buffer...)
} else {
allbuf = buffer[:]
}
var readP int = 0
for {
//fmt.Println("allbuf content:", string(allbuf))
//buffer长度小于7
if readLen-readP < 7 {
allbuf = buffer[readP:]
break
}
msgLen, _ := strconv.Atoi(string(allbuf[readP+4 : readP+7]))
logLen := 7 + msgLen
//fmt.Println(readP, readP+logLen)
//buffer剩余长度>将处理的数据长度
if len(allbuf[readP:]) >= logLen {
//fmt.Println(string(allbuf[4:7]))
fmt.Println(string(allbuf[readP : readP+logLen]))
readP += logLen
//fmt.Println(readP, readLen)
if readP == readLen {
allbuf = nil
break
}
} else {
allbuf = buffer[readP:]
break
}
}
}
}
func CheckError(err error) {
if err != nil {
fmt.Fprintf(os.Stderr, "Fatal error: %s", err.Error())
os.Exit(1)
}
}
四、io包的ReadFull
对于第三部分的解决golang开发socket服务时粘包半包bug,有作者认为太复杂了,参见golang tcp拆包的正确姿势,他提出可以用ReadFull来简化。
关于io包基础知识,参考Golang io reader writer
关于ReadFull,可以参考达达的博客系列:
Go语言小贴士1 - io包
Go语言小贴士2 - 协议解析
Go语言小贴士3 - bufio包
原文不再转述,现在引用一下重点:
io.Reader
的定义如下:
type Reader interface {
Read(p []byte) (n int, err error)
}
其中文档的说明非常重要,文档中详细描述了Read
方法的各种返回可能性。
文档描述中有一个要点,就是n
可能小于等于len(p)
,也就是说Go在读IO的时候,是不会保证一次读取预期的所有数据的。如果我们要确保一次读取我们所需的所有数据,就需要在一个循环里调用Read
,累加每次返回的n
并小心设置下次Read
时p
的偏移量,直到n
的累加值达到我们的预期。
因为上述需求实在太常见了,所以Go在io包中提供了一个ReadFull
函数来做到一次读取要求的所有数据,通过阅读ReadFull
函数的代码,也可以反过来帮助大家理解io.Reader
是怎么运作的。
//io.go源码
func ReadFull(r Reader, buf []byte) (n int, err error) {
return ReadAtLeast(r, buf, len(buf))
}
func ReadAtLeast(r Reader, buf []byte, min int) (n int, err error) {
if len(buf) < min {
return 0, ErrShortBuffer
}
for n < min && err == nil {
var nn int
nn, err = r.Read(buf[n:])
n += nn
}
if n >= min {
err = nil
} else if n > 0 && err == EOF {
err = ErrUnexpectedEOF
}
return
}
在很多应用场景中,消息包的长度是不固定的,就像上面的字符串字段一样。我们一样可以用开头固定的几个字节来存放消息长度,在解析通讯协议的时候就可以从字节流中截出一个个的消息包了,这样的操作通常叫做协议分包或者粘包处理。
贴个从Socket读取消息包的伪代码(没编译):
func ReadPacket(conn net.Conn) ([]byte, error) {
var head [2]byte
if _, err := io.ReadFull(conn, head[:]); err != nil {
return err
}
size := binary.BigEndian.Uint16(head)
packet := make([]byte, size)
if _, err := io.ReadFull(conn, packet); err != nil {
return err
}
return packet
}
上面的代码就用到了前一个小贴士中说到的io.ReadFull
来确保一次读取完整数据。
要注意,这段代码不是线程安全的,如果有两个线程同时对一个net.Conn
进行ReadPacket
操作,很可能会发生严重错误,具体逻辑请自行分析。
从上面结构体序列化和反序列化的代码中,大家不难看出,实现一个二进制协议是挺繁琐和容易出BUG的,只要稍微有一个数值计算错就解析出错了。所以在工程实践中,不推荐大家手写二进制协议的解析代码,项目中通常会用自动化的工具来辅助生成代码。
在Leaf 游戏服务器框架简介的tcp_msg.go中,Read方法也使用了ReadFull这种方式来处理。
五、WebSocket
参考封装golang websocket
websocket是个二进制协议,需要先通过Http协议进行握手,从而协商完成从Http协议向websocket协议的转换。一旦握手结束,当前的TCP连接后续将采用二进制websocket协议进行双向双工交互,自此与Http协议无关。
可以通过这篇知乎了解一下websocket协议的基本原理:《WebSocket 是什么原理?为什么可以实现持久连接?》。
1.粘包
我们开发过TCP服务的都知道,需要通过协议decode从TCP字节流中解析出一个一个请求,那么websocket又怎么样呢?
websocket以message为单位进行通讯,本身就是一个在TCP层上的一个分包协议,其实并不需要我们再进行粘包处理。但是因为单个message可能很大很大(比如一个视频文件),那么websocket显然不适合把一个视频作为一个message传输(中途断了前功尽弃),所以websocket协议其实是支持1个message分多个frame帧传输的。
我们的浏览器提供的编程API都是message粒度的,把frame拆帧的细节对开发者隐蔽了,而服务端websocket框架一般也做了同样的隐藏,会自动帮我们收集所有的frame后拼成messasge再回调,所以结论就是:
websocket以message为单位通讯,不需要开发者自己处理粘包问题。
更多参考Websocket需要像TCP Socket那样进行逻辑数据包的分包与合包吗?
2.golang实现
golang官方标准库里有一个websocket的包,但是它提供的就是frame粒度的API,压根不能用。
不过官方其实已经认可了一个准标准库实现,它实现了message粒度的API,让开发者不需要关心websocket协议细节,开发起来非常方便,其文档地址:https://godoc.org/github.com/gorilla/websocket。
开发websocket服务时,首先要基于http库对外暴露接口,然后由websocket库接管TCP连接进行协议升级,然后进行websocket协议的数据交换,所以开发时总是要用到http库和websocket库。
上述websocket文档中对开发websocket服务有明确的注意事项要求,主要是指:
- 读和写API不是并发安全的,需要启动单个goroutine串行处理。
- 关闭API是线程安全的,一旦调用则阻塞的读和写API会出错返回,从而终止处理。
六、心跳实现
Golang 心跳的实现
在多客户端同时访问服务器的工作模式下,首先要保证服务器的运行正常。因此,Server和Client建立通讯后,确保连接的及时断开就非常重要。否则,多个客户端长时间占用着连接不关闭,是非常可怕的服务器资源浪费。会使得服务器可服务的客户端数量大幅度减少。因此,针对短链接和长连接,根据业务的需求,配套不同的处理机制。
- 短连接:一般建立完连接,就立刻传输数据。传输完数据,连接就关闭。服务端根据需要,设定连接的时长。超过时间长度,就算客户端超时。立刻关闭连接。
- 长连接:建立连接后,传输数据,然后要保持连接,然后再次传输数据。直到连接关闭。
socket读写可以通过 SetDeadline、SetReadDeadline、SetWriteDeadline设置阻塞的时间。
func (*IPConn) SetDeadline
func (c *IPConn) SetDeadline(t time.Time) error
func (*IPConn) SetReadDeadline
func (c *IPConn) SetReadDeadline(t time.Time) error
func (*IPConn) SetWriteDeadline
func (c *IPConn) SetWriteDeadline(t time.Time) error
如果做短连接,直接在Server端的连接上设置SetReadDeadline。当你设置的时限到达,无论客户端是否还在继续传递消息,服务端都不会再接收。并且已经关闭连接。
func main() {
server := ":7373"
netListen, err := net.Listen("tcp", server)
if err != nil{
Log("connect error: ", err)
os.Exit(1)
}
Log("Waiting for Client ...")
for{
conn, err := netListen.Accept()
if err != nil{
Log(conn.RemoteAddr().String(), "Fatal error: ", err)
continue
}
//设置短连接(10秒)
conn.SetReadDeadline(time.Now().Add(time.Duration(10)*time.Second))
Log(conn.RemoteAddr().String(), "connect success!")
...
}
}
这就可以了。在这段代码中,每当10秒中的时限一道,连接就终止了。
根据业务需要,客户端可能需要长时间保持连接。但是服务端不能无限制的保持。这就需要一个机制,如果超过某个时间长度,服务端没有获得客户端的数据,就判定客户端已经不需要连接了(比如客户端挂掉了)。做到这个,需要一个心跳机制。在限定的时间内,客户端给服务端发送一个指定的消息,以便服务端知道客户端还活着。
func sender(conn *net.TCPConn) {
for i := 0; i < 10; i++{
words := strconv.Itoa(i)+" Hello I'm MyHeartbeat Client."
msg, err := conn.Write([]byte(words))
if err != nil {
Log(conn.RemoteAddr().String(), "Fatal error: ", err)
os.Exit(1)
}
Log("服务端接收了", msg)
time.Sleep(2 * time.Second)
}
for i := 0; i < 2 ; i++ {
time.Sleep(12 * time.Second)
}
for i := 0; i < 10; i++{
words := strconv.Itoa(i)+" Hi I'm MyHeartbeat Client."
msg, err := conn.Write([]byte(words))
if err != nil {
Log(conn.RemoteAddr().String(), "Fatal error: ", err)
os.Exit(1)
}
Log("服务端接收了", msg)
time.Sleep(2 * time.Second)
}
}
这段客户端代码,实现了两个相同的信息发送频率给服务端。两个频率中间,我们让运行休息了12秒。然后,我们在服务端的对应机制是这样的。
func HeartBeating(conn net.Conn, bytes chan byte, timeout int) {
select {
case fk := <- bytes:
Log(conn.RemoteAddr().String(), "心跳:第", string(fk), "times")
conn.SetDeadline(time.Now().Add(time.Duration(timeout) * time.Second))
break
case <- time.After(5 * time.Second):
Log("conn dead now")
conn.Close()
}
}
每次接收到心跳数据就 SetDeadline 延长一个时间段 timeout。如果没有接到心跳数据,5秒后连接关闭。
网友评论