原文链接: https://blog.thinkeridea.com/201907/go/compress_file_io_optimization2.html
上一篇文章《使用压缩文件优化io (一)》中记录了日志备份 io 优化方案,使用文件流数据压缩方案优化 io 性能,效果十分显著。这篇文章记录数据分析前置清洗、格式化数据的 io 优化方案,我们有一台专用的日志前置处理服务器,所有业务日志通过这台机器从 OSS 拉取回来清洗、格式化,最后进入到数据仓储中便于后续的分析。
随着业务扩展这台服务器压力越来越大,高峰时数据延迟越来越厉害,早期也是使用 Python 脚本 + awk 以及一些 shell 命令完成相关工作,在数据集不是很大的时候这种方案很好,效率也很高,随着数据集变大,发现服务器负载很高,经过分析是还是 io 阻塞,依旧采用对数据流进行处理的方案优化io,以下记录优化的过程。
背景介绍
服务器配置:4 核 8G; 磁盘:1T
分析前置服务会根据业务不同分为十分钟、一小时两个阶段拉取分析日志,每隔一个阶段会去 OSS 拉取日志回到服务器进行处理,处理过程因 io 阻塞,导致 CPU 和 load 异常高,且处理效率严重下降,这次优化主要就是降低 io 阻塞,提升 CPU 利用率 (处理业务逻辑而不是等待 io) 和处理效率。
后文中会详细描述优化前后的方案,并用 go 编写测试,使用一台 2 核4G的服务器进行测试,测试数据集大小为:
- 文件数量:432个
- 压缩文件:17G
- 解压后文件:63G
- 压缩方案:lzo
- Goroutine 数量:20
优化前
优化前日志处理流程:
- 获取待处理文件列表
- 拉取 OSS 日志到本地磁盘 (压缩文件)
- 解压缩日志文件
- 读取日志数据
- 业务处理……
- 导入到数据仓储中
导致 io 阻塞的部分主要是: 拉取 OSS 日志、解压缩日志文件及读取日志数据,优化也主要从这三块着手。
这里有一段公共的日志读取方法,该方法接收一个 io.Reader
, 并按行读取日志,并简单切分日志字段,并没有实质的处理日志数据,后面的优化方案也将使用这个方法读取日志。
package main
import (
"bufio"
"bytes"
"io"
"github.com/thinkeridea/go-extend/exbytes"
)
func Read(r io.Reader) {
rawBuffer := make([]byte, 512)
buf := bufio.NewReader(r)
for {
line, ok, err := readLine(buf, rawBuffer)
if err == io.EOF {
return
}
if err != nil {
panic(nil)
}
if ok {
rawBuffer = line
}
c := bytes.Count(line, []byte{'\x01'})
if c != 65 {
panic("无效的行")
}
}
}
func readLine(r *bufio.Reader, rawBuffer []byte) ([]byte, bool, error) {
var ok bool
line, err := r.ReadSlice('\n')
if (err == bufio.ErrBufferFull || len(line) < 3 || exbytes.ToString(line[len(line)-3:]) != "\r\r\n") && err != io.EOF {
rawBuffer = append(rawBuffer[:0], line...)
for (err == bufio.ErrBufferFull || len(line) < 3 || exbytes.ToString(line[len(line)-3:]) != "\r\r\n") && err != io.EOF {
line, err = r.ReadSlice('\n')
rawBuffer = append(rawBuffer, line...)
}
line = rawBuffer
ok = true
}
if len(line) > 0 && err == io.EOF {
err = nil
}
return line, ok, err
}
日志按 \r\r\n
分隔行,使用 \x01
切分字段,读取方法使用 bufio.ReadSlice
方法,避免内存分配,且当 bufio
缓冲区满之后使用 rwaBuffer
作为本地可扩展缓冲,每次扩展之后会保留最大的扩展空间,因为业务日志每行大小差不多,这样可以极大的减少内存分配,效率是 bufio.ReadLine
方法的好几倍。
package main
import (
"fmt"
"os"
"os/exec"
"path/filepath"
"strings"
"sync"
"time"
".../pkg/aliyun_oss" // 虚假的包
)
func main() {
var oss *aliyun_oss.AliyunOSS // 对接阿里云 OSS,需要自行封装
files := list() // 这是一个虚构的方法,用来获取待处理的文件列表
fmt.Printf("待处理文件数量:%d\n", len(files))
start := time.Now()
defer func(t time.Time) {
fmt.Printf("共耗时:%0.6f\n", time.Now().Sub(t).Seconds())
}(start)
// 下载日志文件
n := 20
var wg sync.WaitGroup
c := make(chan string)
wg.Add(n)
for i := 0; i < n; i++ {
go func() {
defer wg.Done()
for {
f, ok := <-c
if !ok {
return
}
if _, err := os.Stat(f); err == nil {
return
} else if !os.IsNotExist(err) {
panic(err)
}
dir := filepath.Dir(f)
err := os.MkdirAll(dir, 0755)
if err != nil {
panic(err)
}
err = oss.GetObjectToFile(f, f)
if err != nil {
panic(err)
}
}
}()
}
for _, f := range files {
c <- f
}
close(c)
wg.Wait()
fmt.Printf("下载文件耗时:%0.6f\n", time.Now().Sub(start).Seconds())
// 解压日志文件
start = time.Now()
shell := exec.Command("/bin/bash", "-c", "lzop -df logs/*/*/*/*/*/*.lzo")
err := shell.Run()
if err != nil {
panic(err)
}
fmt.Printf("解压文件耗时:%0.6f\n", time.Now().Sub(start).Seconds())
// 读取日志文件
start = time.Now()
c = make(chan string)
wg.Add(n)
for i := 0; i < n; i++ {
go func() {
defer wg.Done()
for {
file, ok := <-c
if !ok {
return
}
f, err := os.Open(file)
if err != nil {
panic(err)
}
Read(f)
f.Close()
}
}()
}
for _, f := range files {
c <- strings.TrimRight(f, ".lzo")
}
close(c)
wg.Wait()
fmt.Printf("读取文件耗时:%0.6f\n", time.Now().Sub(start).Seconds())
}
运行程序输出如下:
待处理文件数量:432
下载文件耗时:303.562865
解压文件耗时:611.236232
读取文件耗时:460.371245
共耗时:1375.187261
通过 iostat -m -x 5 10000
分析各个阶段结果如下:
- 下载时:
avg-cpu: %user %nice %system %iowait %steal %idle
7.85 0.00 16.44 11.24 0.00 64.48
Device: rrqm/s wrqm/s r/s w/s rMB/s wMB/s avgrq-sz avgqu-sz await r_await w_await svctm %util
vda 0.00 0.00 80.40 7.80 8.98 0.04 209.36 0.40 4.57 4.64 3.77 0.50 4.44
vdb 1.40 761.20 247.60 264.00 14.70 60.92 302.72 9.17 17.92 10.36 25.00 0.52 26.52
- 解压时:
avg-cpu: %user %nice %system %iowait %steal %idle
8.54 0.00 8.33 68.39 0.00 14.74
Device: rrqm/s wrqm/s r/s w/s rMB/s wMB/s avgrq-sz avgqu-sz await r_await w_await svctm %util
vda 0.00 1.20 3.40 11.80 0.01 0.05 8.95 0.30 20.03 0.41 25.68 0.55 0.84
vdb 0.00 22037.80 107.80 243.20 26.45 107.01 778.71 83.52 236.68 74.31 308.65 2.52 88.54
- 读取时:
avg-cpu: %user %nice %system %iowait %steal %idle
2.74 0.00 5.07 92.19 0.00 0.00
Device: rrqm/s wrqm/s r/s w/s rMB/s wMB/s avgrq-sz avgqu-sz await r_await w_await svctm %util
vda 0.00 2.40 3.80 23.60 0.01 0.14 11.85 0.12 4.48 1.95 4.89 0.33 0.90
vdb 1.80 4.60 347.20 6.20 139.97 0.08 811.60 126.62 358.04 360.79 203.48 2.83 100.00
通过 iostat
结果可以看出,在解压和读取日志时 io
阻塞比较严重,且运行时间较长,下载时 io
阻塞也存在,但还可以接受,通过下面两个方案逐渐消除掉 io
。
优化方案一
优化前的方案反应出在解压和读取日志时 io
阻塞比较严重,那么是否可以通过读取 lzo
压缩文件,以此来消除解压缩日志耗时太大、io
太高的问题呢?并且读取 lzo
压缩文件远比解压后文件小,来降低读取日志耗时太大、io
太高的问题呢?
优化后日志处理流程:
- 获取待处理文件列表
- 拉取 OSS 日志到本地磁盘 (压缩文件)
- 读取压缩日志数据
- 业务处理……
- 导入到数据仓储中
package main
import (
"fmt"
"os"
"path/filepath"
"sync"
"time"
".../pkg/aliyun_oss" // 虚假的包
"github.com/cyberdelia/lzo"
)
func main() {
var oss *aliyun_oss.AliyunOSS // 对接阿里云 OSS,需要自行封装
files := list() // 这是一个虚构的方法,用来获取待处理的文件列表
fmt.Printf("待处理文件数量:%d\n", len(files))
start := time.Now()
defer func(t time.Time) {
fmt.Printf("共耗时:%0.6f\n", time.Now().Sub(t).Seconds())
}(start)
// 下载日志文件
n := 20
var wg sync.WaitGroup
c := make(chan string)
wg.Add(n)
for i := 0; i < n; i++ {
go func() {
defer wg.Done()
for {
f, ok := <-c
if !ok {
return
}
if _, err := os.Stat(f); err == nil {
return
} else if !os.IsNotExist(err) {
panic(err)
}
dir := filepath.Dir(f)
err := os.MkdirAll(dir, 0755)
if err != nil {
panic(err)
}
err = oss.GetObjectToFile(f, f)
if err != nil {
panic(err)
}
}
}()
}
for _, f := range files {
c <- f
}
close(c)
wg.Wait()
fmt.Printf("下载文件耗时:%0.6f\n", time.Now().Sub(start).Seconds())
start = time.Now()
c = make(chan string)
wg.Add(n)
for i := 0; i < n; i++ {
go func() {
defer wg.Done()
for {
file, ok := <-c
if !ok {
return
}
f, err := os.Open(file)
if err != nil {
panic(err)
}
r, err := lzo.NewReader(f)
if err != nil {
panic(err)
}
Read(r)
r.Close()
f.Close()
}
}()
}
for _, f := range files {
c <- f
}
close(c)
wg.Wait()
fmt.Printf("读取文件耗时:%0.6f\n", time.Now().Sub(start).Seconds())
}
这个方案消除了解压缩日志,并且直接读取压缩日志,使用 github.com/cyberdelia/lzo
包对压缩文件数据流进行边读取边解压,这次不用单独封装新的方法了,直接使用 lzo
包中的接口即可。
程序运行结果如下:
待处理文件数量:432
下载文件耗时:286.146603
读取文件耗时:132.787345
共耗时:418.942862
这个方案效果非常明显,总耗时从 1375.187261
降低到 418.942862
提升了 3 倍的效率,不仅消除了压缩的时间,还大大缩短了读取文件耗时,成果显著。
通过 iostat -m -x 5 10000
分析各个阶段结果如下:
下载时:
avg-cpu: %user %nice %system %iowait %steal %idle
5.08 0.00 13.24 29.34 0.00 52.33
Device: rrqm/s wrqm/s r/s w/s rMB/s wMB/s avgrq-sz avgqu-sz await r_await w_await svctm %util
vda 0.00 2.80 1.40 11.80 0.01 0.07 12.00 0.02 1.85 1.14 1.93 0.18 0.24
vdb 0.00 17207.60 0.60 212.40 0.00 75.06 721.74 55.81 236.34 84.33 236.77 2.49 53.14
读取时:
avg-cpu: %user %nice %system %iowait %steal %idle
80.66 0.00 4.83 14.50 0.00 0.00
Device: rrqm/s wrqm/s r/s w/s rMB/s wMB/s avgrq-sz avgqu-sz await r_await w_await svctm %util
vda 0.00 0.00 6.20 0.20 0.06 0.00 20.00 0.01 1.69 1.71 1.00 0.62 0.40
vdb 0.00 6.80 390.40 19.20 118.78 0.23 595.04 74.87 190.55 197.95 40.08 1.85 75.90
通过 iostat
结果分析,下载时 io
阻塞和优化前波动不是很大,读取时的 io
优化已经非常好了,iowait
从 92.19%
降低到 14.5%
,CPU 更多的任务用来处理解压缩日志,而不是处理 io
阻塞。
优化方案二
本来优化到上面的效果已经非常满意了,不过既然开始做优化就不能草草结束了,仔细思考业务场景,需要 本地 lzo
文件?重新处理日志的频率高吗?本地 lzo
日志清理方便吗?
通过上面的几个问题发现,除非程序出现问题或者数据存储出现故障,否者极少需要重新处理日志,一年里面这种情况也是极少的,甚至不会发生。
那么思考一下,不下载日志,直接读取网络数据流,实现边下边解压边读取,这样岂不是没有 io
了吗?
优化后日志处理流程:
- 获取待处理文件列表
- 拉取 OSS 日志,在内存中解压并读取分析日志
- 业务处理……
- 导入到数据仓储中
具体实现如下:
package main
import (
"fmt"
"sync"
"time"
".../pkg/aliyun_oss" // 虚假的包
"github.com/cyberdelia/lzo"
)
func main() {
var oss *aliyun_oss.AliyunOSS // 对接阿里云 OSS,需要自行封装
files := list() // 这是一个虚构的方法,用来获取待处理的文件列表
fmt.Printf("待处理文件数量:%d\n", len(files))
start := time.Now()
defer func(t time.Time) {
fmt.Printf("共耗时:%0.6f\n", time.Now().Sub(t).Seconds())
}(start)
n := 20
var wg sync.WaitGroup
c := make(chan string)
wg.Add(n)
for i := 0; i < n; i++ {
go func() {
defer wg.Done()
for {
f, ok := <-c
if !ok {
return
}
r1, err := oss.GetObject(f)
if err != nil {
panic(err)
}
r, err := lzo.NewReader(r1)
if err != nil {
panic(err)
}
Read(r)
r.Close()
r1.Close()
}
}()
}
for _, f := range files {
c <- f
}
close(c)
wg.Wait()
fmt.Printf("读取文件耗时:%0.6f\n", time.Now().Sub(start).Seconds())
}
优化后只有一个流程了,代码简洁了不少,看看效率如何吧!
程序运行结果如下:
待处理文件数量:432
读取文件耗时:285.993661
共耗时:285.993717
天啊发生了什么,我使劲擦了擦眼睛,太不可思议了,居然只消耗了下载日志的耗时,较上一个方案总耗时从 418.942862
降低到 285.993717
,提升了近 2 倍的效率,让我们看看上个方案下载文件耗时 286.146603
,而新方案总耗时是 285.993717
居然只用了上个优化版本的下载时间,究竟发生了什么?
通过 iostat -m -x 5 10000
分析结果如下:
avg-cpu: %user %nice %system %iowait %steal %idle
43.73 0.00 9.64 0.31 0.00 46.32
Device: rrqm/s wrqm/s r/s w/s rMB/s wMB/s avgrq-sz avgqu-sz await r_await w_await svctm %util
vda 0.00 1.20 4.40 3.80 0.02 0.03 10.93 0.01 1.49 0.59 2.53 0.71 0.58
vdb 0.00 6.80 0.00 24.60 0.00 0.27 22.83 0.02 0.84 0.00 0.84 0.28 0.68
通过 iostat
结果分析,在程序运行期间没有任何 io
开销,CPU 居然还有一半的空闲,前面两个版本 CPU 是没有空闲的啊,由此看来之前 CPU 更多的消耗在 io
阻塞上了,并没有用来处理业务逻辑。
由此来看也就不足为奇了,为啥优化后只需要下载日志的时间就能处理完所有日志了,没有了 io
阻塞,CPU 更多了用来处理业务,把之前下载时写文件 io
的耗时,用来解压缩数据,读取数据,且还有更多的空闲,跑出这样的结果也就很正常了。
总结
从优化前耗时 1375.187261
秒到 285.993717
秒,性能提升 80%, 从 iowait
92.19%
到 0.31%
提升近 100%
,从没有任何 CPU 空闲到有一半空闲,这个过程中有很多值得总结的事情。
io
对性能的影响非常大,对 CPU 占用非常严重,导致 CPU 处理业务逻辑的时间片非常少。从 io
转移到 CPU 对性能提升非常明显。CPU 计算效率十分的高,从 io
密集到密集计算,只要符合业务场景,往往能给我们带来意想不到的效果。
往往优化业务并不需要十分高大上的技术,只是转变一下思路,不仅代码更少,且程序更简短、好维护、逻辑更清晰。
一定要结合实际业务场景进行思考,减少理所当然和业务无关的中间环节,往往就可以极大的提升程序效率。
转载:
本文作者: 戚银(thinkeridea)
本文链接: https://blog.thinkeridea.com/201907/go/compress_file_io_optimization2.html
版权声明: 本博客所有文章除特别声明外,均采用 CC BY 4.0 CN协议 许可协议。转载请注明出处!
网友评论