批量推理
SKIL通过其SkilContext为Hadoop/Spark集群中的数据运行本地推理提供了批量推理功能。它通过传递大量的网络调用和与之相关的延迟来帮助解决问题,特别是当有大量数据需要处理时。
SkilContext batchInference
将保存的SKIL模型(通过SkilContext addModelToExperiment)复制到内置HTTP服务器中,并启动Spark作业,在该作业中,执行者将下载模型并对其运行推理,并将输出保存在指定路径中。推理以(id,output)格式保存。
必须提供实现KeyedDataSetProvider
接口的完全符合的类名,该接口采用SparkContext
and returns a JavaPairRDD<K, INDArray>
。界面如下:
public interface KeyedDataSetProvider {
<K> JavaPairRDD<K, INDArray> data(SparkContext sparkContext);
}
image.gif
SKIL已经提供了MNIST数据提供程序。接口实现为:
public class MnistKeyedProvider implements KeyedDataSetProvider {
private final AtomicInteger count = new AtomicInteger(0);
@Override
public JavaPairRDD<Integer, INDArray> data(SparkContext sparkContext) {
try {
MnistDataSetIterator mnistDataSetIterator = new MnistDataSetIterator(16, 60000);
List<Tuple2<Integer, INDArray>> data = new ArrayList<>();
while (mnistDataSetIterator.hasNext()) {
DataSet ds = mnistDataSetIterator.next();
Integer key = new Integer(count.getAndIncrement());
data.add(new Tuple2<>(key, ds.getFeatures()));
}
JavaSparkContext jsc = new JavaSparkContext(sparkContext);
JavaPairRDD<Integer, INDArray> rdd = jsc.parallelizePairs(data);
return rdd;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
image.gif
样例使用
如果你已经保存了一个模型,那么你可以使用它的ID,并对已经通过KeyedDataSetProvider
提供的数据运行批处理推断。我们将在这里使用已经提供的MnistKeyedProvider
。在实验中,在你的SKIL Zeppelin笔记本上执行以下代码:
//将此替换为已保存的模型ID
val modelId = "<saved_model_id>"
val keyedDataSetProviderClass = "io.skymind.skil.train.spark.MnistKeyedProvider"
val outputPath = "hdfs:/tmp/out1"
val batchSize = 16
val skilContext = new SkilContext()
import io.skymind.skil.daemon.model.SparkArgs
//所有Spark提交选项都可用。
val args = SparkArgs.builder().master("yarn").doInference(true).build()
val jobExecutionID = skilContext.batchInference(z, args, modelId, keyedDataSetProviderClass, outputPath, batchSize)
image.gif
你将能够在SKIL的进程选项卡中看到执行进程。
要将结果加载回笔记本,请在作业成功完成后执行以下代码。
import org.apache.spark.api.java.JavaPairRDD
import org.nd4j.linalg.api.ndarray.INDArray
val result = sc.objectFile("hdfs://tmp/out1").asInstanceOf[JavaPairRDD[Integer, INDArray]]
image.gif
使用SparkArgs
如果要为自己的Spark设置自定义批处理推理,可以构建自己的SparkArgs类对象并将其传递给SkilContext#batchInference
。下表列出了SparkArgs
的参数、说明和默认值:
| 变量 | 描述 | 默认值 |
|
master
|
参数服务器要连接到的master URL。spark://host:port, mesos://host:port, yarn, 或 local
| "local[*]" |
| deployMode |
是否在本地启动驱动程序(“client”)或
在集群中的一台工作机上(“cluster”)。
| "client" |
| mainClass | 应用程序的主类(对于Java/Scala应用程序) | "io.skymind.skil.train.spark.SKILSparkMain" |
| jars | 驱动程序上要包含的本地jar的逗号分隔列表
和执行器类路径 |
|
| name | 应用名称 |
|
| packages |
驱动程序上要包含的本地jar的maven坐标逗号分隔列表和执行器类路径。
将搜索本地maven仓库,然后中央仓库和任何由repositories提供的额外的远程仓库
坐标格式为:groupId:artifactid:version
|
|
| excludePackages |
groupid:artifactid的逗号分隔列表,在解析--packages中提供的依赖项以避免依赖项冲突时排除
|
|
| propertiesFile |
要从中加载额外属性的文件的路径。如果未指定,将查找conf/spark-defaults.conf
|
|
| repositories | 以逗号分隔的其他远程存储库列表,用于搜索随包提供的maven坐标 |
|
| files | 将放在每个执行器工作目录中的文件的逗号分隔列表 |
|
| driverMemory | 驱动器内存(例如1000M、2G)(默认值:1024M) |
|
| driverJavaOptions | 传递给驱动程序的额外Java选项 |
|
| driverLibraryPath | 要传递给驱动程序的额外库路径条目 |
|
| driverClassPath | 要传递给驱动程序的额外类路径条目。注意,随--jar添的jar会自动包含在类路径中。 |
|
| executorMemory | 每个执行器的内存(例如1000M、2G)(默认值:1G) |
|
| proxyUser | 提交应用程序时要模拟的用户 |
|
| driverCores | 用于驱动程序核心 | 1 |
| yarnQueue | 用于提交到的YARN队列 | default |
| numExecutors | 要启动的执行器数 | 2 |
| principal | 在安全hdfs上运行时用于登录kdc的主体 |
|
| keyTab | 包含上述主体的keytab的文件的完整路径。此keytab将通过安全的分布式缓存复制到运行应用程序主服务器的节点,用于定期更新登录票证和委派令牌。 |
|
| supervise | 如果给定,则在出现故障时重新启动驱动程序 |
|
| kill | 如果给定,则杀死指定的驱动程序 |
|
| status | 如果给定,则请求指定的驱动程序的状态 |
|
| totalExecutorCores | 所有执行者的核心总数 | 1 |
| trainingMasterPath | TrainingMaster的路径 |
|
| modelPath | 模型路径 |
|
| uiUrl | 界面Url |
|
| dataSetProvider | 数据集提供者 |
|
| jarPath | jar文件路径 |
|
| pArgs | spark作业程序参数 | new ArrayList<String>() |
| modelHistoryUrl | 模型历史Url | null |
| modelHistoryId | 模型历史Id | null |
| evalType | 评估类型,可能的值有:evaluation, evaluationbinary, roc, rocbinary, rocmulticlass, regressionevaluation | null |
| numEpochs | 训练的轮数 | 5 |
| evalDataSetProviderClass | 评估数据集提供者 | MnistProvider |
| multiDataSet | 是否为多数据集 | false |
| modelInstanceId | 模型实例ID | null |
| doInference | 用指定的模型URI进行推理 | false |
| outputPath | 进行推理时,保存结果的路径 | null |
| batchSize | 用于推理的批量大小 | 16 |
| verbose | 打印调试信息 | false |
|
|
|
|
|
|
|
|
使用你自己的数据
如果要在自己的数据上运行它,则需要:
- 创建一个maven项目。
- 添加对nd4j、datavec和deeplearning4j的依赖项。
- 创建实现
KeyedDataSetProvider
接口的唯一类。 - 运行mvn package 生成JAR文件。
- 使用插件(plugin)选项卡将生成JAR文件上传到SKIL。
- 运行一个新的
SkilContext#batchInference
作业。
依赖关系
根据项目的不同,需要在Maven项目中添加以下部分或全部依赖项:
<!-- For datavec -->
<dependency>
<groupId>org.datavec</groupId>
<artifactId>datavec-api</artifactId>
<version>${datavec.version}</version>
</dependency>
<dependency>
<groupId>org.datavec</groupId>
<artifactId>datavec-spark_${scala.binary.version}</artifactId>
<version>${datavec.spark.version}</version>
</dependency>
<!-- For nd4j native -->
<dependency>
<groupId>org.nd4j</groupId>
<artifactId>nd4j-native-platform</artifactId>
<version>${nd4j.version}</version>
</dependency>
<!-- For nd4j cuda 7.5 -->
<dependency>
<groupId>org.nd4j</groupId>
<artifactId>nd4j-cuda-7.5-platform</artifactId>
<version>${nd4j.version}</version>
</dependency>
<!-- For nd4j cuda 8 -->
<dependency>
<groupId>org.nd4j</groupId>
<artifactId>nd4j-cuda-8.0-platform</artifactId>
<version>${nd4j.version}</version>
</dependency>
<!-- Core DL4J functionality -->
<dependency>
<groupId>org.deeplearning4j</groupId>
<artifactId>deeplearning4j-core</artifactId>
<version>${dl4j.version}</version>
</dependency>
<dependency>
<groupId>org.deeplearning4j</groupId>
<artifactId>deeplearning4j-nlp</artifactId>
<version>${dl4j.version}</version>
</dependency>
<dependency>
<groupId>org.deeplearning4j</groupId>
<artifactId>deeplearning4j-zoo</artifactId>
<version>${dl4j.version}</version>
</dependency>
image.gif
附加说明和注意事项
当你运行一个SkilContext#batchInference
作业时,它会从头创建一个“uber jar”,将所有skil jar捆绑在一起,每次你更新或添加一个新插件时,它都会重新生成这个uber jar。创建这个jar需要很长时间。
推理有两种方法:
- 使用模型服务器
- 使用批量推理
当你的数据不太大时,模型服务器是有意义的。或者spark流式的东西。我们的模型服务器中有一个批量分类API,你可以使用它一次运行许多预测。理想的数量是32或64,但这取决于模型的大小。缺点是存在与此相关的网络流量,它可能是大量的网络流量,但只要你可以从Spark作业到SKIL机器或集群执行一个REST调用,就很容易设置。
使用SkilContext#batchInference你
需要在启动skil时正确配置SPARK_HOME
。spark集群中的执行器节点需要能够通过主机名(共享DNS服务器)访问skil服务器的IP地址。此外,SPARK_HOME
还需要具有正确的Hadoop配置。但是,如果你清除了所有这些障碍,每个执行器将在本地运行推理。因此,除了最初的模型下载,它不会对你的网络添加负担,而且速度最快,因为它是CPU绑定的,而不是I/O绑定的。
下两个版本的SKIL将使这个过程更加简化。我们还希望增加对多个SPARK_HOME
的支持,并根据需要启动云Hadoop集群,如EMR和HDinsight。
网友评论