(1)使用方法
timer := time.NewTicker(time.Duration(GAP) * time.Second)
for {
select {
case now := <-timer.C:
fmt.Println(now)
}
}
(2)NewTicker 函数
func NewTicker(d Duration) *Ticker {
if d <= 0 {
panic(errors.New("non-positive interval for NewTicker"))
}
// Give the channel a 1-element time buffer.
// If the client falls behind while reading, we drop ticks
// on the floor until the client catches up.
c := make(chan Time, 1)
t := &Ticker{
C: c,
r: runtimeTimer{
when: when(d), //下一次要触发的纳秒级时间,绝对时间(当前纳秒级时间+d)
period: int64(d), //触发周期,即d
f: sendTime, //定时器触发时,调用的函数,写channel 函数
arg: c, //这里把c作为f的参数,有个细节的原因是Ticker.C 是receive-only type chan
},
}
startTimer(&t.r)
return t
}
type Ticker struct {
C <-chan Time //定时满足后,执行runtimeTimer 中 f 函数 写channel
r runtimeTimer
}
type runtimeTimer struct {
tb uintptr
i int
when int64
period int64
f func(interface{}, uintptr) // NOTE: must not be closure
arg interface{}
seq uintptr
}
func sendTime(c interface{}, seq uintptr) {
select {
case c.(chan Time) <- Now():
default: //default 的逻辑是,如果没有及时的读取time chan 的值,就走default
}
}
(3)startTimer 函数
func startTimer(t *timer) {
addtimer(t) //把当前任务加到定时器中
}
func addtimer(t *timer) {
tb := t.assignBucket()
lock(&tb.lock)
ok := tb.addtimerLocked(t)
unlock(&tb.lock)
if !ok {
badTimer()
}
}
func (t *timer) assignBucket() *timersBucket {
id := uint8(getg().m.p.ptr().id) % timersLen //getg().m.p.ptr().id 是什么我还没搞明白,先不过分纠结,随后再看m/p/g
t.tb = &timers[id].timersBucket //分配一个timersBucket
return t.tb
}
var timers [64]struct {
timersBucket
pad [64]byte
}
type timersBucket struct {
lock mutex
gp *g
created bool
sleeping bool
rescheduling bool
sleepUntil int64
waitnote note
t []*timer
}
timersLen 是全局timer 数组的大小
(4)addtimerLocked函数
func (tb *timersBucket) addtimerLocked(t *timer) bool {
t.i = len(tb.t) //timersBucket.t 是一个timer 数组,用来管理timer,与全局timer不同
tb.t = append(tb.t, t)
//调整四叉堆
if !siftupTimer(tb.t, t.i) {
return false
}
if t.i == 0 {
// siftup moved to top: new earliest deadline.
if tb.sleeping {
tb.sleeping = false
notewakeup(&tb.waitnote) //先不过分纠结,看不懂
}
if tb.rescheduling {
tb.rescheduling = false
goready(tb.gp, 0) //先不过分纠结,看不懂
}
}
if !tb.created {
tb.created = true
go timerproc(tb)
}
return true
}
(5)siftupTimer函数
func siftupTimer(t []*timer, i int) bool {
if i >= len(t) {
return false
}
when := t[i].when
tmp := t[i]
for i > 0 {
p := (i - 1) / 4 // parent
if when >= t[p].when {
break
}
t[i] = t[p]
t[i].i = i
i = p
}
if tmp != t[i] {
t[i] = tmp
t[i].i = i
}
return true
}
(6)timerproc函数
func timerproc(tb *timersBucket) {
tb.gp = getg()
for {
lock(&tb.lock)
tb.sleeping = false
now := nanotime()
delta := int64(-1)
for {
if len(tb.t) == 0 {
delta = -1
break
}
t := tb.t[0]
delta = t.when - now
if delta > 0 {
break
}
ok := true
if t.period > 0 {
// leave in heap but adjust next time to fire //调整下一次的位置
t.when += t.period * (1 + -delta/t.period)
if !siftdownTimer(tb.t, 0) {
ok = false
}
} else {
// remove from heap //删除该任务
last := len(tb.t) - 1
if last > 0 {
tb.t[0] = tb.t[last]
tb.t[0].i = 0
}
tb.t[last] = nil
tb.t = tb.t[:last]
if last > 0 {
if !siftdownTimer(tb.t, 0) {
ok = false
}
}
t.i = -1 // mark as removed
}
f := t.f
arg := t.arg
seq := t.seq
unlock(&tb.lock)
if !ok {
badTimer()
}
if raceenabled {
raceacquire(unsafe.Pointer(t))
}
f(arg, seq) //执行触发函数
lock(&tb.lock)
}
if delta < 0 || faketime > 0 {
// No timers left - put goroutine to sleep.
tb.rescheduling = true
goparkunlock(&tb.lock, waitReasonTimerGoroutineIdle, traceEvGoBlock, 1)
continue
}
// At least one timer pending. Sleep until then.
tb.sleeping = true
tb.sleepUntil = now + delta
noteclear(&tb.waitnote)
unlock(&tb.lock)
notetsleepg(&tb.waitnote, delta)
}
}
网友评论