美文网首页
19-定时任务库“robfig/cron”核心源码解读

19-定时任务库“robfig/cron”核心源码解读

作者: 欢乐毅城 | 来源:发表于2021-06-07 19:05 被阅读0次

    定时任务是项目中一个比较常见的情景。那么,今天我们就来介绍一个github上还不错的定时任务库“github.com/robfig/cron.git/v3”。

    拉取库文件:

      go get github.com/robfig/cron/v3@v3.0.0
    

    demo示例:

    package main
    
    import (
        "fmt"
        "time"
        "github.com/robfig/cron/v3"
    )
    
    //定时任务
    func timedTask() {
        fmt.Printf("执行定时任务: %s \n", time.Now().Format("2006-01-02 15:04:05"))
    }
    
    func main() {
        //初始化一个cron对象
        c := cron.New()
    
        //方法一:通过AddFunc注册,任务调度
        spec := "50,51 17 * * *"  //每天17点50分,51分执行
        //spec := "@every 3s"   //每3秒执行一次(秒数可以超过60s)
        //spec := "CRON_TZ=Asia/Tokyo 30 04 * * *"
        
        //参数一:调度的时间策略,参数二:到时间后执行的方法。
        enterId, err := c.AddFunc(spec, timedTask)
        if err != nil {
            panic(err)
        }
        fmt.Printf("任务id是 %d \n", enterId)
    
        //启动
        c.Start()
    
        //用于阻塞 后面可以使用 select {} 阻塞
        time.Sleep(time.Hour * 9)
    
        //关闭定时任务(其实不关闭也可以,主进程直接结束了, 内部的goroutine协程也会自动结束)
        c.Stop()
    }
    
     //方法二:通过AddJob注册
       type myType int
       func (c myType) Run() {
          fmt.Println("myType 实现了 Run方法")
          return
       }
       
       var dataNew myType = 10
       c.AddJob("@every 5s", dataNew)
       //调用方法AddJob(spec string, cmd Job)也可以实现AddFunc注册的功能,
       //Job是interface,需要入参类型实现方法:Run()。实际上,
       //方法AddFunc内部将参数cmd 进行了包装(wrapper),然后也是调用方法AddJob进行注册。
    

    源码分析:

    • 基本数据结构体:(cron.go)
      //Cron数据结构
      type Cron struct {
        entries   []*Entry  //调度执行实体列表(或job的指针对象)
        chain     Chain     //chain用来定义Entry里的warppedJob使用的逻辑
        stop      chan struct{}  //停止所有调度任务
        add       chan *Entry    //添加一个调度任务
        remove    chan EntryID   //移除一个调度任务
        snapshot  chan chan []Entry  //运行中的调度任务
        running   bool  //代表是否已经在执行,用于操作整个cron对象只启动一次
        logger    Logger  //记录日志信息
        runningMu sync.Mutex //协程锁,确保运行数据安全,比如增加或移除entry
        location  *time.Location   // 时区
        parser    ScheduleParser  //对时间格式进行解析
        nextID    EntryID   //下一个任务的id
        jobWaiter sync.WaitGroup //run task时进行add(1),结束时done(),以此保证所有job都能退出
      }
      
      // Entry 数据结构,每一个被调度实体一个
      type Entry struct {
        // 唯一id,用于查询和删除,默认是自增的
           ID EntryID 
        //本Entry的调度时间,不是绝对时间,在生成entry时会计算出来
           Schedule Schedule
        // 本entry下次需要执行的绝对时间,会一直被更新
        // 被封装的含义是Job可以多层嵌套,可以实现基于需要执行Job的额外处理
        // 比如抓取Job异常、如果Job没有返回下一个时间点的Job是还是继续执行还是delay
           Next time.Time 
        //上一次被执行时间,主要用来查询
           Prev time.Time 
        //WrappedJob 是真实执行的Job实体(执行的任务)
           WrappedJob Job
        //Job主要给用户查询
           Job Job
      }
    
    • 核心函数run() :
    // run the scheduler.. this is private just due to the need to synchronize
    // access to the 'running' state variable.
    func (c *Cron) run() {
        c.logger.Info("start")
      
        // Figure out the next activation times for each entry.
        //获取当前时间
        now := c.now()
        //循环调入任务,计算下一次任务的执行时间
        for _, entry := range c.entries {
            entry.Next = entry.Schedule.Next(now)
            c.logger.Info("schedule", "now", now, "entry", entry.ID, "next", entry.Next)
        }
          
          //第一层死循环,无限循环
        for {
            // Determine the next entry to run.
            // 按时间先后排队调度任务
            sort.Sort(byTime(c.entries))
      
            var timer *time.Timer
            if len(c.entries) == 0 || c.entries[0].Next.IsZero() {
                // If there are no entries yet, just sleep - it still handles new entries
                // and stop requests.
                // 如果cron启动后 还没有 调度信息的话 就生成一个sleep10W小时的 chan Time,
                //用于阻塞下面的 select{} ,因为`select`是多路复用,
                //其他channel能返回数据时,select就回执行不会阻塞。
                // 所以当没有任务时,启动Start()程序 就会被这个阻塞
                timer = time.NewTimer(100000 * time.Hour)
            } else {
                //如果有调度信息,就 sleep 调度任务中第一个的 循环时间 
                timer = time.NewTimer(c.entries[0].Next.Sub(now))
            }
      
              // 第二层死循环,内部使用select{}阻塞
            for {
                select {
                case now = <-timer.C:  //上一步中的 timer sleep时间如果到了就执行
                    now = now.In(c.location)
                    c.logger.Info("wake", "now", now)
      
                    // Run every entry whose next time was less than now
                    for _, e := range c.entries {
                        if e.Next.After(now) || e.Next.IsZero() {
                            break
                        }
                        c.startJob(e.WrappedJob)
                        e.Prev = e.Next
                        e.Next = e.Schedule.Next(now)
                        c.logger.Info("run", "now", now, "entry", e.ID, "next", e.Next)
                    }
      
                case newEntry := <-c.add: //向Cron中添加了 一个调度任务就会执行
                    timer.Stop()
                    now = c.now()
                    newEntry.Next = newEntry.Schedule.Next(now)
                    c.entries = append(c.entries, newEntry)
                    c.logger.Info("added", "now", now, "entry", newEntry.ID, "next", newEntry.Next)
      
                case replyChan := <-c.snapshot:
                    replyChan <- c.entrySnapshot()
                    continue
                       
                case <-c.stop:   // 停止定时任务
                    timer.Stop()
                    c.logger.Info("stop")
                    return
      
                case id := <-c.remove: // 移除任务
                    timer.Stop()
                    now = c.now()
                    c.removeEntry(id)
                    c.logger.Info("removed", "entry", id)
                }
                   //当以上任意一个channel满足时,就会结束内层循环 重复上一层步骤
                break
            }
        }
    }
      
    
    • 时间书写格式:
    CRON Expression Format (CRON表达式格式):
    Field name Mandatory? Allowed values Allowed special characters
    Minutes Yes 0-59 * / , -
    Hours Yes 0-23 * / , -
    Day of month Yes 1-31 * / , - ?
    Month Yes 1-12 or JAN-DEC * / , -
    Day of week Yes 0-6 or SUN-SAT * / , - ?
    Predefined schedules( 预定义的时间表):
    Entry(输入) Description(描述) Equivalent To(等于)
    @yearly (or @annually) Run once a year, midnight, Jan. 1st 0 0 1 1 *
    @monthly Run once a month, midnight, first of month 0 0 1 * *
    @weekly Run once a week, midnight between Sat/Sun 0 0 * * 0
    @daily (or @midnight) Run once a day, midnight 0 0 * * *
    @hourly Run once an hour, beginning of hour 0 * * * *

    欠缺的地方:

    • 不支持持久化,重启一下服务,调度任务信息就没了,需要自己存储调度信息。
    • 不支持一次定时,若想实现此功能,调用一次之后再调用移除可以实现。

    相关文章

      网友评论

          本文标题:19-定时任务库“robfig/cron”核心源码解读

          本文链接:https://www.haomeiwen.com/subject/nlqreltx.html