美文网首页
argo resource

argo resource

作者: 陈先生_9e91 | 来源:发表于2021-06-08 17:23 被阅读0次

V3.0.1

resource pod

# Pod for workflow node hobot-dag-20435.step-0 (a resource template step).
apiVersion: v1
kind: Pod
metadata:
  annotations:
    workflows.argoproj.io/node-name: hobot-dag-20435.step-0
    # JSON-encoded template of this node: the resource action ("create"), the
    # manifest to apply, and the success/failure conditions.
    workflows.argoproj.io/template: '{"name":"step-0","inputs":{},"outputs":{},"metadata":{},"resource":{"action":"create","manifest":"apiVersion:
      batch.volcano.sh/v1alpha1\nkind: Job\n","successCondition":"status.state.phase
      = Completed","failureCondition":"status.state.phase in (Failed, Terminated)"}}'
  labels:
    workflows.argoproj.io/completed: "false"
    workflows.argoproj.io/workflow: hobot-dag-20435
  name: hobot-dag-20435-3634668572
  namespace: argo
spec:
  containers:
  # The single container runs `argoexec resource create`.
  - command:
    - argoexec
    - resource
    - create
    env:
    # The pod's own name, taken from metadata.name.
    - name: ARGO_POD_NAME
      valueFrom:
        fieldRef:
          apiVersion: v1
          fieldPath: metadata.name
    - name: GODEBUG
      value: x509ignoreCN=0
    - name: ARGO_CONTAINER_NAME
      value: main
    image: xx/argoproj-argoexec:v3.0.1
    name: main
    resources: {}
    volumeMounts:
    # Mounts the downwardAPI volume below, so the pod annotations appear as the
    # file /argo/podmetadata/annotations.
    - mountPath: /argo/podmetadata
      name: podmetadata
  restartPolicy: Never
  schedulerName: default-scheduler
  serviceAccount: argo
  serviceAccountName: argo
  volumes:
  # Projects metadata.annotations into a file named "annotations".
  - downwardAPI:
      defaultMode: 420
      items:
      - fieldRef:
          apiVersion: v1
          fieldPath: metadata.annotations
        path: annotations
    name: podmetadata
  # NOTE(review): docker-sock is declared here but is not mounted by the
  # container shown above.
  - hostPath:
      path: /var/run/docker.sock
      type: Socket
    name: docker-sock

argoproj/argo-workflows/cmd/argoexec/commands/resource.go

// execResource runs a resource template action end to end.
//
// It initializes the executor, stages the files the template needs, and
// executes the action against common.ExecutorResourceManifestPath. For every
// action other than "delete" it then calls WaitResource followed by
// SaveResourceParameters on the resource that ExecResource reported.
//
// The first error from any step is returned and the remaining steps are
// skipped; nil is returned only when every step succeeded.
func execResource(ctx context.Context, action string) error {
	wfExecutor := initExecutor()

	if err := wfExecutor.StageFiles(); err != nil {
		return err
	}

	resourceNamespace, resourceName, err := wfExecutor.ExecResource(
		action, common.ExecutorResourceManifestPath, wfExecutor.Template.Resource.Flags,
	)
	if err != nil {
		return err
	}

	// Waiting and parameter saving are skipped for the delete action.
	if action == "delete" {
		return nil
	}

	if err := wfExecutor.WaitResource(ctx, resourceNamespace, resourceName); err != nil {
		return err
	}
	return wfExecutor.SaveResourceParameters(ctx, resourceNamespace, resourceName)
}

argoproj/argo-workflows/cmd/argoexec/commands/root.go

// initExecutor builds the WorkflowExecutor from the client configuration, the
// pod name found in the environment variable common.EnvVarPodName, and the
// template loaded from podAnnotationsPath.
//
// The function has no error return, so every initialization failure panics
// with the underlying cause instead of continuing with a nil config,
// clientset or template.
func initExecutor() *executor.WorkflowExecutor {
	config, err := clientConfig.ClientConfig()
	if err != nil {
		panic(err)
	}
	executorType := os.Getenv(common.EnvVarContainerRuntimeExecutor)
	config = restclient.AddUserAgent(config, fmt.Sprintf("argo-workflows/%s executor/%s", version.Version, executorType))

	namespace, _, err := clientConfig.Namespace()
	if err != nil {
		panic(err)
	}

	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}

	podName, ok := os.LookupEnv(common.EnvVarPodName)
	if !ok {
		panic(fmt.Sprintf("unable to determine pod name from environment variable %s", common.EnvVarPodName))
	}

	// Load the resource template from file.
	tmpl, err := executor.LoadTemplate(podAnnotationsPath)
	if err != nil {
		panic(err)
	}

	// NOTE(review): cre is not defined in this excerpt; presumably it is the
	// container runtime executor selected from executorType — confirm against
	// the full source.
	wfExecutor := executor.NewExecutor(clientset, podName, namespace, podAnnotationsPath, cre, *tmpl)

	return &wfExecutor
}

Template file

root@hobot-dag-20435-3634668572:/# cat /argo/podmetadata/annotations
kubernetes.io/config.seen="2021-06-04T14:43:11.934368503+08:00"
kubernetes.io/config.source="api"
workflows.argoproj.io/node-name="hobot-dag-20435.step-0"
workflows.argoproj.io/template="{\"name\":\"step-0\",\"inputs\":{},\"outputs\":{},\"metadata\":{},\"resource\":{\"action\":\"create\",\"manifest\":\"apiVersion: batch.volcano.sh/v1alpha1\\nkind: Job\\n\",\"successCondition\":\"status.state.phase = Completed\",\"failureCondition\":\"status.state.phase in (Failed, Terminated)\"}}"

argoproj/argo-workflows/workflow/executor/executor.go

// StageFiles will create any files required by script/resource templates.
//
// For a script template the script source is written to
// common.ExecutorScriptSourcePath; for a resource template the manifest is
// written to common.ExecutorResourceManifestPath. Any other template type
// needs no file and returns nil. An error from writing the file is returned
// to the caller.
func (we *WorkflowExecutor) StageFiles() error {
	var filePath string
	var body []byte
	switch we.Template.GetType() {
	case wfv1.TemplateTypeScript:
		log.Infof("Loading script source to %s", common.ExecutorScriptSourcePath)
		filePath = common.ExecutorScriptSourcePath
		body = []byte(we.Template.Script.Source)
	case wfv1.TemplateTypeResource:
		log.Infof("Loading manifest to %s", common.ExecutorResourceManifestPath)
		filePath = common.ExecutorResourceManifestPath
		body = []byte(we.Template.Resource.Manifest)
	default:
		return nil
	}
	return ioutil.WriteFile(filePath, body, 0644)
}

manifest.yaml

root@hobot-dag-20435-3634668572:/# cat /tmp/manifest.yaml
apiVersion: batch.volcano.sh/v1alpha1
kind: Job
...
status:
  state:
    lastTransitionTime: null

argoproj/argo-workflows/workflow/executor/resource.go

// ExecResource will run kubectl action against a manifest.
//
// The kubectl arguments are built from action, manifestPath and flags. On
// success it returns the namespace and name of the resource; if building the
// arguments or running kubectl fails, it returns two empty strings and the
// error.
func (we *WorkflowExecutor) ExecResource(action string, manifestPath string, flags []string) (string, string, error) {
	args, err := we.getKubectlArguments(action, manifestPath, flags)
	if err != nil {
		return "", "", err
	}

	cmd := exec.Command("kubectl", args...)

	out, err := cmd.Output()
	if err != nil {
		return "", "", err
	}

	// NOTE(review): this excerpt does not show how obj and resourceName are
	// obtained; presumably they are decoded from out — confirm against the
	// full source.
	return obj.GetNamespace(), resourceName, nil
}

argoproj/argo-workflows/workflow/executor/resource.go

// WaitResource blocks until the named resource meets either its success or
// its failure condition, and returns nil once the wait ended successfully.
//
// checkResourceState is polled immediately and then every 5 seconds with no
// overall timeout. A retryable error keeps the poll going; a non-retryable
// error ends the poll and is returned.
func (we *WorkflowExecutor) WaitResource(ctx context.Context, resourceNamespace string, resourceName string) error {
	// One poll attempt: run the condition result reader and translate its
	// outcome into the (done, err) pair expected by the poller.
	pollOnce := func() (bool, error) {
		retryable, stateErr := checkResourceState(resourceNamespace, resourceName, successReqs, failReqs)
		switch {
		case stateErr == nil:
			log.Infof("Returning from successful wait for resource %s", resourceName)
			return true, nil
		case retryable:
			log.Infof("Waiting for resource %s resulted in retryable error %v", resourceName, stateErr)
			return false, nil
		default:
			log.Warnf("Waiting for resource %s resulted in non-retryable error %v", resourceName, stateErr)
			return false, stateErr
		}
	}

	// The 5 second interval doubles as a backoff when the result reader fails
	// right away.
	waitErr := wait.PollImmediateInfinite(5*time.Second, pollOnce)
	if waitErr == nil {
		return nil
	}

	if waitErr == wait.ErrWaitTimeout {
		log.Warnf("Waiting for resource %s resulted in timeout due to repeated errors", resourceName)
	} else {
		log.Warnf("Waiting for resource %s resulted in error %v", resourceName, waitErr)
	}
	return waitErr
}
// checkResourceState starts the kubectl wait command for the resource and
// then loops reading JSON objects from its output.
//
// The boolean result tells the caller whether the returned error is
// retryable. A failure to start the command and a deleted resource are
// reported as non-retryable; a failure to read JSON from the command output
// is reported as retryable.
func checkResourceState(resourceNamespace string, resourceName string, successReqs labels.Requirements, failReqs labels.Requirements) (bool, error) {
	// kubectl get xxx -w -o json; stdout is consumed through reader.
	cmd, reader, err := startKubectlWaitCmd(resourceNamespace, resourceName)
	if err != nil {
		// Without a running command there is no process or reader to use.
		return false, err
	}

	// Process stdout.
	for {
		if checkIfResourceDeleted(resourceName, resourceNamespace) {
			return false, errors.Errorf(errors.CodeNotFound, "Resource %s in namespace %s has been deleted somehow.", resourceName, resourceNamespace)
		}

		jsonBytes, err := readJSON(reader)
		if err != nil {
			// Best-effort kill: the read error is what gets reported.
			_ = cmd.Process.Kill()
			return true, err
		}

		log.Info(string(jsonBytes))
		// Check whether the workload state satisfies the expected conditions.
		// NOTE(review): the code that computes failed, matched and numMatched,
		// uses msg, and returns once a condition is met is elided from this
		// excerpt — confirm against the full source.
		for _, req := range failReqs {
			msg := fmt.Sprintf("failure condition '%s' evaluated %v", req, failed)
		}

		for _, req := range successReqs {
			log.Infof("success condition '%s' evaluated %v", req, matched)
		}
		log.Infof("%d/%d success conditions matched", numMatched, len(successReqs))
	}
}

V3.1

https://hub.fastgit.org/argoproj/argo-workflows/issues/4467
https://hub.fastgit.org/argoproj/argo-workflows/pull/5364
变化点:
资源状态的检查方式由 kubectl get -w(watch)改为通过 client 轮询(poll)。

相关文章

网友评论

      本文标题:argo resource

      本文链接:https://www.haomeiwen.com/subject/hxsjeltx.html