SKIL / Workflow / Executing Jobs

Author: hello风一样的男子 | Published 2019-04-20 21:34

Executing Jobs

A job is a computation that runs in the background against connected SKIL resources. Jobs come in two types:

1. Training
2. Inference

Running jobs

Once external resources have been connected to SKIL, training/inference jobs can be executed on them in either of two ways:

A. The CLI
B. REST endpoints

The skil jobs command manages the job-execution workflow through the SKIL CLI. Equivalent REST endpoints are also available.
The job workflow is as follows:

1. Create a job.
2. Submit the job created in step 1 to run. The job starts running in the background.
3. Periodically check the status of the running job.
4. Download the output file from the completed job.
5. Remove the completed job from the job list (optional).

Multiple jobs can be created and run with SKIL at the same time. Their execution order depends on the underlying compute resources.
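
The same workflow can also be driven programmatically with the JobClient used in the full notebook example at the end of this page. The sketch below simply maps the five steps onto that client; the resource IDs and the jobArgs value are placeholders you must substitute, and step 4 is skipped here because output download is exposed only through the REST endpoint (see step 4 below).

    import io.skymind.auth.JWTUtil
    import io.skymind.jobs.client.JobClient
    import io.skymind.jobs.model.JobType

    val jobClient = new JobClient("http://localhost:9008")
    jobClient.setAuthToken(JWTUtil.generateSystemToken())

    // Placeholders: IDs of resources already registered with SKIL, and a jobArgs
    // string as described in the table under step 1 below.
    val computeResourceId = 1L
    val storageResourceId = 2L
    val jobArgs = "<skil_spark_main_args>"

    // 1. Create the job.
    val job = jobClient.createJob(JobType.TRAINING, computeResourceId, storageResourceId, jobArgs)

    // 2. Submit it; it starts running in the background.
    val run = jobClient.runJob(job.getJobId)

    // 3. Check the status of the run (repeat as needed).
    println(jobClient.getRunStatus(run.getRunId))

    // 4. Downloading the output is REST-only; see step 4 below.

    // 5. Optionally remove the finished job.
    jobClient.deleteJob(job.getJobId)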

1. Creating a job

To create a job, SKIL provides the following:
CLI: skil jobs create <args>.
REST endpoint: POST /jobs/<type>, where <type> is the job type, either training or inference.
Both variants have the following format:

CLI

    skil jobs create --type <training/inference> --storageResourceId <storage_resource_id> --computeResourceId <compute_resource_id> --jobArgs <job_arguments> --outputFileName <output_file_name>
    

    REST - cURL

    curl -d '{"computeResourceId": <compute_resource_id>, "storageResourceId": <storage_resource_id>, "jobArgs": "<skil_spark_main_args>", "outputFileName": "<output_file_name>"}' -H "Authorization: Bearer <auth_token>" -H "Content-Type: application/json" -X POST http://localhost:9008/jobs/<type>
    
    # <type> => one of ["training", "inference"]
    

The parameters are as follows:

| Parameter | Details | Type |
| --- | --- | --- |
| computeResourceId | ID of the compute resource. | Long |
| storageResourceId | ID of the storage resource. | Long |
| jobArgs | Arguments passed to the job. Example: "jobArgs":"-mo /var/skil/neuralnet.zip -tm /var/skil/parameteraveraging.json -dsp io.skymind.skil.train.spark.MnistProvider --evalType evaluation --numEpochs 1 --outputPath /tmp/output5049941383652659983.zip" | String |
| outputFileName | Name that the output file generated by the job should have. | String |
| type | Job type, one of ["training", "inference"]. | String |
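
In a notebook, the create step maps onto JobClient.createJob, with jobArgs passed as a single space-separated string. The snippet below is a minimal sketch with placeholder resource IDs; assembling the arguments as an array and joining them with mkString(" "), as the full example at the end of this page does, keeps long argument lists readable.

    import io.skymind.auth.JWTUtil
    import io.skymind.jobs.client.JobClient
    import io.skymind.jobs.model.JobType

    val jobClient = new JobClient("http://localhost:9008")
    jobClient.setAuthToken(JWTUtil.generateSystemToken())

    // Placeholder resource IDs -- use the IDs of your registered compute/storage resources.
    val computeResourceId = 1L
    val storageResourceId = 2L

    // Assemble jobArgs as an array and join it into the single string SKIL expects.
    val jobArgs = Array(
      "-mo", "/var/skil/neuralnet.zip",
      "-tm", "/var/skil/parameteraveraging.json",
      "-dsp", "io.skymind.skil.train.spark.MnistProvider",
      "--evalType", "evaluation",
      "--numEpochs", "1",
      "--outputPath", "/tmp/output.zip"
    ).mkString(" ")

    val job = jobClient.createJob(JobType.TRAINING, computeResourceId, storageResourceId, jobArgs)
    println("Created job with ID " + job.getJobId)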

2. Running a job

To run a job, use either of the following:
CLI: skil jobs run --id <jobId>.
REST endpoint: POST /jobs/<id>/run. The job ID is obtained when the job is created.
Their formats are:

    CLI

    skil jobs run --id <jobId>
    

REST - cURL

    curl -d '{}' -H "Authorization: Bearer <auth_token>" -H "Content-Type: application/json" -X POST http://localhost:9008/jobs/<id>/run
    

The parameters are as follows:

| Parameter | Details | Type |
| --- | --- | --- |
| id | Job ID, obtained after creating the job. | Long |
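
From a notebook, the equivalent call is JobClient.runJob, which returns a run handle whose run ID is used for later status checks. A minimal sketch with a placeholder job ID follows; note that the full example at the end of this page relaxes the Unirest client timeouts around this call, since submitting a job can take a while.

    import io.skymind.auth.JWTUtil
    import io.skymind.jobs.client.JobClient
    import com.mashape.unirest.http.Unirest

    val jobClient = new JobClient("http://localhost:9008")
    jobClient.setAuthToken(JWTUtil.generateSystemToken())

    val jobId = 1L // placeholder: the ID returned when the job was created

    // Submission can block for a while, so disable the HTTP client timeouts around it.
    Unirest.setTimeouts(0, 0)
    val run = jobClient.runJob(jobId)
    Unirest.setTimeouts(10000, 60000)

    println("Job started: " + run.toString)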

3. Getting job status

Note: status checks are not currently supported for jobs backed by YARN resources.

To get a job's status, use either of the following:
CLI: skil jobs status --id <jobId>.
REST endpoint: POST /jobs/<id>/refresh. The job ID is obtained when the job is created.
Their formats are:

CLI

    skil jobs status --id <jobId>
    

REST - cURL

    curl -d '{}' -H "Authorization: Bearer <auth_token>" -H "Content-Type: application/json" -X POST http://localhost:9008/jobs/<id>/refresh
    

The parameters are as follows:

| Parameter | Details | Type |
| --- | --- | --- |
| id | Job ID, obtained after creating the job. | Long |
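
In a notebook the status is fetched with JobClient.getRunStatus, which takes the run ID returned by runJob (not the job ID). A minimal polling sketch, assuming a placeholder run ID and a 90-second wait as in the full example at the end of this page:

    import io.skymind.auth.JWTUtil
    import io.skymind.jobs.client.JobClient

    val jobClient = new JobClient("http://localhost:9008")
    jobClient.setAuthToken(JWTUtil.generateSystemToken())

    val runId = 1L // placeholder: the run ID returned by runJob

    // Check the status right away, then again after waiting 90 seconds.
    println("Status after submission: " + jobClient.getRunStatus(runId))
    Thread.sleep(90000)
    println("Status after 90 seconds: " + jobClient.getRunStatus(runId))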

4. Getting the output from a job

Currently, downloading a job's output is only supported through the REST endpoint.

Note: this only works once the job's run status is "finished".

The relevant endpoint and its format are as follows:
REST endpoint: POST /jobs/<id>/outputfile. The job ID is obtained when the job is created.

REST - cURL

    curl -d '{"localDownloadPath": "<local_download_path>"}' -H "Authorization: Bearer <auth_token>" -H "Content-Type: application/json" -X POST http://localhost:9008/jobs/<id>/outputfile
    

The parameters are as follows:

| Parameter | Details | Type |
| --- | --- | --- |
| localDownloadPath | Local file path to which the output should be downloaded. | String |
| id | Job ID, obtained after creating the job. | Long |
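
Since only the REST endpoint is available for this step, a notebook can call it directly with the Unirest client that the full example below already imports. A minimal sketch, assuming placeholder values for the job ID, auth token, and download path:

    import com.mashape.unirest.http.Unirest

    val jobId = 1L                 // placeholder: the ID of the finished job
    val authToken = "<auth_token>" // placeholder: e.g. a token from JWTUtil.generateSystemToken()

    // POST to /jobs/<id>/outputfile with the local path the output should be written to.
    val response = Unirest.post("http://localhost:9008/jobs/" + jobId + "/outputfile")
      .header("Authorization", "Bearer " + authToken)
      .header("Content-Type", "application/json")
      .body("""{"localDownloadPath": "/tmp/skil-jobs/output.zip"}""")
      .asString()

    println("HTTP status: " + response.getStatus)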

5. Deleting a completed job (optional)

To delete a job, use either of the following:
CLI: skil jobs rm --id <jobId>.
REST endpoint: DELETE /jobs/<id>. The job ID is obtained when the job is created.
Their formats are:

    CLI

    skil jobs rm --id <jobId>
    

REST - cURL

    curl -H "Authorization: Bearer <auth_token>" -X DELETE http://localhost:9008/jobs/<id>
    

The parameters are as follows:

| Parameter | Details | Type |
| --- | --- | --- |
| id | Job ID, obtained after creating the job. | Long |
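
From a notebook the same cleanup is done with JobClient.deleteJob; JobClient.getAllJobs makes it easy to sweep away every job at once, as the full example below does before creating new resources. A minimal sketch (the single job ID is a placeholder):

    import io.skymind.auth.JWTUtil
    import io.skymind.jobs.client.JobClient
    import scala.collection.JavaConversions._

    val jobClient = new JobClient("http://localhost:9008")
    jobClient.setAuthToken(JWTUtil.generateSystemToken())

    // Delete one job by its ID (placeholder value)...
    val jobId = 1L
    jobClient.deleteJob(jobId)

    // ...or remove every job currently known to SKIL.
    for (job <- jobClient.getAllJobs) {
      jobClient.deleteJob(job.getJobId)
    }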

Creating and running a job can also be done from a notebook by executing example code like the following:

    import io.skymind.auth.JWTUtil;
    import io.skymind.jobs.client.JobClient;
    import io.skymind.jobs.model.JobModel;
    import io.skymind.jobs.model.JobRun;
    import io.skymind.jobs.model.JobType;
    import io.skymind.resource.client.SKILResourceClient;
    import io.skymind.resource.model.Resource;
    import io.skymind.resource.model.subtypes.ResourceDetails;
    import io.skymind.resource.model.subtypes.compute.SparkResourceDetails;
    import io.skymind.resource.model.subtypes.storage.HDFSResourceDetails;
    import io.skymind.skil.train.spark.MnistKeyedProvider;
    import org.apache.spark.storage.StorageLevel;
    import org.datavec.api.transform.serde.JsonMappers;
    import org.deeplearning4j.nn.api.OptimizationAlgorithm;
    import org.deeplearning4j.nn.conf.MultiLayerConfiguration;
    import org.deeplearning4j.nn.conf.NeuralNetConfiguration;
    import org.deeplearning4j.nn.conf.inputs.InputType;
    import org.deeplearning4j.nn.conf.layers.ConvolutionLayer;
    import org.deeplearning4j.nn.conf.layers.DenseLayer;
    import org.deeplearning4j.nn.conf.layers.OutputLayer;
    import org.deeplearning4j.nn.conf.layers.SubsamplingLayer;
    import org.deeplearning4j.nn.multilayer.MultiLayerNetwork;
    import org.deeplearning4j.nn.weights.WeightInit;
    import org.deeplearning4j.spark.impl.paramavg.ParameterAveragingTrainingMaster;
    import org.deeplearning4j.util.ModelSerializer;
    import org.nd4j.linalg.activations.Activation;
    import org.nd4j.linalg.learning.config.Nesterovs;
    import org.nd4j.linalg.lossfunctions.LossFunctions;
    import com.mashape.unirest.http.Unirest;
    import scala.collection.JavaConversions._
    
    import java.io.File;
    import java.io.IOException;
    import java.util.List;
    
          def getTrainingMasterConfFile(nameNodePort: Int, folder: File): File = {
            val parameterAveragingTrainingMaster: ParameterAveragingTrainingMaster =
              new ParameterAveragingTrainingMaster.Builder(100)
                .batchSizePerWorker(1000)
                .averagingFrequency(3)
                .exportDirectory("hdfs://localhost:" + nameNodePort + new File(
                  folder,
                  "exportdir").getAbsolutePath)
                .workerPrefetchNumBatches(10)
                .storageLevel(StorageLevel.DISK_ONLY)
                .build()
            val jsonWriteParamAveraging: File =
              new File(folder, "parameteraveraging.json")
            JsonMappers.getMapper
              .writeValue(jsonWriteParamAveraging, parameterAveragingTrainingMaster)
            jsonWriteParamAveraging
          }
    
          def getModelFile(folder: File): File = {
            val builder
              : MultiLayerConfiguration.Builder = // Training iterations as above
              new NeuralNetConfiguration.Builder()
                .seed(230)
                .l2(0.0005)
                .weightInit(WeightInit.XAVIER)
                .optimizationAlgo(OptimizationAlgorithm.STOCHASTIC_GRADIENT_DESCENT)
                .updater(Nesterovs.builder().learningRate(0.01).momentum(0.9).build())
                .list()
                .layer(0,
                       new ConvolutionLayer.Builder(5, 5)
                         .nIn(1)
                         .stride(1, 1)
                         .nOut(20)
                         .activation(Activation.IDENTITY)
                         .build())
                .layer(1,
                       new SubsamplingLayer.Builder(SubsamplingLayer.PoolingType.MAX)
                         .kernelSize(2, 2)
                         .stride(2, 2)
                         .build())
                .layer(2,
                       new ConvolutionLayer.Builder(5, 5)
                         .stride(1, 1)
                         .nOut(50)
                         .activation(Activation.IDENTITY)
                         .build())
                .layer(3,
                       new SubsamplingLayer.Builder(SubsamplingLayer.PoolingType.MAX)
                         .kernelSize(2, 2)
                         .stride(2, 2)
                         .build())
                .layer(4,
                       new DenseLayer.Builder()
                         .activation(Activation.RELU)
                         .nOut(500)
                         .build())
                .layer(5,
                       new OutputLayer.Builder(
                         LossFunctions.LossFunction.NEGATIVELOGLIKELIHOOD)
                         .nOut(10)
                         .activation(Activation.SOFTMAX)
                         .build())
                .setInputType( //See note below
                  InputType.convolutionalFlat(28, 28, 1))
            val network: MultiLayerNetwork = new MultiLayerNetwork(builder.build())
            network.init()
            val neuralNet: File = new File(folder, "neuralnet.zip")
            ModelSerializer.writeModel(network, neuralNet, true)
            neuralNet
          }
    
        def main() {
            val localSparkHome: String = "/opt/spark"
            val sparkMasterUrl: String = "spark://localhost:7077"
            val hadoopNamenodeHost: String = "localhost"
            val hadoopNamenodePort: Int = 8020
            val SKILURL: String = "http://localhost:9008"
            val JOBS_WORK_DIR: String = "/tmp/skil-jobs"
    
            val resourceClient: SKILResourceClient = new SKILResourceClient(SKILURL)
            resourceClient.setAuthToken(JWTUtil.generateSystemToken())
            val jobClient: JobClient = new JobClient(SKILURL)
            jobClient.setAuthToken(JWTUtil.generateSystemToken())
    
            // cleanup all earlier jobs and resources
            val jobs: List[JobModel] = jobClient.getAllJobs
            for (job <- jobs) {
              jobClient.deleteJob(job.getJobId)
            }
            val resources: List[Resource] = resourceClient.getResources
            println(resources)
            for (resource <- resources) {
              resourceClient.deleteResource(resource.getResourceId)
            }
    
            // Create the compute and storage resources
            val sparkResourceDetails = new SparkResourceDetails(localSparkHome, sparkMasterUrl);
            val hdfsResourceDetails = new HDFSResourceDetails(hadoopNamenodeHost, String.valueOf(hadoopNamenodePort));
    
            // add the resources to the database
            val computeResource = resourceClient.addResource("SKIL YARN", sparkResourceDetails, "");
            val storageResource = resourceClient.addResource("SKIL HDFS", hdfsResourceDetails, "");
    
            val jobsFolder = new File(JOBS_WORK_DIR);
            if (!jobsFolder.exists()) jobsFolder.mkdirs();
            val outputPath = new File(jobsFolder.getAbsolutePath(), "output.zip").getAbsolutePath();
    
            // create a new SKIL training job using these resources
            val jobArgs: Array[String] = Array(
                    "--skil.spark.master", "local[*]",
                    "--skil.spark.deploy-mode", "client",
                    "--skil.spark.conf", "spark.executor.extraJavaOptions=-Dorg.bytedeco.javacpp.maxbytes=6G",
                    "--skil.spark.executor-memory", "1g",
                    "--skil.spark.total-executor-cores", String.valueOf(1),
                    "--skil.spark.conf", "spark.hadoop.fs.defaultFS=hdfs://" + hadoopNamenodeHost + ":" + hadoopNamenodePort,
                    "-mo", getModelFile(jobsFolder).getAbsolutePath(),
                    "-tm", getTrainingMasterConfFile(hadoopNamenodePort, jobsFolder).getAbsolutePath(),
                    "-kdsp",classOf[MnistKeyedProvider].getName,
                    "--evalType", "evaluation",
                    "--numEpochs", "1",
                    "--outputPath", outputPath
            );
    
            println("jobArgs: " + jobArgs.mkString(" "));
            val testTrainingJob = jobClient.createJob(JobType.TRAINING, computeResource.getResourceId(), storageResource.getResourceId(), jobArgs.mkString(" "));
            println("job created:" + testTrainingJob.toString())
            // run the job
    
            Unirest.setTimeouts(0, 0);
            var jobEntity = jobClient.runJob(testTrainingJob.getJobId());
            Unirest.setTimeouts(10000, 60000);
            // check the status immediately after submission
            println("jobEntity started running:" + jobEntity.toString());
    
            // check the status after 90 seconds
            Thread.sleep(90000);
            jobEntity = jobClient.getRunStatus(jobEntity.getRunId());
            println("jobEntity after 90 seconds:" + jobEntity.toString());
    
            // cleanup
            jobClient.deleteJob(testTrainingJob.getJobId());
            resourceClient.deleteResource(computeResource.getResourceId());
            resourceClient.deleteResource(storageResource.getResourceId());
        }
    
    main()
    
