Executing Jobs
A job is a computation that runs in the background against connected SKIL resources. Jobs can be of two types:
- Training
- Inference
Running Jobs
After connecting an external resource to SKIL, you can execute training/inference jobs against it in either of the following two ways:
A. CLI
B. REST endpoints
The skil jobs command manages the workflow for executing jobs through the SKIL CLI. Equivalent REST endpoints are also available.
The job workflow is as follows:
- Create a job.
- Submit the job created in step 1 to run. The job starts running in the background.
- Periodically check the status of the running job.
- Download the output file from the completed job.
- Delete the completed job from the job list (optional).
Multiple jobs can be created and run simultaneously with SKIL. Their order of execution will depend on the underlying compute resources.
1. Creating a Job
For creating a job, SKIL provides the following:
CLI: skil jobs create <args>
REST endpoint: POST - /jobs/<type>, where <type> refers to the job type and is either training or inference.
The formats of the two variants are as follows:
CLI
skil jobs create --type <training/inference> --storageResourceId <storage_resource_id> --computeResourceId <compute_resource_id> --jobArgs <job_arguments> --outputFileName <output_file_name>
REST - cURL
curl -d '{"computeResourceId": <compute_resource_id>, "storageResourceId": <storage_resource_id>, "jobArgs": "<skil_spark_main_args>", "outputFileName": "<output_file_name>"}' -H "Authorization: Bearer <auth_token>" -H "Content-Type: application/json" -X POST http://localhost:9008/jobs/<type>
# <type> => one of ["training", "inference"]
The parameters are as follows:
| Parameter | Details | Type |
| --- | --- | --- |
| computeResourceId | ID of the compute resource. | Long |
| storageResourceId | ID of the storage resource. | Long |
| jobArgs | Job arguments. Example: `"jobArgs": "-mo /var/skil/neuralnet.zip -tm /var/skil/parameteraveraging.json -dsp io.skymind.skil.train.spark.MnistProvider --evalType evaluation --numEpochs 1 --outputPath /tmp/output5049941383652659983.zip"` | String |
| outputFileName | The name that the output file generated by the job should have. | String |
| type | The job type, one of ["training", "inference"]. | String |
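The same step can also be performed programmatically with the JobClient API used in the notebook example at the end of this page. The following is a minimal sketch, assuming SKIL runs on localhost:9008; the resource IDs, paths, and jobArgs string are placeholders to replace with your own values:
import io.skymind.auth.JWTUtil
import io.skymind.jobs.client.JobClient
import io.skymind.jobs.model.JobType

val jobClient = new JobClient("http://localhost:9008")
jobClient.setAuthToken(JWTUtil.generateSystemToken())

// Placeholder IDs of compute/storage resources registered earlier
val computeResourceId = 1L
val storageResourceId = 2L

// Job arguments, adapted from the example string in the table above
val jobArgs = "-mo /var/skil/neuralnet.zip " +
  "-tm /var/skil/parameteraveraging.json " +
  "-dsp io.skymind.skil.train.spark.MnistProvider " +
  "--evalType evaluation --numEpochs 1 --outputPath /tmp/output.zip"

// Creates the job; the returned model carries the job ID used in the later steps
val job = jobClient.createJob(JobType.TRAINING, computeResourceId, storageResourceId, jobArgs)
println("created job with id: " + job.getJobId)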
2. Running a Job
To run a job, use either of the following:
CLI: skil jobs run --id <jobId>
REST endpoint: POST - /jobs/<id>/run. The job ID is obtained when the job is created.
Their formats are:
CLI
skil jobs run --id <jobId>
REST - cURL
curl -d '{}' -H "Authorization: Bearer <auth_token>" -H "Content-Type: application/json" -X POST http://localhost:9008/jobs/<id>/run
The parameters are as follows:
| Parameter | Details | Type |
| --- | --- | --- |
| id | The job ID, obtained after creating the job. | Long |
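Programmatically, this step is a single JobClient call. A minimal sketch, assuming the jobClient and job values from the creation sketch in step 1 are still in scope:
// Submits the job to run in the background and returns a run handle
val jobRun = jobClient.runJob(job.getJobId)
println("job run started: " + jobRun)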
3. Getting the Job Status
Note
Status checks are not currently supported for jobs backed by YARN resources.
To retrieve a job's status, use either of the following:
CLI: skil jobs status --id <jobId>
REST endpoint: POST - /jobs/<id>/refresh. The job ID is obtained when the job is created.
Their formats are:
CLI
skil jobs status --id <jobId>
REST - cURL
curl -d '{}' -H "Authorization: Bearer <auth_token>" -H "Content-Type: application/json" -X POST http://localhost:9008/jobs/<id>/refresh
The parameters are as follows:
| Parameter | Details | Type |
| --- | --- | --- |
| id | The job ID, obtained after creating the job. | Long |
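With the JobClient API, the status check keys off the run ID returned by runJob rather than the job ID. A minimal sketch, assuming the jobRun handle from step 2:
// Refreshes and returns the current status of the submitted run
val runStatus = jobClient.getRunStatus(jobRun.getRunId)
println("current run status: " + runStatus)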
4. Getting the Output from a Job
Currently, downloading a job's output is supported only through the REST endpoint.
Note
This only works when the job's run status is "finished".
The relevant endpoint and its format are as follows:
REST endpoint: POST - /jobs/<id>/outputfile. The job ID is obtained when the job is created.
REST - cURL
curl -d '{"localDownloadPath": "<local_download_path>"}' -H "Authorization: Bearer <auth_token>" -H "Content-Type: application/json" -X POST http://localhost:9008/jobs/<id>/outputfile
The parameters are as follows:
| Parameter | Details | Type |
| --- | --- | --- |
| localDownloadPath | The local file path to which the output should be downloaded. | String |
| id | The job ID, obtained after creating the job. | Long |
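Because output download is REST-only, a notebook can call the endpoint directly. The sketch below uses the Unirest client that the notebook example already imports; the token handling and the local download path are placeholder assumptions:
import com.mashape.unirest.http.Unirest

// Placeholder token; the notebook example below generates a system token with JWTUtil
val authToken = JWTUtil.generateSystemToken()

// Posts the download request for the finished job's output file
val response = Unirest
  .post("http://localhost:9008/jobs/" + job.getJobId + "/outputfile")
  .header("Authorization", "Bearer " + authToken)
  .header("Content-Type", "application/json")
  .body("""{"localDownloadPath": "/tmp/output.zip"}""")
  .asString()
println("outputfile response status: " + response.getStatus)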
5. Deleting a Completed Job (Optional)
To delete a job, use either of the following:
CLI: skil jobs rm --id <jobId>
REST endpoint: DELETE - http://localhost:9008/jobs/<id>. The job ID is obtained when the job is created.
Their formats are:
CLI
skil jobs rm --id <jobId>
REST - cURL
curl -H "Authorization: Bearer <auth_token>" -X DELETE http://localhost:9008/jobs/<id>
The parameters are as follows:
| Parameter | Details | Type |
| --- | --- | --- |
| id | The job ID, obtained after creating the job. | Long |
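The equivalent JobClient call, assuming the job handle from step 1's sketch:
// Removes the completed job from SKIL's job list
jobClient.deleteJob(job.getJobId)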
Creating and running a job can also be done from a notebook by executing example code like the following:
import io.skymind.auth.JWTUtil;
import io.skymind.jobs.client.JobClient;
import io.skymind.jobs.model.JobModel;
import io.skymind.jobs.model.JobRun;
import io.skymind.jobs.model.JobType;
import io.skymind.resource.client.SKILResourceClient;
import io.skymind.resource.model.Resource;
import io.skymind.resource.model.subtypes.ResourceDetails;
import io.skymind.resource.model.subtypes.compute.SparkResourceDetails;
import io.skymind.resource.model.subtypes.storage.HDFSResourceDetails;
import io.skymind.skil.train.spark.MnistKeyedProvider;
import org.apache.spark.storage.StorageLevel;
import org.datavec.api.transform.serde.JsonMappers;
import org.deeplearning4j.nn.api.OptimizationAlgorithm;
import org.deeplearning4j.nn.conf.MultiLayerConfiguration;
import org.deeplearning4j.nn.conf.NeuralNetConfiguration;
import org.deeplearning4j.nn.conf.inputs.InputType;
import org.deeplearning4j.nn.conf.layers.ConvolutionLayer;
import org.deeplearning4j.nn.conf.layers.DenseLayer;
import org.deeplearning4j.nn.conf.layers.OutputLayer;
import org.deeplearning4j.nn.conf.layers.SubsamplingLayer;
import org.deeplearning4j.nn.multilayer.MultiLayerNetwork;
import org.deeplearning4j.nn.weights.WeightInit;
import org.deeplearning4j.spark.impl.paramavg.ParameterAveragingTrainingMaster;
import org.deeplearning4j.util.ModelSerializer;
import org.nd4j.linalg.activations.Activation;
import org.nd4j.linalg.learning.config.Nesterovs;
import org.nd4j.linalg.lossfunctions.LossFunctions;
import com.mashape.unirest.http.Unirest;
import scala.collection.JavaConversions._
import java.io.File;
import java.io.IOException;
import java.util.List;
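// Builds a ParameterAveragingTrainingMaster configuration and serializes it to
// parameteraveraging.json; the resulting file is passed to the training job via -tm.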
def getTrainingMasterConfFile(nameNodePort: Int, folder: File): File = {
val parameterAveragingTrainingMaster: ParameterAveragingTrainingMaster =
new ParameterAveragingTrainingMaster.Builder(100)
.batchSizePerWorker(1000)
.averagingFrequency(3)
.exportDirectory("hdfs://localhost:" + nameNodePort + new File(
folder,
"exportdir").getAbsolutePath)
.workerPrefetchNumBatches(10)
.storageLevel(StorageLevel.DISK_ONLY)
.build()
val jsonWriteParamAveraging: File =
new File(folder, "parameteraveraging.json")
JsonMappers.getMapper
.writeValue(jsonWriteParamAveraging, parameterAveragingTrainingMaster)
jsonWriteParamAveraging
}
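// Builds a small LeNet-style CNN for MNIST and serializes it to neuralnet.zip;
// the resulting file is passed to the training job via -mo.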
def getModelFile(folder: File): File = {
val builder
: MultiLayerConfiguration.Builder = // Training iterations as above
new NeuralNetConfiguration.Builder()
.seed(230)
.l2(0.0005)
.weightInit(WeightInit.XAVIER)
.optimizationAlgo(OptimizationAlgorithm.STOCHASTIC_GRADIENT_DESCENT)
.updater(Nesterovs.builder().learningRate(0.01).momentum(0.9).build())
.list()
.layer(0,
new ConvolutionLayer.Builder(5, 5)
.nIn(1)
.stride(1, 1)
.nOut(20)
.activation(Activation.IDENTITY)
.build())
.layer(1,
new SubsamplingLayer.Builder(SubsamplingLayer.PoolingType.MAX)
.kernelSize(2, 2)
.stride(2, 2)
.build())
.layer(2,
new ConvolutionLayer.Builder(5, 5)
.stride(1, 1)
.nOut(50)
.activation(Activation.IDENTITY)
.build())
.layer(3,
new SubsamplingLayer.Builder(SubsamplingLayer.PoolingType.MAX)
.kernelSize(2, 2)
.stride(2, 2)
.build())
.layer(4,
new DenseLayer.Builder()
.activation(Activation.RELU)
.nOut(500)
.build())
.layer(5,
new OutputLayer.Builder(
LossFunctions.LossFunction.NEGATIVELOGLIKELIHOOD)
.nOut(10)
.activation(Activation.SOFTMAX)
.build())
.setInputType( //See note below
InputType.convolutionalFlat(28, 28, 1))
val network: MultiLayerNetwork = new MultiLayerNetwork(builder.build())
network.init()
val neuralNet: File = new File(folder, "neuralnet.zip")
ModelSerializer.writeModel(network, neuralNet, true)
neuralNet
}
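// End-to-end example: cleans up existing jobs and resources, registers Spark and HDFS
// resources, creates and runs an MNIST training job, polls its status, and cleans up.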
def main(): Unit = {
val localSparkHome: String = "/opt/spark"
val sparkMasterUrl: String = "spark://localhost:7077"
val hadoopNamenodeHost: String = "localhost"
val hadoopNamenodePort: Int = 8020
val SKILURL: String = "http://localhost:9008"
val JOBS_WORK_DIR: String = "/tmp/skil-jobs"
val resourceClient: SKILResourceClient = new SKILResourceClient(SKILURL)
resourceClient.setAuthToken(JWTUtil.generateSystemToken())
val jobClient: JobClient = new JobClient(SKILURL)
jobClient.setAuthToken(JWTUtil.generateSystemToken())
// cleanup all earlier jobs and resources
val jobs: List[JobModel] = jobClient.getAllJobs
for (job <- jobs) {
jobClient.deleteJob(job.getJobId)
}
val resources: List[Resource] = resourceClient.getResources
println(resources)
for (resource <- resources) {
resourceClient.deleteResource(resource.getResourceId)
}
// Create the compute and storage resources
val sparkResourceDetails = new SparkResourceDetails(localSparkHome, sparkMasterUrl);
val hdfsResourceDetails = new HDFSResourceDetails(hadoopNamenodeHost, String.valueOf(hadoopNamenodePort));
// add the resources to the database
val computeResource = resourceClient.addResource("SKIL YARN", sparkResourceDetails, "");
val storageResource = resourceClient.addResource("SKIL HDFS", hdfsResourceDetails, "");
val jobsFolder = new File(JOBS_WORK_DIR);
if (!jobsFolder.exists()) jobsFolder.mkdirs();
val outputPath = new File(jobsFolder.getAbsolutePath(), "output.zip").getAbsolutePath();
// create a new SKIL training job using these resources
val jobArgs: Array[String] = Array(
"--skil.spark.master", "local[*]",
"--skil.spark.deploy-mode", "client",
"--skil.spark.conf", "spark.executor.extraJavaOptions=-Dorg.bytedeco.javacpp.maxbytes=6G",
"--skil.spark.executor-memory", "1g",
"--skil.spark.total-executor-cores", String.valueOf(1),
"--skil.spark.conf", "spark.hadoop.fs.defaultFS=hdfs://" + hadoopNamenodeHost + ":" + hadoopNamenodePort,
"-mo", getModelFile(jobsFolder).getAbsolutePath(),
"-tm", getTrainingMasterConfFile(hadoopNamenodePort, jobsFolder).getAbsolutePath(),
"-kdsp",classOf[MnistKeyedProvider].getName,
"--evalType", "evaluation",
"--numEpochs", "1",
"--outputPath", outputPath
);
println("jobArgs: " + jobArgs.mkString(" "));
val testTrainingJob = jobClient.createJob(JobType.TRAINING, computeResource.getResourceId(), storageResource.getResourceId(), jobArgs.mkString(" "));
println("job created:" + testTrainingJob.toString())
// run the job
Unirest.setTimeouts(0, 0);
var jobEntity = jobClient.runJob(testTrainingJob.getJobId());
Unirest.setTimeouts(10000, 60000);
// check the status immediately after submission
println("jobEntity started running:" + jobEntity.toString());
// check the status after 90 seconds
Thread.sleep(90000);
jobEntity = jobClient.getRunStatus(jobEntity.getRunId());
println("jobEntity after 90 seconds:" + jobEntity.toString());
// cleanup
jobClient.deleteJob(testTrainingJob.getJobId());
resourceClient.deleteResource(computeResource.getResourceId());
resourceClient.deleteResource(storageResource.getResourceId());
}
main()