以下代码片段取自:https://github.com/juicedata/juicefs
SleepWithJitter
Jitter抖动意思
func SleepWithJitter(d time.Duration) {
j := int64(d / 20) // +- 5%
time.Sleep(d + time.Duration(rand.Int63n(2*j+1)-j))
}
内存大小格式化(可读性强)
func FormatBytes(n uint64) string {
if n < 1024 {
return fmt.Sprintf("%d Bytes", n)
}
units := []string{"K", "M", "G", "T", "P", "E"}
m := n
i := 0
for ; i < len(units)-1 && m >= 1<<20; i++ {
m = m >> 10
}
return fmt.Sprintf("%.2f %siB (%d Bytes)", float64(m)/1024.0, units[i], n)
}
func main() {
fmt.Println(tool.FormatBytes(88))
fmt.Println(tool.FormatBytes(8888))
fmt.Println(tool.FormatBytes(8888888))
fmt.Println(tool.FormatBytes(88888888888))
fmt.Println(tool.FormatBytes(8888888888888))
}
输出:
image.png
memUseage
func MemoryUsage() (virt, rss uint64) {
stat, err := ioutil.ReadFile("/proc/self/stat")
if err == nil {
stats := bytes.Split(stat, []byte(" "))
if len(stats) >= 24 {
v, _ := strconv.ParseUint(string(stats[22]), 10, 64)
r, _ := strconv.ParseUint(string(stats[23]), 10, 64)
return v, r * 4096
}
}
var ru syscall.Rusage
err = syscall.Getrusage(syscall.RUSAGE_SELF, &ru)
if err == nil {
return uint64(ru.Maxrss), uint64(ru.Maxrss)
}
return
}
代码超时封装
func WithTimeout(f func() error, timeout time.Duration) error {
var t = time.NewTimer(timeout)
var err error
var done = make(chan struct{}, 1)
go func() {
err = f()
done <- struct{}{}
}()
select {
case <-done:
t.Stop()
case <-t.C:
err = fmt.Errorf("timeout after %s", timeout)
}
return err
}
cond
相比较于 sync.cond 提供了可以超时的wait
// Cond is similar to sync.Cond, but you can wait with a timeout.
type Cond struct {
L sync.Locker
signal chan struct{}
}
// Signal wakes up a waiter.
// It's required for the caller to hold L.
func (c *Cond) Signal() {
select {
case c.signal <- struct{}{}:
default:
}
}
// Broadcast wake up all the waiters.
// It's required for the caller to hold L.
func (c *Cond) Broadcast() {
close(c.signal)
c.signal = make(chan struct{})
}
// Wait until Signal() or Broadcast() is called.
func (c *Cond) Wait() {
ch := c.signal
c.L.Unlock()
<-ch
c.L.Lock()
}
var timerPool = sync.Pool{
New: func() interface{} {
return time.NewTimer(time.Second)
},
}
// WaitWithTimeout wait for a signal or a period of timeout eclipsed.
// returns true in case of timeout else false
func (c *Cond) WaitWithTimeout(d time.Duration) bool {
ch := c.signal
c.L.Unlock()
t := timerPool.Get().(*time.Timer)
t.Reset(d)
defer func() {
t.Stop()
timerPool.Put(t)
c.L.Lock()
}()
select {
case <-ch:
return false
case <-t.C:
return true
}
}
// NewCond creates a Cond.
func NewCond(lock sync.Locker) *Cond {
return &Cond{lock, make(chan struct{})}
}
内存池
var used int64
func AllocMemory() int64 {
return atomic.LoadInt64(&used)
}
// Alloc returns size bytes memory from Go heap.
func Alloc(size int) []byte {
zeros := powerOf2(size)
b := *pools[zeros].Get().(*[]byte)
if cap(b) < size {
panic(fmt.Sprintf("%d < %d", cap(b), size))
}
atomic.AddInt64(&used, int64(cap(b)))
return b[:size]
}
// Free returns memory to Go heap.
func Free(b []byte) {
atomic.AddInt64(&used, -int64(cap(b)))
pools[powerOf2(cap(b))].Put(&b)
}
var pools []*sync.Pool
// powerOf2 输入2, 返回1, 输入5, 返回3
func powerOf2(s int) int {
var bits int
var p = 1
for p < s {
bits++
p *= 2
}
return bits
}
func init() {
pools = make([]*sync.Pool, 30) // 1 - 1G (1, 2, 4, 8, 16, ... 2^30 = 1G)
for i := 0; i < 30; i++ {
// 下面这里要用闭包, 如果省略掉闭包的写法, 那么每个长度都是2^30
//pools[i] = &sync.Pool{
// New: func() interface{} {
// b := make([]byte, 1<<i)
// return &b
// },
//}
func(bits int) {
pools[i] = &sync.Pool{
New: func() interface{} {
b := make([]byte, 1<<bits)
return &b
},
}
}(i)
}
go func() {
for {
time.Sleep(time.Minute * 10)
runtime.GC()
}
}()
}
image.png
page
page 对上面内存池中申请的内存进行引用计数的管理
import (
"errors"
"io"
"os"
"runtime"
"runtime/debug"
"sync/atomic"
)
var pageStack = os.Getenv("JFS_PAGE_STACK") != ""
// Page is a page with refcount
type Page struct {
refs int32
offHeap bool
dep *Page
Data []byte
stack []byte
}
func NewOffPage(size int) *Page {
if size <= 0 {
panic("size of page should > 0")
}
p := Alloc(size)
page := &Page{
refs: 1,
offHeap: true,
Data: p,
}
if pageStack {
page.stack = debug.Stack()
}
runtime.SetFinalizer(page, func(p *Page) {
refCnt := atomic.LoadInt32(&p.refs)
if refCnt != 0 {
if refCnt > 0 {
p.release()
}
}
})
return page
}
func (p *Page) Slice(off, len int) *Page {
p.acquire()
np := &Page{
refs: 1,
Data: p.Data[off : off+len],
dep: p,
}
return np
}
// acquire increase the refcount
func (p *Page) acquire() {
if pageStack {
p.stack = append(p.stack, debug.Stack()...)
}
atomic.AddInt32(&p.refs, 1)
}
// release decrease the refCount
func (p *Page) release() {
if pageStack {
p.stack = append(p.stack, debug.Stack()...)
}
if atomic.AddInt32(&p.refs, -1) == 0 {
if p.offHeap {
Free(p.Data)
}
if p.dep != nil {
p.dep.release()
p.dep = nil
}
p.Data = nil
}
}
type PageReader struct {
p *Page
off int
}
func NewPageReader(p *Page) *PageReader {
p.acquire()
return &PageReader{p, 0}
}
func (r *PageReader) Read(buf []byte) (int, error) {
n, err := r.ReadAt(buf, int64(r.off))
r.off += n
return n, err
}
func (r *PageReader) ReadAt(buf []byte, off int64) (int, error) {
if len(buf) == 0 {
return 0, nil
}
if r.p == nil {
return 0, errors.New("page is already released")
}
if int(off) == len(r.p.Data) {
return 0, io.EOF
}
n := copy(buf, r.p.Data[off:])
if n < len(buf) {
return n, io.EOF
}
return n, nil
}
func (r *PageReader) Close() error {
if r.p != nil {
r.p.release()
r.p = nil
}
return nil
}
网友评论