Job用来在k8s集群中批量执行一次性任务,并提供了失败重试机制。其载体可以是容器中的一个脚本。
下面是创建一个Job的具体流程的案例
创建一个执行任务的镜像
业务脚本testFunc.py
import argparse
import json
def concat(firstName, n):
return json.dumps({'status': 200, 'msg': firstName*n})
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument('--firstName', type=str)
parser.add_argument('--n', type=int)
args = parser.parse_args()
result = concat(args.firstName, args.n)
print(result)
Dockerfile
FROM python:3.9
WORKDIR /app
COPY testFunc.py /app
CMD ["python", "testFunc.py", "--firstName", "Tommy", "--n", "2"]
使用docker build -t lyudmilalala/job-test .
创建本地镜像lyudmilalala/job-test
测试镜像
$ docker run -it --rm lyudmilalala/job-test
{"status": 200, "msg": "TommyTommy"}
通过修改启动命令cmd
可以获得不同结果
$ docker run -it --rm lyudmilalala/job-test python testFunc.py --firstName "Test" --n 5
{"status": 200, "msg": "TestTestTestTestTest"}
使用yaml创建k8s job
启动k8s job用的yaml配置文件testFunc.yml
使用yaml启动job【确保k8s集群已经开启】
kubectl apply -f .\testFunc.yml
一开始的任务才开始执行,pod刚启动
$ kubectl get job -A
NAMESPACE NAME COMPLETIONS DURATION AGE
default job-test 0/4 1s 1s
$ kubectl get pods -n default
NAME READY STATUS RESTARTS AGE
job-test-q9cdr 0/1 ContainerCreating 0 1s
job-test-w2lr7 0/1 ContainerCreating 0 1s
job-test-xbd8d 0/1 ContainerCreating 0 1s
过了一会儿后,pod执行完毕,job也被标记为完成
$ kubectl get pods -n default
NAME READY STATUS RESTARTS AGE
job-test-7vgqz 0/1 Completed 0 6s
job-test-q9cdr 0/1 Completed 0 9s
job-test-w2lr7 0/1 Completed 0 9s
job-test-xbd8d 0/1 Completed 0 9s
$ kubectl get job -A
NAMESPACE NAME COMPLETIONS DURATION AGE
default job-test 4/4 6s 18s
查看对应的执行了任务的pod的log,可以看到结果
$ kubectl logs job-test-867x8
{"status": 200, "msg": "HelenHelenHelenHelen"}
Job不会自动删除,因此完成后需要手动删除
Job使用的pod也不会自动删除,会在Job被手动删除时跟着一起删除
kubectl delete job job-test
使用java程序创建k8s job
引用io.fabric8.kubernetes.client
包
Java程序
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.ConfigBuilder;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
public class Main {
public static void main(String[] args) {
// Set up config
Config config = new ConfigBuilder()
.withMasterUrl("https://kubernetes.docker.internal:6443/")
.withOauthToken("my-jwt-token-in-secret")
// .withUsername("username").withPassword("password") if you're using basic auth
// .withTrustCerts(false) if you want to disable certification checks
.build();
// Initialize client
try (KubernetesClient client = new DefaultKubernetesClient(config)) {
final String namespace = "default";
final Job job = new JobBuilder()
.withApiVersion("batch/v1")
.withNewMetadata()
.withName("java-job-test")
// .withLabels()
// .withAnnotations()
.endMetadata()
.withNewSpec()
.withNewTemplate()
.withNewSpec()
.addNewContainer()
.withName("java-job-test-container")
.withImagePullPolicy("IfNotPresent")
.withImage("lyudmilalala/job-test")
.withCommand("python", "testFunc.py", "--firstName", "Alice", "--n", "3")
.endContainer()
.withRestartPolicy("Never")
.endSpec()
.endTemplate()
.endSpec()
.build();
job.getSpec().setParallelism(3);
job.getSpec().setCompletions(4);
client.batch().v1().jobs().inNamespace(namespace).resource(job).create();
// Get All pods created by the job
PodList podList = client.pods().inNamespace(namespace).withLabel("job-name", job.getMetadata().getName()).list();
// Wait for pod to complete
client.pods().inNamespace(namespace).withName(podList.getItems().get(0).getMetadata().getName())
.waitUntilCondition(pod -> pod.getStatus().getPhase().equals("Succeeded"), 1, TimeUnit.MINUTES);
// Print Job's log
String joblog = client.batch().v1().jobs().inNamespace(namespace).withName("java-job-test").getLog();
System.out.println(joblog);
} catch (KubernetesClientException e) {
e.printStackTrace();
}
}
}
网友评论