sync包
sync.Mutex 互斥锁
适用于并发访问公共资源的场景。
声明互斥锁
var lock sync.Mutex
使用互斥锁
func add() {
for i := 0; i < 500000; i++ {
lock.Lock()
x = x + 1
lock.Unlock()
}
wg.Done()
}
sync.RWMutex 读写互斥锁
适用于读远远大于写的场景。
声明读写互斥锁
var rwLock sync.RWMutex
使用读写互斥锁
// 读操作
func read() {
defer wg.Done()
rwLock.RLock()
fmt.Println(x)
time.Sleep(time.Millisecond * 2)
rwLock.RUnlock()
}
// 写操作
func write() {
defer wg.Done()
rwLock.Lock()
x = x + 1
time.Sleep(time.Millisecond * 10)
rwLock.Unlock()
}
sync.Once
适用于那些只执行一次的场景。
例如,只加载一次图片、只关闭一次channel
sync.Map
Go内置的map不是并发安全的。
// sync.Map 是一个开箱即用的并发安全的map
var m2 = sync.Map{}
func main() {
wg := sync.WaitGroup{}
for i := 0; i < 21; i++ {
wg.Add(1)
go func(n int) {
key := strconv.Itoa(n)
m2.Store(key, n) // 必须使用sync.Map内置的Store方法设置键值对
value, _ := m2.Load(key) // 必须使用sync.Map提供的Load方法根据key取值
fmt.Printf("k=:%v,v:=%v\n", key, value)
wg.Done()
}(i)
}
wg.Wait()
}
actomic原子操作
什么是原子操作
原子操作的意思是说,这个操作在执行的过程中,其它协程不会看到执行一半的操作结果。在其它协程看来,原子操作要么执行完成了,要么还没开始,就像一个原子一样,不可分割。
在单处理器单核系统中,即使一个操作翻译成汇编不止一个指令,也有可能保持一致性。比如经常用来演示的并发场景下的 count++ 操作 (count++ 对应的汇编指令就有三条),如果像下面这样写:
func main() {
runtime.GOMAXPROCS(1)
var w sync.WaitGroup
count := int32(0)
w.Add(100)
for i := 0; i < 100; i++ {
go func() {
for j := 0; j < 20; j++ {
count++
}
w.Done()
}()
}
w.Wait()
fmt.Println(count)
}
无论执行多少次,输出结果都是 2000。
而在多核系统中,情况就变得复杂了许多。A核修改 count 的时候,由于 CPU 缓存的存在,B核读到的 count 值可能不是最新的值。如果我们将上面的例子中的第二行改成:
runtime.GOMAXPROCS(2)
之后,程序每执行一次,结果都有可能不一样。
解决思路除了使用前面介绍过的 Mutex,也可以使用今天要介绍的 atomic,具体使用方法是将 count++ 替换成:
atomic.AddInt32(&count, 1)
这样就能保证即使在多核系统下 count++ 也是一个原子操作。
针对一些基本的原子操作,不同的 CPU 架构中有不同的机制来保证原子性,atomic 包将底层不同架构的实现进行了封装,对外提供通用的 API。
atomic 的基础方法
原子操作主要是两类:修改和加载存储。修改很好理解,就是在原来值的基础上改动;加载存储就是读写。
atomic 提供了 AddXXX、CompareAndSwapXXX、SwapXXX、LoadXXX、StoreXXX 等方法。
由于 Go 暂时还不支持泛型,所以很多方法的实现都很啰嗦,比如 AddXXX 方法,针对 int32、int64、uint32 基础类型,每个类型都有相应的实现。等 Go 支持泛型之后,相信 atomic 的 API 就会清爽很多。
需要注意的是,atomic 的操作对象是地址,所以传参的时候,需要传变量的地址,不能传变量的值。
Go 并发之原子操作 atomic
互联网协议
TCP
TCP/IP(Transmission Control Protocol/Internet Protocol) 即传输控制协议/网间协议,是一种面向连接(连接导向)的、可靠的、基于字节流的传输层(Transport layer)通信协议,因为是面向连接的协议,数据像水流一样传输,会存在黏包问题。
服务端
// tcp/server/main.go
// TCP server端
// 处理函数
func process(conn net.Conn) {
defer conn.Close() // 关闭连接
for {
reader := bufio.NewReader(conn)
var buf [128]byte
n, err := reader.Read(buf[:]) // 读取数据
if err != nil {
fmt.Println("read from client failed, err:", err)
break
}
recvStr := string(buf[:n])
fmt.Println("收到client端发来的数据:", recvStr)
conn.Write([]byte(recvStr)) // 发送数据
}
}
func main() {
listen, err := net.Listen("tcp", "127.0.0.1:20000")
if err != nil {
fmt.Println("listen failed, err:", err)
return
}
for {
conn, err := listen.Accept() // 建立连接
if err != nil {
fmt.Println("accept failed, err:", err)
continue
}
go process(conn) // 启动一个goroutine处理连接
}
}
TCP客户端
// tcp/client/main.go
// 客户端
func main() {
conn, err := net.Dial("tcp", "127.0.0.1:20000")
if err != nil {
fmt.Println("err :", err)
return
}
defer conn.Close() // 关闭连接
inputReader := bufio.NewReader(os.Stdin)
for {
input, _ := inputReader.ReadString('\n') // 读取用户输入
inputInfo := strings.Trim(input, "\r\n")
if strings.ToUpper(inputInfo) == "Q" { // 如果输入q就退出
return
}
_, err = conn.Write([]byte(inputInfo)) // 发送数据
if err != nil {
return
}
buf := [512]byte{}
n, err := conn.Read(buf[:])
if err != nil {
fmt.Println("recv failed, err:", err)
return
}
fmt.Println(string(buf[:n]))
}
}
开发需注意TCP黏包问题
为什么会出现粘包
主要原因就是tcp数据传递模式是流模式,在保持长连接的时候可以进行多次的收和发。
“粘包”可发生在发送端也可发生在接收端:
由Nagle算法造成的发送端的粘包:Nagle算法是一种改善网络传输效率的算法。简单来说就是当我们提交一段数据给TCP发送时,TCP并不立刻发送此段数据,而是等待一小段时间看看在等待期间是否还有要发送的数据,若有则会一次把这两段数据发送出去。
接收端接收不及时造成的接收端粘包:TCP会把接收到的数据存在自己的缓冲区中,然后通知应用层取数据。当应用层由于某些原因不能及时的把TCP的数据取出来,就会造成TCP缓冲区中存放了几段数据。
解决办法
出现”粘包”的关键在于接收方不确定将要传输的数据包的大小,因此我们可以对数据包进行封包和拆包的操作。
封包:封包就是给一段数据加上包头,这样一来数据包就分为包头和包体两部分内容了(过滤非法包时封包会加入”包尾”内容)。包头部分的长度是固定的,并且它存储了包体的长度,根据包头长度固定以及包头中含有包体长度的变量就能正确的拆分出一个完整的数据包。
UDP
UDP协议(User Datagram Protocol)中文名称是用户数据报协议,是OSI(Open System Interconnection,开放式系统互联)参考模型中一种无连接的传输层协议,不需要建立连接就能直接进行数据发送和接收,属于不可靠的、没有时序的通信,但是UDP协议的实时性比较好,通常用于视频直播相关领域。
UDP服务端
使用Go语言的net包实现的UDP服务端代码如下:
func main() {
listen, err := net.ListenUDP("udp", &net.UDPAddr{
IP: net.IPv4(0, 0, 0, 0),
Port: 30000,
})
if err != nil {
fmt.Println("listen failed, err:", err)
return
}
defer listen.Close()
for {
var data [1024]byte
n, addr, err := listen.ReadFromUDP(data[:]) // 接收数据
if err != nil {
fmt.Println("read udp failed, err:", err)
continue
}
fmt.Printf("data:%v addr:%v count:%v\n", string(data[:n]), addr, n)
_, err = listen.WriteToUDP(data[:n], addr) // 发送数据
if err != nil {
fmt.Println("write to udp failed, err:", err)
continue
}
}
}
UDP客户端
使用Go语言的net包实现的UDP客户端代码如下:
func main() {
socket, err := net.DialUDP("udp", nil, &net.UDPAddr{
IP: net.IPv4(0, 0, 0, 0),
Port: 30000,
})
if err != nil {
fmt.Println("连接服务端失败,err:", err)
return
}
defer socket.Close()
sendData := []byte("Hello server")
_, err = socket.Write(sendData) // 发送数据
if err != nil {
fmt.Println("发送数据失败,err:", err)
return
}
data := make([]byte, 4096)
n, remoteAddr, err := socket.ReadFromUDP(data) // 接收数据
if err != nil {
fmt.Println("接收数据失败,err:", err)
return
}
fmt.Printf("recv:%v addr:%v count:%v\n", string(data[:n]), remoteAddr, n)
}
网友评论