美文网首页
并发模式(1)-runner

并发模式(1)-runner

作者: 快感炮神 | 来源:发表于2020-10-27 18:01 被阅读0次
    package main
    
    import (
        "errors"
        "log"
        "os"
        "os/signal"
        "time"
    )
    
    var (
        // ErrTimeout 任务执行超时时返回
        ErrTimeout = errors.New("received timeout")
        // ErrInterrupt 接收到操作系统的事件时返回
        ErrInterrupt = errors.New("received interrupt")
    )
    
    // Runner 在给定的超时时间内执行一组任务
    // 并且在操作系统发送中断信号时结束这些任务
    type Runner struct {
        // interrupt 通道报告从操作系统
        // 发送的信号
        interrupt chan os.Signal
    
        // complete 通道报告处理任务已完成
        complete chan error
    
        // timeout 报告处理任务已经超时
        timeout <-chan time.Time
    
        // tasks 持有一组以索引顺序依次执行的函数
        tasks []func(int)
    }
    
    // NewRunner 创建Runner
    func NewRunner(duration time.Duration) *Runner {
        return &Runner{
            interrupt: make(chan os.Signal, 1),
            complete:  make(chan error),
            timeout:   time.After(duration),
        }
    }
    
    // Add 将一个任务附加到runner上
    // 任务接收一个int类型的🆔作为函数参数
    func (r *Runner) Add(tasks ...func(int)) {
        r.tasks = append(r.tasks, tasks...)
    }
    
    func (r *Runner) Start() error {
        // 接收中断信号
        signal.Notify(r.interrupt, os.Interrupt)
    
        go func() {
            r.complete <- r.Run()
        }()
    
        select {
        // 任务完成
        case err := <-r.complete:
            return err
        // 任务超时
        case <-r.timeout:
            return ErrTimeout
        }
    }
    
    // Run 执行注册的任务
    func (r *Runner) Run() error {
        for id, task := range r.tasks {
            if r.gotInterrupt() {
                return ErrInterrupt
            }
    
            task(id)
        }
        return nil
    }
    
    // gotInterrupt 是否收到中断信号
    func (r *Runner) gotInterrupt() bool {
        select {
        case <-r.interrupt:
            signal.Stop(r.interrupt)
            return true
        default:
            return false
        }
    }
    
    // Demo 模拟耗时任务
    func Demo() func(int) {
        return func(i int) {
            log.Printf("Processor - Task #%d.\n", i)
            time.Sleep(time.Second * time.Duration(i))
        }
    }
    
    func main() {
        log.Println("Process start.")
    
        timeout := time.Second * 3 // 3s过期
        r := NewRunner(timeout)
    
        r.Add(Demo(), Demo(), Demo())
        if err := r.Start(); err != nil {
            switch err {
            case ErrTimeout:
                log.Println("Terminating due to timeout.")
                os.Exit(1)
            case ErrInterrupt:
                log.Println("Terminating due to interrupt.")
                os.Exit(2)
            }
        }
    
        log.Println("Process ended.")
    }
    
    

    相关文章

      网友评论

          本文标题:并发模式(1)-runner

          本文链接:https://www.haomeiwen.com/subject/tgixvktx.html