demo
// handle dispatches a decoded event value according to its concrete Go type.
// Only the *events.TaskOOM case is shown here; the case body and the
// function's return statements are elided in this snippet.
func handle(e interface{}) error {
switch ev := e.(type) {
case *events.TaskOOM:
// ev is e narrowed to *events.TaskOOM; the handling logic is elided.
...
}
}
// main creates a containerd client for the default address, subscribes to
// events within the "k8s.io" namespace, and passes every decoded event to
// handle.
//
// The demo is fail-fast: an error from client creation, from the
// subscription's error channel, from decoding an event payload, or from
// handle itself causes a panic.
func main() {
	client, err := containerd.New(defaults.DefaultAddress)
	if err != nil {
		panic(err)
	}
	// Deferred calls also run while a panic unwinds, so the client is
	// released on every exit path of this function.
	defer client.Close()

	ctx := namespaces.WithNamespace(context.TODO(), "k8s.io")
	evts, errs := client.EventService().Subscribe(ctx)
	for {
		select {
		case err := <-errs:
			panic(err)
		case evt := <-evts:
			// Envelopes without a payload carry nothing to decode.
			if evt.Event == nil {
				continue
			}
			v, err := typeurl.UnmarshalAny(evt.Event)
			if err != nil {
				panic(err)
			}
			// Previously the result of handle was discarded; surface it the
			// same way as every other error in this demo.
			if err := handle(v); err != nil {
				panic(err)
			}
		}
	}
}
相关源码
上面的 demo 依赖于 containerd 的 events service,订阅流程涉及的相关源码如下:
services/events/service.go中
// Subscribe handles a subscription request on the events service
// (presumably the server side of the streaming Subscribe RPC, judging by the
// api.Events_SubscribeServer parameter; confirm against the full source).
// The visible part passes the request's filters to s.events.Subscribe and
// receives an event channel and an error channel; everything else is elided
// in this snippet.
func (s *service) Subscribe(req *api.SubscribeRequest, srv api.Events_SubscribeServer) error {
...
// eventq carries matching events, errq carries the terminating error.
eventq, errq := s.events.Subscribe(ctx, req.Filters...)
...
}
events/exchange/exchange.go中
// Subscribe returns a receive-only channel of event envelopes and a
// receive-only error channel for the given filter strings. Setup code
// (creation of channel, evch, errq and closeAll) is elided in this snippet.
//
// The goroutine shown below forwards values read from channel.C to evch
// until ctx is done, then sends a final error value (possibly nil) on errq.
func (e *Exchange) Subscribe(ctx context.Context, fs ...string) (ch <-chan *events.Envelope, errs <-chan error) {
...
go func() {
// closeAll is defined in the elided part; it runs when the goroutine exits.
defer closeAll()
var err error
loop:
for {
select {
case ev := <-channel.C:
// Only *events.Envelope values are expected on channel.C.
env, ok := ev.(*events.Envelope)
if !ok {
// TODO(stevvooe): For the most part, we are well protected
// from this condition. Both Forward and Publish protect
// from this.
err = fmt.Errorf("invalid envelope encountered %#v; please file a bug", ev)
// NOTE(review): an unlabelled break inside a select leaves only the
// select, not the enclosing for loop, so the loop keeps running with
// err set; err is delivered on errq only once ctx is done. Confirm
// whether "break loop" was intended.
break
}
// Deliver the envelope, but give up if the subscriber's ctx ends first.
select {
case evch <- env:
case <-ctx.Done():
break loop
}
case <-ctx.Done():
break loop
}
}
// With no envelope error recorded, report the context error unless it is
// context.Canceled, in which case err stays nil.
if err == nil {
if cerr := ctx.Err(); cerr != context.Canceled {
err = cerr
}
}
errq <- err
}()
...
}
runtime/events.go中
其中定义的事件 topic 如下:
// Topic strings for task lifecycle events. Each constant is the topic
// associated with one task event; TaskUnknownTopic is the fallback value.
const (
	// TaskCreateEventTopic is the topic for a task being created.
	TaskCreateEventTopic = "/tasks/create"

	// TaskStartEventTopic is the topic for a task being started.
	TaskStartEventTopic = "/tasks/start"

	// TaskOOMEventTopic is the topic for a task hitting an out-of-memory condition.
	TaskOOMEventTopic = "/tasks/oom"

	// TaskExitEventTopic is the topic for a task exiting.
	TaskExitEventTopic = "/tasks/exit"

	// TaskDeleteEventTopic is the topic for a task being deleted.
	TaskDeleteEventTopic = "/tasks/delete"

	// TaskExecAddedEventTopic is the topic for an exec being created in a task.
	TaskExecAddedEventTopic = "/tasks/exec-added"

	// TaskExecStartedEventTopic is the topic for an exec being started in a task.
	TaskExecStartedEventTopic = "/tasks/exec-started"

	// TaskPausedEventTopic is the topic for a task being paused.
	TaskPausedEventTopic = "/tasks/paused"

	// TaskResumedEventTopic is the topic for a task being resumed.
	TaskResumedEventTopic = "/tasks/resumed"

	// TaskCheckpointedEventTopic is the topic for a task being checkpointed.
	TaskCheckpointedEventTopic = "/tasks/checkpointed"

	// TaskUnknownTopic is the topic used for unrecognized task events.
	TaskUnknownTopic = "/tasks/?"
)
各 topic 与事件结构体的对应关系如下:
// GetTopic returns the topic constant associated with the concrete type of e.
// When e is none of the task event pointer types listed below, a warning is
// logged and TaskUnknownTopic is returned.
func GetTopic(e interface{}) string {
	var topic string
	switch e.(type) {
	case *events.TaskCreate:
		topic = TaskCreateEventTopic
	case *events.TaskStart:
		topic = TaskStartEventTopic
	case *events.TaskOOM:
		topic = TaskOOMEventTopic
	case *events.TaskExit:
		topic = TaskExitEventTopic
	case *events.TaskDelete:
		topic = TaskDeleteEventTopic
	case *events.TaskExecAdded:
		topic = TaskExecAddedEventTopic
	case *events.TaskExecStarted:
		topic = TaskExecStartedEventTopic
	case *events.TaskPaused:
		topic = TaskPausedEventTopic
	case *events.TaskResumed:
		topic = TaskResumedEventTopic
	case *events.TaskCheckpointed:
		topic = TaskCheckpointedEventTopic
	default:
		// No known mapping: record the offending value and fall back.
		log.L.Warnf("no topic for type %#v", e)
		topic = TaskUnknownTopic
	}
	return topic
}
网友评论