Volcano
Error Handling
// Event is the type of Event related to the Job
type Event string

const (
    // AllEvents means all events
    AllEvents Event = "*"
    // PodFailedEvent is triggered if a Pod failed
    PodFailedEvent Event = "PodFailed"
    // PodEvictedEvent is triggered if a Pod was deleted
    PodEvictedEvent Event = "PodEvicted"
    // The following are several events that can lead to the job becoming 'Unknown':
    // 1. Task Unschedulable: triggered when, in the gang-scheduling case, some
    //    pods cannot be scheduled while others are already running.
    JobUnknownEvent Event = "Unknown"
    // OutOfSyncEvent is triggered if the Pod/Job was updated
    OutOfSyncEvent Event = "OutOfSync"
    // CommandIssuedEvent is triggered if a command is issued by the user
    CommandIssuedEvent Event = "CommandIssued"
    // TaskCompletedEvent is triggered if the 'Replicas' amount of pods in one task have succeeded
    TaskCompletedEvent Event = "TaskCompleted"
)
// Action is the type of event handling
type Action string

const (
    // AbortJobAction if this action is set, the whole job will be aborted:
    // all Pods of the Job will be evicted, and no Pod will be recreated
    AbortJobAction Action = "AbortJob"
    // RestartJobAction if this action is set, the whole job will be restarted
    RestartJobAction Action = "RestartJob"
    // TerminateJobAction if this action is set, the whole job will be terminated
    // and can not be resumed: all Pods of the Job will be evicted, and no Pod will be recreated.
    TerminateJobAction Action = "TerminateJob"
    // CompleteJobAction if this action is set, the unfinished pods will be killed and the job marked completed.
    CompleteJobAction Action = "CompleteJob"
    // ResumeJobAction is the action to resume an aborted job.
    ResumeJobAction Action = "ResumeJob"
    // SyncJobAction is the action to sync Job/Pod status.
    SyncJobAction Action = "SyncJob"
)
// LifecyclePolicy specifies the lifecycle and error handling of task and job.
type LifecyclePolicy struct {
    Event   Event            `json:"event,omitempty" protobuf:"bytes,1,opt,name=event"`
    Action  Action           `json:"action,omitempty" protobuf:"bytes,2,opt,name=action"`
    Timeout *metav1.Duration `json:"timeout,omitempty" protobuf:"bytes,3,opt,name=timeout"`
}
Through LifecyclePolicy, different Job events can be handled in a targeted way. For example, in distributed training, a single failed task can trigger a restart of the whole Job.
apiVersion: batch.volcano.sh/v1alpha1
kind: Job
metadata:
  name: tf-job
spec:
  # If any event occurs, restart the whole job.
  policies:
    - event: "*"
      action: RestartJob
  tasks:
    - name: "ps"
      replicas: 1
      template:
        spec:
          containers:
            - name: ps
              image: ps-img
    - name: "worker"
      replicas: 5
      template:
        spec:
          containers:
            - name: worker
              image: worker-img
...
An action can also be specified per task; see the sketch below.
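A minimal sketch of a task-level policy, assuming the tasks[].policies field takes the same LifecyclePolicy form as the job-level policies above:

apiVersion: batch.volcano.sh/v1alpha1
kind: Job
metadata:
  name: tf-job
spec:
  tasks:
    - name: "worker"
      replicas: 5
      # Only a failed worker pod restarts the whole job; events from other
      # tasks are handled by the job-level policies.
      policies:
        - event: PodFailed
          action: RestartJob
      template:
        spec:
          containers:
            - name: worker
              image: worker-img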
Gang-scheduling
This part is entirely the kube-batch design. spec.minAvailable is the number of pods that must be scheduled together as a gang. It defaults to the sum of spec.tasks[].replicas. If spec.minAvailable > sum(spec.tasks[].replicas), the Job cannot be created. If spec.minAvailable < sum(spec.tasks[].replicas), pods are created according to task priority, or randomly if no priority is set.
Task priority
apiVersion: batch.volcano.sh/v1alpha1
kind: Job
metadata:
  name: spark-job
spec:
  minAvailable: 3
  tasks:
    - name: "driver"
      replicas: 1
      template:
        spec:
          priorityClassName: "master-pri"  # high priority
          containers:
            - name: driver
              image: driver-img
    - name: "executor"
      replicas: 5
      template:
        spec:
          containers:
            - name: executor
              image: executor-img
The scheduler can only guarantee that the high-priority pods within a Job are scheduled first.
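Reading the example above (my interpretation): sum(replicas) = 1 + 5 = 6 while minAvailable = 3, so only three pods need to be schedulable for the gang to be satisfied; since the driver task carries the high-priority class, it is created ahead of the executors.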
Job plugins
plugins:
  ssh: []
  env: []
  svc: []
Three plugins are currently provided to help users run AI jobs:
- env: currently only adds VK_TASK_INDEX; later it should at least make further use of this index value
- svc: creates a Service for the Job and maintains a hosts file so that pods can reach each other
// generateHost builds, for every task in the Job, the list of pod host names
// (<hostname>.<subdomain>) that the svc plugin stores in a ConfigMap so that
// pods can resolve each other.
func generateHost(job *vkv1.Job) map[string]string {
    data := make(map[string]string, len(job.Spec.Tasks))
    for _, ts := range job.Spec.Tasks {
        hosts := make([]string, 0, ts.Replicas)
        for i := 0; i < int(ts.Replicas); i++ {
            hostName := ts.Template.Spec.Hostname
            subdomain := ts.Template.Spec.Subdomain
            // Default the host name to the generated pod name and the
            // subdomain to the Job name when they are not set explicitly.
            if len(hostName) == 0 {
                hostName = vkhelpers.MakePodName(job.Name, ts.Name, i)
            }
            if len(subdomain) == 0 {
                subdomain = job.Name
            }
            hosts = append(hosts, hostName+"."+subdomain)
            // A fixed hostname is shared by all replicas, so one entry is enough.
            if len(ts.Template.Spec.Hostname) != 0 {
                break
            }
        }
        // One ConfigMap key per task, holding the newline-separated host list.
        key := fmt.Sprintf(ConfigMapTaskHostFmt, ts.Name)
        data[key] = strings.Join(hosts, "\n")
    }
    return data
}
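For the worker task in the tf-job example above (5 replicas, no explicit hostname), the generated ConfigMap entry would look roughly like the following, assuming MakePodName produces names of the form <job>-<task>-<index>:

tf-job-worker-0.tf-job
tf-job-worker-1.tf-job
tf-job-worker-2.tf-job
tf-job-worker-3.tf-job
tf-job-worker-4.tf-job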
- ssh: passwordless SSH between the Job's pods (all three plugins are combined in the sketch below)
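A minimal sketch of enabling all three plugins on a Job, with a container reading the index injected by the env plugin (the image and command are placeholders; the placement of plugins under spec follows the snippet above):

apiVersion: batch.volcano.sh/v1alpha1
kind: Job
metadata:
  name: tf-job
spec:
  plugins:
    ssh: []   # passwordless SSH between pods
    env: []   # injects VK_TASK_INDEX into each container
    svc: []   # Service + hosts-file ConfigMap for pod-to-pod communication
  tasks:
    - name: "worker"
      replicas: 5
      template:
        spec:
          containers:
            - name: worker
              image: worker-img
              command: ["sh", "-c", "echo running as worker $VK_TASK_INDEX"]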