美文网首页
juiceFS 代码小工具学习

juiceFS 代码小工具学习

作者: wayyyy | 来源:发表于2023-11-03 17:53 被阅读0次

    以下代码片段取自:https://github.com/juicedata/juicefs

    SleepWithJitter

    Jitter 意为“抖动”:在休眠时长上叠加 ±5% 的随机偏移。

    // SleepWithJitter sleeps for d plus a random offset of at most 5% of d in
    // either direction.
    //
    // Durations for which d/20 is not positive (shorter than 20ns, zero, or
    // negative) are passed to time.Sleep unchanged, because rand.Int63n panics
    // when its argument is not positive.
    func SleepWithJitter(d time.Duration) {
        j := int64(d / 20) // +- 5%
        if j <= 0 {
            time.Sleep(d)
            return
        }
        // rand.Int63n(2*j+1) is in [0, 2j]; subtracting j centres it on zero.
        time.Sleep(d + time.Duration(rand.Int63n(2*j+1)-j))
    }
    
    内存大小格式化(可读性强)
    // FormatBytes renders a byte count in human-readable form.
    //
    // Values below 1024 are printed as "<n> Bytes". Larger values are printed
    // with two decimals in the largest binary unit (KiB through EiB) that keeps
    // the scaled value below 1024 where possible, followed by the exact byte
    // count in parentheses.
    func FormatBytes(n uint64) string {
        if n < 1024 {
            return fmt.Sprintf("%d Bytes", n)
        }
        suffixes := [...]string{"K", "M", "G", "T", "P", "E"}
        scaled, idx := n, 0
        // Each step drops one unit (10 bits); scaled stays 1024 times the value
        // that is finally printed so the fractional part survives the shift.
        for scaled >= 1<<20 && idx < len(suffixes)-1 {
            scaled >>= 10
            idx++
        }
        value := float64(scaled) / 1024.0
        return fmt.Sprintf("%.2f %siB (%d Bytes)", value, suffixes[idx], n)
    }
    
    // main prints the FormatBytes rendering of a few sample sizes, one per line.
    func main() {
        samples := []uint64{
            88,
            8888,
            8888888,
            88888888888,
            8888888888888,
        }
        for _, size := range samples {
            fmt.Println(tool.FormatBytes(size))
        }
    }
    

    输出:


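    88 Bytes
    8.68 KiB (8888 Bytes)
    8.48 MiB (8888888 Bytes)
    82.78 GiB (88888888888 Bytes)
    8.08 TiB (8888888888888 Bytes)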
    memUsage
    // MemoryUsage reports the virtual memory size and resident set size of the
    // current process, in bytes.
    //
    // It first parses /proc/self/stat (vsize is field 23, rss in pages is field
    // 24). If that file cannot be read or parsed it falls back to getrusage and
    // returns ru.Maxrss for both values. If both sources fail it returns 0, 0.
    func MemoryUsage() (virt, rss uint64) {
        stat, err := ioutil.ReadFile("/proc/self/stat")
        if err == nil {
            // Field 2 (comm) is wrapped in parentheses and may itself contain
            // spaces or ')', so fields are counted from the last ')' instead of
            // from the start of the line.
            if end := bytes.LastIndexByte(stat, ')'); end >= 0 {
                // fields[0] is field 3 (state), so field N lives at index N-3.
                fields := bytes.Fields(stat[end+1:])
                if len(fields) >= 22 {
                    v, verr := strconv.ParseUint(string(fields[20]), 10, 64)
                    r, rerr := strconv.ParseUint(string(fields[21]), 10, 64)
                    if verr == nil && rerr == nil {
                        // rss is counted in pages; use the real page size
                        // rather than assuming 4 KiB.
                        return v, r * uint64(syscall.Getpagesize())
                    }
                }
            }
        }

        var ru syscall.Rusage
        if err = syscall.Getrusage(syscall.RUSAGE_SELF, &ru); err == nil {
            // NOTE(review): the unit of Maxrss is platform dependent (kilobytes
            // on Linux per getrusage(2)) and is returned unscaled here, as before.
            return uint64(ru.Maxrss), uint64(ru.Maxrss)
        }
        return 0, 0
    }
    
    代码超时封装
    // WithTimeout runs f in a new goroutine and waits for it for at most timeout.
    //
    // It returns f's result if f finishes in time, otherwise an error of the
    // form "timeout after <timeout>". f is not cancelled on timeout: it keeps
    // running in its goroutine and its eventual result is discarded.
    func WithTimeout(f func() error, timeout time.Duration) error {
        // The result travels through the channel instead of a shared variable,
        // so a late-finishing f can never race with (or overwrite) the timeout
        // error being returned. The buffer of one lets the goroutine deliver its
        // result and exit even after the caller has stopped waiting.
        done := make(chan error, 1)
        go func() {
            done <- f()
        }()

        t := time.NewTimer(timeout)
        defer t.Stop()

        select {
        case err := <-done:
            return err
        case <-t.C:
            return fmt.Errorf("timeout after %s", timeout)
        }
    }
    
    cond

    相比于 sync.Cond,提供了支持超时的 Wait(WaitWithTimeout)。

    // Cond is similar to sync.Cond, but you can wait with a timeout.
    // Create it with NewCond so that the signal channel is initialised.
    type Cond struct {
        // L is unlocked by Wait/WaitWithTimeout while they block and locked
        // again before they return.
        L      sync.Locker
        // signal is the channel waiters receive from: Signal sends one value on
        // it, Broadcast closes it and installs a new channel.
        signal chan struct{}
    }
    
    // Signal wakes up at most one waiter.
    // It's required for the caller to hold L.
    //
    // The send is non-blocking: if it cannot proceed immediately the default
    // branch is taken and the signal is dropped.
    func (c *Cond) Signal() {
        select {
        case c.signal <- struct{}{}:
        default:
        }
    }
    
    // Broadcast wakes up all the waiters.
    // It's required for the caller to hold L.
    func (c *Cond) Broadcast() {
        // Closing the channel unblocks every goroutine receiving from it.
        close(c.signal)
        // Install a fresh channel for waiters that arrive after this call.
        c.signal = make(chan struct{})
    }
    
    // Wait blocks until Signal() or Broadcast() is called.
    // It unlocks L while blocked and locks it again before returning, so the
    // caller is expected to hold L on entry.
    func (c *Cond) Wait() {
        // Capture the current channel before unlocking: Broadcast replaces
        // c.signal, and this waiter must observe the close of the old channel.
        ch := c.signal
        c.L.Unlock()
        <-ch
        c.L.Lock()
    }
    
    // timerPool recycles *time.Timer values used by WaitWithTimeout.
    // New creates a timer armed for one second; WaitWithTimeout calls Reset with
    // the requested duration right after Get.
    var timerPool = sync.Pool{
        New: func() interface{} {
            return time.NewTimer(time.Second)
        },
    }
    
    // WaitWithTimeout waits for a signal or until the timeout d has elapsed.
    // It returns true in case of timeout, else false.
    //
    // Like Wait, it unlocks L while blocked and locks it again before
    // returning, so the caller is expected to hold L on entry.
    func (c *Cond) WaitWithTimeout(d time.Duration) bool {
        // Capture the current channel before unlocking: Broadcast replaces
        // c.signal, and this waiter must observe the close of the old channel.
        ch := c.signal
        c.L.Unlock()
        t := timerPool.Get().(*time.Timer)
        t.Reset(d)
        defer func() {
            if !t.Stop() {
                // The timer already fired. If its value was not consumed below
                // (the signal won the select), drain it so the next user of this
                // pooled timer does not see a stale expiry right after Reset.
                select {
                case <-t.C:
                default:
                }
            }
            timerPool.Put(t)
            c.L.Lock()
        }()
        select {
        case <-ch:
            return false
        case <-t.C:
            return true
        }
    }
    
    // NewCond returns a Cond that uses lock as its L and owns a freshly created
    // unbuffered signal channel.
    func NewCond(lock sync.Locker) *Cond {
        c := new(Cond)
        c.L = lock
        c.signal = make(chan struct{})
        return c
    }
    
    内存池
    // used is the running total, in bytes of capacity, of buffers handed out by
    // Alloc minus those passed to Free. It is only accessed through sync/atomic.
    var used int64
    
    // AllocMemory returns the current value of the used counter.
    func AllocMemory() int64 {
        return atomic.LoadInt64(&used)
    }
    
    // Alloc returns a byte slice of length size taken from the pool of the
    // smallest power-of-two size class that can hold it. The buffer is not
    // cleared here. The capacity of the pooled buffer is added to the used
    // counter.
    //
    // It panics if the buffer obtained from the pool has less capacity than size.
    func Alloc(size int) []byte {
        class := powerOf2(size)
        buf := *(pools[class].Get().(*[]byte))
        if got := cap(buf); got < size {
            panic(fmt.Sprintf("%d < %d", got, size))
        }
        atomic.AddInt64(&used, int64(cap(buf)))
        return buf[:size]
    }
    
    // Free gives a buffer obtained from Alloc back to its size-class pool and
    // subtracts its capacity from the used counter.
    //
    // Only buffers whose capacity is exactly the size of an existing class are
    // pooled. Anything else (zero capacity, a capacity that is not a power of
    // two, or one beyond the largest class) is left to the garbage collector:
    // powerOf2 rounds up, so pooling such a buffer would file it under a class
    // larger than the buffer and make a later Alloc from that class panic.
    func Free(b []byte) {
        size := cap(b)
        atomic.AddInt64(&used, -int64(size))
        class := powerOf2(size)
        if class >= len(pools) || 1<<class != size {
            return
        }
        pools[class].Put(&b)
    }
    
    // pools holds one sync.Pool per size class; Alloc and Free index it with the
    // exponent returned by powerOf2. It is populated in init.
    var pools []*sync.Pool
    
    // powerOf2 returns the smallest exponent e such that 1<<e >= s, e.g. 1 for
    // an input of 2 and 3 for an input of 5. It returns 0 for s <= 1.
    func powerOf2(s int) int {
        var bits int
        // Count the bits of s-1 by shifting it down. Unlike doubling a counter
        // until it reaches s, this cannot overflow, so it also terminates for
        // values of s above half of the int range.
        for n := s - 1; n > 0; n >>= 1 {
            bits++
        }
        return bits
    }
    
    // init builds the per-size-class pools and starts a background goroutine
    // that forces a garbage collection every ten minutes.
    func init() {
        // One pool per power-of-two size: 1, 2, 4, ... 2^30 = 1G. That is 31
        // classes (exponents 0 through 30); with only 30 entries the largest
        // class would be 2^29 and Alloc(1<<30) would index out of range.
        pools = make([]*sync.Pool, 31)
        for i := range pools {
            // Give every closure its own copy of the exponent. Before Go 1.22
            // all iterations share one loop variable, so capturing i directly
            // would make every pool allocate the size of the final i.
            bits := i
            pools[i] = &sync.Pool{
                New: func() interface{} {
                    b := make([]byte, 1<<bits)
                    return &b
                },
            }
        }

        // This goroutine runs for the lifetime of the process.
        go func() {
            for {
                time.Sleep(time.Minute * 10)
                runtime.GC()
            }
        }()
    }
    
    image.png
    page

    page 对上面内存池中申请的内存进行引用计数的管理

    import (
        "errors"
        "io"
        "os"
        "runtime"
        "runtime/debug"
        "sync/atomic"
    )
    
    // pageStack is true when the JFS_PAGE_STACK environment variable is set to a
    // non-empty value; pages then record debug.Stack() output when they are
    // created, acquired and released.
    var pageStack = os.Getenv("JFS_PAGE_STACK") != ""
    
    // Page is a byte buffer with a reference count.
    type Page struct {
        // refs is the reference count; acquire and release update it atomically.
        refs    int32
        // offHeap marks pages whose Data came from Alloc (set by NewOffPage);
        // release passes such Data to Free when the count reaches zero.
        offHeap bool
        // dep is the page this one was sliced from (set by Slice); it is
        // released when this page's count reaches zero.
        dep     *Page
        // Data is the page content; release sets it to nil on the last release.
        Data    []byte
        // stack accumulates debug.Stack() output when pageStack is enabled.
        stack   []byte
    }
    
    // NewOffPage creates a page of size bytes backed by Alloc, with an initial
    // reference count of one. It panics if size is not positive.
    //
    // A finalizer is attached that releases the page once if it is collected
    // while its reference count is still positive.
    func NewOffPage(size int) *Page {
        if size <= 0 {
            panic("size of page should > 0")
        }

        page := &Page{refs: 1, offHeap: true, Data: Alloc(size)}
        if pageStack {
            page.stack = debug.Stack()
        }

        runtime.SetFinalizer(page, func(p *Page) {
            if atomic.LoadInt32(&p.refs) > 0 {
                p.release()
            }
        })
        return page
    }
    
    // Slice returns a new page, with a reference count of one, whose Data is the
    // len bytes of p.Data starting at off. The new page records p as its dep and
    // takes a reference on p first.
    func (p *Page) Slice(off, len int) *Page {
        p.acquire()
        end := off + len
        return &Page{refs: 1, dep: p, Data: p.Data[off:end]}
    }
    
    // acquire increments the reference count. When pageStack is enabled the
    // current stack trace is appended to p.stack first.
    func (p *Page) acquire() {
        if pageStack {
            trace := debug.Stack()
            p.stack = append(p.stack, trace...)
        }
        atomic.AddInt32(&p.refs, 1)
    }
    
    // release decrements the reference count. When the count reaches exactly
    // zero, Data is passed to Free if the page is offHeap, the dep page (if any)
    // is released and cleared, and Data is set to nil. When pageStack is enabled
    // the current stack trace is appended to p.stack first.
    func (p *Page) release() {
        if pageStack {
            trace := debug.Stack()
            p.stack = append(p.stack, trace...)
        }

        if remaining := atomic.AddInt32(&p.refs, -1); remaining != 0 {
            return
        }

        if p.offHeap {
            Free(p.Data)
        }
        if parent := p.dep; parent != nil {
            parent.release()
            p.dep = nil
        }
        p.Data = nil
    }
    
    // PageReader reads the Data of a Page and holds a reference on that page
    // until Close is called.
    type PageReader struct {
        // p is the page being read; Close releases it and sets it to nil.
        p   *Page
        // off is the position at which the next Read starts.
        off int
    }
    
    // NewPageReader takes a reference on p and returns a reader positioned at
    // the start of its data.
    func NewPageReader(p *Page) *PageReader {
        p.acquire()
        return &PageReader{p: p}
    }
    
    // Read copies data from the current position into buf via ReadAt and
    // advances the position by the number of bytes read.
    func (r *PageReader) Read(buf []byte) (int, error) {
        pos := int64(r.off)
        read, err := r.ReadAt(buf, pos)
        r.off = r.off + read
        return read, err
    }
    
    // ReadAt copies page data starting at offset off into buf and returns the
    // number of bytes copied.
    //
    // It returns 0, nil for an empty buf, an error if the reader was already
    // closed or off is negative, and io.EOF when off is at or beyond the end of
    // the data or when fewer than len(buf) bytes were available.
    func (r *PageReader) ReadAt(buf []byte, off int64) (int, error) {
        if len(buf) == 0 {
            return 0, nil
        }
        if r.p == nil {
            return 0, errors.New("page is already released")
        }
        if off < 0 {
            return 0, errors.New("negative offset")
        }
        data := r.p.Data
        // Compare as int64 and treat every offset at or past the end as EOF, so
        // an out-of-range offset reports an error instead of panicking on the
        // slice expression below.
        if off >= int64(len(data)) {
            return 0, io.EOF
        }
        n := copy(buf, data[off:])
        if n < len(buf) {
            return n, io.EOF
        }
        return n, nil
    }
    
    // Close releases the reader's reference on its page and clears it. Calling
    // Close again is a no-op. It always returns nil.
    func (r *PageReader) Close() error {
        if r.p == nil {
            return nil
        }
        r.p.release()
        r.p = nil
        return nil
    }
    
    singleflight
    prefetch

    相关文章

      网友评论

          本文标题:juiceFS 代码小工具学习

          本文链接:https://www.haomeiwen.com/subject/bspoidtx.html