美文网首页
time ticker 实现过程

time ticker 实现过程

作者: 哆啦在这A梦在哪 | 来源:发表于2022-05-19 10:51 被阅读0次

    参考资料以及前提了解:
    https://zhuanlan.zhihu.com/p/337182213

    1. 堆,小根堆,堆的基本操作

    2.golang基本信号使用

    让我们先来看一下如何使用Ticker

    //创建Ticker,设置多长时间触发一次
    ticker := time.NewTicker(time.Second * 10)
    go func() {
       for range ticker.C { //遍历ticker.C,如果有值,则会执行do someting,否则阻塞
          //do someting
       }
    }()
    

    调用NewTicker可以生成Ticker,关于这个函数有四点需要说明

    1.NewTicker中的时间是以纳秒为单位的
    2.when返回的从当前时间+d的纳秒值
    3.d必须为正值
    4.Ticker结构体中包含channel,sendTime是个function,逻辑为用select等待c被赋值

    startTimer函数

    揭示channel、sendTime是如何关联的

    // NewTicker returns a new Ticker containing a channel that will send the
    // time with a period specified by the duration argument.
    // It adjusts the intervals or drops ticks to make up for slow receivers.
    // The duration d must be greater than zero; if not, NewTicker will panic.
    // Stop the ticker to release associated resources.
    func NewTicker(d Duration) *Ticker {
       if d <= 0 {
          panic(errors.New("non-positive interval for NewTicker"))
       }
       // Give the channel a 1-element time buffer.
       // If the client falls behind while reading, we drop ticks
       // on the floor until the client catches up.
       c := make(chan Time, 1)
       t := &Ticker{
          C: c,
          r: runtimeTimer{
             when:   when(d),
             period: int64(d),
             f:      sendTime,
             arg:    c,
          },
       }
       startTimer(&t.r)
       return t
    }
    

    time/tick.go的Ticker数据结构

    // A Ticker holds a channel that delivers `ticks' of a clock
    // at intervals.
    type Ticker struct {
       C <-chan Time // The channel on which the ticks are delivered.
       r runtimeTimer
    }
    

    time/sleep.go的runtimeTimer

    // Interface to timers implemented in package runtime.
    // Must be in sync with ../runtime/time.go:/^type timer
    type runtimeTimer struct {
       tb uintptr
       i  int
    
       when   int64
       period int64
       f      func(interface{}, uintptr) // NOTE: must not be closure
       arg    interface{}
       seq    uintptr
    }
    

    time/sleep.go的sendTime

    func sendTime(c interface{}, seq uintptr) {
       // Non-blocking send of time on c.
       // Used in NewTimer, it cannot block anyway (buffer).
       // Used in NewTicker, dropping sends on the floor is
       // the desired behavior when the reader gets behind,
       // because the sends are periodic.
       select {
       case c.(chan Time) <- Now():
       default:
       }
    }
    

    time/sleep.go的startTimer

    func startTimer(*runtimeTimer)
    func stopTimer(*runtimeTimer) bool
    startTimer
    

    看完上面的代码,大家内心是不是能够猜出是怎么实现的?

    有一个机制保证时间到了时,sendTime被调用,此时channel会被赋值,调用ticker.C的位置解除阻塞,执行指定的逻辑。

    让我们看一下GoLang是不是这样实现的。

    追踪代码的时候我们发现在time包里的startTimer,只是一个声明,那真正的实现在哪里?

    runtime/time.go的startTimer

    此处使用go的隐藏技能go:linkname引导编译器将当前(私有)方法或者变量在编译时链接到指定的位置的方法或者变量。另外timer和runtimeTimer的结构是一致的,所以程序运行正常。

    //startTimer将new的timer对象加入timer的堆数据结构中
    //startTimer adds t to the timer heap.
    //go:linkname startTimer time.startTimer
    func startTimer(t *timer) {
       if raceenabled {
          racerelease(unsafe.Pointer(t))
       }
       addtimer(t)
    }
    runtime/time.go的addtimer
    
    func addtimer(t *timer) {
       tb := t.assignBucket()
       lock(&tb.lock)
       ok := tb.addtimerLocked(t)
       unlock(&tb.lock)
       if !ok {
          badTimer()
       }
    }
    

    runtime/time.go的addtimerLocked

    // Add a timer to the heap and start or kick timerproc if the new timer is
    // earlier than any of the others.
    // Timers are locked.
    // Returns whether all is well: false if the data structure is corrupt
    // due to user-level races.
    func (tb *timersBucket) addtimerLocked(t *timer) bool {
       // when must never be negative; otherwise timerproc will overflow
       // during its delta calculation and never expire other runtime timers.
       if t.when < 0 {
          t.when = 1<<63 - 1
       }
       t.i = len(tb.t)
       tb.t = append(tb.t, t)
       if !siftupTimer(tb.t, t.i) {
          return false
       }
       if t.i == 0 {
          // siftup moved to top: new earliest deadline.
          if tb.sleeping && tb.sleepUntil > t.when {
             tb.sleeping = false
             notewakeup(&tb.waitnote)
          }
          if tb.rescheduling {
             tb.rescheduling = false
             goready(tb.gp, 0)
          }
          if !tb.created {
             tb.created = true
             go timerproc(tb)
          }
       }
       return true
    }
    

    runtime/time.go的timerproc

    func timerproc(tb *timersBucket) {
        tb.gp = getg()
        for {
            lock(&tb.lock)
            tb.sleeping = false
            now := nanotime()
            delta := int64(-1)
            for {
                if len(tb.t) == 0 { //无timer的情况
                    delta = -1
                    break
                }
                t := tb.t[0] //拿到堆顶的timer
                delta = t.when - now
                if delta > 0 { // 所有timer的时间都没有到期
                    break
                }
                if t.period > 0 { // t[0] 是ticker类型,调整其到期时间并调整timer堆结构
                    // leave in heap but adjust next time to fire
                    t.when += t.period * (1 + -delta/t.period)
                    siftdownTimer(tb.t, 0)
                } else {
                    //Timer类型的定时器是单次的,所以这里需要将其从堆里面删除
                    // remove from heap
                    last := len(tb.t) - 1
                    if last > 0 {
                        tb.t[0] = tb.t[last]
                        tb.t[0].i = 0
                    }
                    tb.t[last] = nil
                    tb.t = tb.t[:last]
                    if last > 0 {
                        siftdownTimer(tb.t, 0)
                    }
                    t.i = -1 // mark as removed
                }
                f := t.f
                arg := t.arg
                seq := t.seq
                unlock(&tb.lock)
                if raceenabled {
                    raceacquire(unsafe.Pointer(t))
                }
                f(arg, seq) //sendTimer被调用的位置 ---------------------------------------
                lock(&tb.lock)
            }
            if delta < 0 || faketime > 0 {
                // No timers left - put goroutine to sleep.
                tb.rescheduling = true
                goparkunlock(&tb.lock, "timer goroutine (idle)", traceEvGoBlock, 1)
                continue
            }
            // At least one timer pending. Sleep until then.
            tb.sleeping = true
            tb.sleepUntil = now + delta
            noteclear(&tb.waitnote)
            unlock(&tb.lock)
            notetsleepg(&tb.waitnote, delta)
        }
    }
    

    追踪了一圈,最终追踪到timerproc,发现了sendTimer被调用位置f(arg, seq) ,而且可以看到将channel c传到了sendTimer中。

    上面的这堆代码逻辑是什么意思呢?

    1.所有timer统一使用一个最小堆结构去维护,按照timer的when(到期时间)比较大小;
    
    2.for循环过程中,如果delta = t.when - now的时间大于0,则break,直到有到时间的timer才进行操作;
    
    3.timer处理线程从堆顶开始处理每个timer,对于到期的timer,如果其period>0,则表明该timer 属于Ticker类型,调整其下次到期时间并调整其在堆中的位置,否则从堆中移除该timer;
    
    4.调用该timer的处理函数以及其他相关工作;
    

    总结:

    1. 说到底是在 for 循环最小单位来判断
    2. 所有的 timer 都由一个最小堆结构去维护,每次执行完将不需要继续执行的删除,定时器再加上一个相同时间的单位继续放入堆中
    3. 可执行后通过 chan 的阻塞,放入一个信号去异步通知

    相关文章

      网友评论

          本文标题:time ticker 实现过程

          本文链接:https://www.haomeiwen.com/subject/knmnurtx.html