美文网首页
argo resource

argo resource

作者: 陈先生_9e91 | 来源:发表于2021-06-08 17:23 被阅读0次

    V3.0.1

    resource pod

    apiVersion: v1
    kind: Pod
    metadata:
      annotations:
        workflows.argoproj.io/node-name: hobot-dag-20435.step-0
        workflows.argoproj.io/template: '{"name":"step-0","inputs":{},"outputs":{},"metadata":{},"resource":{"action":"create","manifest":"apiVersion:
          batch.volcano.sh/v1alpha1\nkind: Job\n","successCondition":"status.state.phase
          = Completed","failureCondition":"status.state.phase in (Failed, Terminated)"}}'
      labels:
        workflows.argoproj.io/completed: "false"
        workflows.argoproj.io/workflow: hobot-dag-20435
      name: hobot-dag-20435-3634668572
      namespace: argo
    spec:
      containers:
      - command:
        - argoexec
        - resource
        - create
        env:
        - name: ARGO_POD_NAME
          valueFrom:
            fieldRef:
              apiVersion: v1
              fieldPath: metadata.name
        - name: GODEBUG
          value: x509ignoreCN=0
        - name: ARGO_CONTAINER_NAME
          value: main
        image: xx/argoproj-argoexec:v3.0.1
        name: main
        resources: {}
        volumeMounts:
        - mountPath: /argo/podmetadata
          name: podmetadata
      restartPolicy: Never
      schedulerName: default-scheduler
      serviceAccount: argo
      serviceAccountName: argo
      volumes:
      - downwardAPI:
          defaultMode: 420
          items:
          - fieldRef:
              apiVersion: v1
              fieldPath: metadata.annotations
            path: annotations
        name: podmetadata
      - hostPath:
          path: /var/run/docker.sock
          type: Socket
        name: docker-sock
    

    argoproj/argo-workflows/cmd/argoexec/commands/resource.go

    func execResource(ctx context.Context, action string) error {
       wfExecutor := initExecutor()
    
       err := wfExecutor.StageFiles()
    
       isDelete := action == "delete"
    
       resourceNamespace, resourceName, err := wfExecutor.ExecResource(
          action, common.ExecutorResourceManifestPath, wfExecutor.Template.Resource.Flags,
       )
    
       if !isDelete {
          err = wfExecutor.WaitResource(ctx, resourceNamespace, resourceName)
     
          err = wfExecutor.SaveResourceParameters(ctx, resourceNamespace, resourceName)
       }
       return nil
    }
    

    argoproj/argo-workflows/cmd/argoexec/commands/root.go

    func initExecutor() *executor.WorkflowExecutor {
       config, err := clientConfig.ClientConfig()
       executorType := os.Getenv(common.EnvVarContainerRuntimeExecutor)
       config = restclient.AddUserAgent(config, fmt.Sprintf("argo-workflows/%s executor/%s", version.Version, executorType))
    
       namespace, _, err := clientConfig.Namespace()
    
       clientset, err := kubernetes.NewForConfig(config)
    
       podName, ok := os.LookupEnv(common.EnvVarPodName)
       
       // load resource template from file
       tmpl, err := executor.LoadTemplate(podAnnotationsPath)
    
       wfExecutor := executor.NewExecutor(clientset, podName, namespace, podAnnotationsPath, cre, *tmpl)
       
       return &wfExecutor
    }
    

    Template file

    root@hobot-dag-20435-3634668572:/# cat /argo/podmetadata/annotations
    kubernetes.io/config.seen="2021-06-04T14:43:11.934368503+08:00"
    kubernetes.io/config.source="api"
    workflows.argoproj.io/node-name="hobot-dag-20435.step-0"
    workflows.argoproj.io/template="{\"name\":\"step-0\",\"inputs\":{},\"outputs\":{},\"metadata\":{},\"resource\":{\"action\":\"create\",\"manifest\":\"apiVersion: batch.volcano.sh/v1alpha1\\nkind: Job\"successCondition\":\"status.state.phase = Completed\",\"failureCondition\":\"status.state.phase in (Failed, Terminated)\"}}"
    

    argoproj/argo-workflows/workflow/executor/executor.go

    // StageFiles will create any files required by script/resource templates
    func (we *WorkflowExecutor) StageFiles() error {
       var filePath string
       var body []byte
       switch we.Template.GetType() {
       case wfv1.TemplateTypeScript:
          log.Infof("Loading script source to %s", common.ExecutorScriptSourcePath)
          filePath = common.ExecutorScriptSourcePath
          body = []byte(we.Template.Script.Source)
       case wfv1.TemplateTypeResource:
          log.Infof("Loading manifest to %s", common.ExecutorResourceManifestPath)
          filePath = common.ExecutorResourceManifestPath
          body = []byte(we.Template.Resource.Manifest)
       default:
          return nil
       }
       err := ioutil.WriteFile(filePath, body, 0644)
       return nil
    }
    

    manifest.yaml

    root@hobot-dag-20435-3634668572:/# cat /tmp/manifest.yaml
    apiVersion: batch.volcano.sh/v1alpha1
    kind: Job
    ...
    status:
      state:
        lastTransitionTime: null
    

    argoproj/argo-workflows/workflow/executor/resource.go

    // ExecResource will run kubectl action against a manifest
    func (we *WorkflowExecutor) ExecResource(action string, manifestPath string, flags []string) (string, string, error) {
       args, err := we.getKubectlArguments(action, manifestPath, flags)
    
       cmd := exec.Command("kubectl", args...)
    
       out, err := cmd.Output()
       
       return obj.GetNamespace(), resourceName, nil
    }
    

    argoproj/argo-workflows/workflow/executor/resource.go

    // WaitResource waits for a specific resource to satisfy either the success or failure condition
    func (we *WorkflowExecutor) WaitResource(ctx context.Context, resourceNamespace string, resourceName string) error {
       // Start the condition result reader using PollImmediateInfinite
       // Poll intervall of 5 seconds serves as a backoff intervall in case of immediate result reader failure
       err := wait.PollImmediateInfinite(time.Second*5,
          func() (bool, error) {
             isErrRetry, err := checkResourceState(resourceNamespace, resourceName, successReqs, failReqs)
    
             if err == nil {
                log.Infof("Returning from successful wait for resource %s", resourceName)
                return true, nil
             }
    
             if isErrRetry {
                log.Infof("Waiting for resource %s resulted in retryable error %v", resourceName, err)
                return false, nil
             }
    
             log.Warnf("Waiting for resource %s resulted in non-retryable error %v", resourceName, err)
             return false, err
          })
       if err != nil {
          if err == wait.ErrWaitTimeout {
             log.Warnf("Waiting for resource %s resulted in timeout due to repeated errors", resourceName)
          } else {
             log.Warnf("Waiting for resource %s resulted in error %v", resourceName, err)
          }
          return err
       }
    
       return nil
    }
    
    // Function to do the kubectl get -w command and then waiting on json reading.
    func checkResourceState(resourceNamespace string, resourceName string, successReqs labels.Requirements, failReqs labels.Requirements) (bool, error) {
       // kubectl get xxx -w -o json, 异步处理stdout
       cmd, reader, err := startKubectlWaitCmd(resourceNamespace, resourceName)
        
       // 处理stdout
       for {
          if checkIfResourceDeleted(resourceName, resourceNamespace) {
             return false, errors.Errorf(errors.CodeNotFound, "Resource %s in namespace %s has been deleted somehow.", resourceName, resourceNamespace)
          }
    
          jsonBytes, err := readJSON(reader)
          if err != nil {
             resultErr := err
             _ = cmd.Process.Kill()
             return true, resultErr
          }
    
          log.Info(string(jsonBytes))
          // 检查workload状态是否满足预期
          for _, req := range failReqs {
             msg := fmt.Sprintf("failure condition '%s' evaluated %v", req, failed)
          }
    
          for _, req := range successReqs {
             log.Infof("success condition '%s' evaluated %v", req, matched)
          }
          log.Infof("%d/%d success conditions matched", numMatched, len(successReqs))
       }
    }
    

    v 3.1

    https://hub.fastgit.org/argoproj/argo-workflows/issues/4467
    https://hub.fastgit.org/argoproj/argo-workflows/pull/5364
    变化点:
    kubectl -w变成client poll

    相关文章

      网友评论

          本文标题:argo resource

          本文链接:https://www.haomeiwen.com/subject/hxsjeltx.html