package main
import (
"errors"
"log"
"os"
"os/signal"
"time"
)
var (
// ErrTimeout 任务执行超时时返回
ErrTimeout = errors.New("received timeout")
// ErrInterrupt 接收到操作系统的事件时返回
ErrInterrupt = errors.New("received interrupt")
)
// Runner 在给定的超时时间内执行一组任务
// 并且在操作系统发送中断信号时结束这些任务
type Runner struct {
// interrupt 通道报告从操作系统
// 发送的信号
interrupt chan os.Signal
// complete 通道报告处理任务已完成
complete chan error
// timeout 报告处理任务已经超时
timeout <-chan time.Time
// tasks 持有一组以索引顺序依次执行的函数
tasks []func(int)
}
// NewRunner 创建Runner
func NewRunner(duration time.Duration) *Runner {
return &Runner{
interrupt: make(chan os.Signal, 1),
complete: make(chan error),
timeout: time.After(duration),
}
}
// Add 将一个任务附加到runner上
// 任务接收一个int类型的🆔作为函数参数
func (r *Runner) Add(tasks ...func(int)) {
r.tasks = append(r.tasks, tasks...)
}
func (r *Runner) Start() error {
// 接收中断信号
signal.Notify(r.interrupt, os.Interrupt)
go func() {
r.complete <- r.Run()
}()
select {
// 任务完成
case err := <-r.complete:
return err
// 任务超时
case <-r.timeout:
return ErrTimeout
}
}
// Run 执行注册的任务
func (r *Runner) Run() error {
for id, task := range r.tasks {
if r.gotInterrupt() {
return ErrInterrupt
}
task(id)
}
return nil
}
// gotInterrupt 是否收到中断信号
func (r *Runner) gotInterrupt() bool {
select {
case <-r.interrupt:
signal.Stop(r.interrupt)
return true
default:
return false
}
}
// Demo 模拟耗时任务
func Demo() func(int) {
return func(i int) {
log.Printf("Processor - Task #%d.\n", i)
time.Sleep(time.Second * time.Duration(i))
}
}
func main() {
log.Println("Process start.")
timeout := time.Second * 3 // 3s过期
r := NewRunner(timeout)
r.Add(Demo(), Demo(), Demo())
if err := r.Start(); err != nil {
switch err {
case ErrTimeout:
log.Println("Terminating due to timeout.")
os.Exit(1)
case ErrInterrupt:
log.Println("Terminating due to interrupt.")
os.Exit(2)
}
}
log.Println("Process ended.")
}
网友评论