美文网首页
SKIL/工作流程/批量推理

SKIL/工作流程/批量推理

作者: hello风一样的男子 | 来源:发表于2019-04-19 17:12 被阅读0次

    批量推理

    SKIL通过其SkilContext为Hadoop/Spark集群中的数据运行本地推理提供了批量推理功能。它通过传递大量的网络调用和与之相关的延迟来帮助解决问题,特别是当有大量数据需要处理时。

    SkilContext batchInference将保存的SKIL模型(通过SkilContext addModelToExperiment)复制到内置HTTP服务器中,并启动Spark作业,在该作业中,执行者将下载模型并对其运行推理,并将输出保存在指定路径中。推理以(id,output)格式保存。

    必须提供实现KeyedDataSetProvider接口的完全符合的类名,该接口采用SparkContextand returns a JavaPairRDD<K, INDArray>。界面如下:

    public interface KeyedDataSetProvider {
        <K> JavaPairRDD<K, INDArray> data(SparkContext sparkContext);
    }
    
    image.gif

    SKIL已经提供了MNIST数据提供程序。接口实现为:

    public class MnistKeyedProvider implements KeyedDataSetProvider {
        private final AtomicInteger count = new AtomicInteger(0);
    
        @Override
        public JavaPairRDD<Integer, INDArray> data(SparkContext sparkContext) {
            try {
                MnistDataSetIterator mnistDataSetIterator = new MnistDataSetIterator(16, 60000);
                List<Tuple2<Integer, INDArray>> data = new ArrayList<>();
    
                while (mnistDataSetIterator.hasNext()) {
                    DataSet ds = mnistDataSetIterator.next();
                    Integer key = new Integer(count.getAndIncrement());
                    data.add(new Tuple2<>(key, ds.getFeatures()));
                }
    
                JavaSparkContext jsc = new JavaSparkContext(sparkContext);
                JavaPairRDD<Integer, INDArray> rdd = jsc.parallelizePairs(data);
                return rdd;
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }
    
    image.gif

    样例使用
    如果你已经保存了一个模型,那么你可以使用它的ID,并对已经通过KeyedDataSetProvider提供的数据运行批处理推断。我们将在这里使用已经提供的MnistKeyedProvider。在实验中,在你的SKIL Zeppelin笔记本上执行以下代码:

    //将此替换为已保存的模型ID
    val modelId = "<saved_model_id>"
    val keyedDataSetProviderClass = "io.skymind.skil.train.spark.MnistKeyedProvider"
    val outputPath = "hdfs:/tmp/out1"
    val batchSize = 16
    
    val skilContext = new SkilContext()
    
    import io.skymind.skil.daemon.model.SparkArgs
    //所有Spark提交选项都可用。
    val args = SparkArgs.builder().master("yarn").doInference(true).build() 
    val jobExecutionID = skilContext.batchInference(z, args, modelId, keyedDataSetProviderClass, outputPath, batchSize)
    
    image.gif

    你将能够在SKIL的进程选项卡中看到执行进程。
    要将结果加载回笔记本,请在作业成功完成后执行以下代码。

    import org.apache.spark.api.java.JavaPairRDD
    import org.nd4j.linalg.api.ndarray.INDArray
    val result = sc.objectFile("hdfs://tmp/out1").asInstanceOf[JavaPairRDD[Integer, INDArray]]
    
    image.gif

    使用SparkArgs
    如果要为自己的Spark设置自定义批处理推理,可以构建自己的SparkArgs类对象并将其传递给SkilContext#batchInference。下表列出了SparkArgs的参数、说明和默认值:

    | 变量 | 描述 | 默认值 |
    |

    master

    |

    参数服务器要连接到的master URL。spark://host:port, mesos://host:port, yarn, 或 local

    | "local[*]" |
    | deployMode |

    是否在本地启动驱动程序(“client”)或
    在集群中的一台工作机上(“cluster”)。

    | "client" |
    | mainClass | 应用程序的主类(对于Java/Scala应用程序) | "io.skymind.skil.train.spark.SKILSparkMain" |
    | jars | 驱动程序上要包含的本地jar的逗号分隔列表
    和执行器类路径 |
    |
    | name | 应用名称 |
    |
    | packages |

    驱动程序上要包含的本地jar的maven坐标逗号分隔列表和执行器类路径。

    将搜索本地maven仓库,然后中央仓库和任何由repositories提供的额外的远程仓库

    坐标格式为:groupId:artifactid:version

    |
    |
    | excludePackages |

    groupid:artifactid的逗号分隔列表,在解析--packages中提供的依赖项以避免依赖项冲突时排除

    |
    |
    | propertiesFile |

    要从中加载额外属性的文件的路径。如果未指定,将查找conf/spark-defaults.conf

    |
    |
    | repositories | 以逗号分隔的其他远程存储库列表,用于搜索随包提供的maven坐标 |
    |
    | files | 将放在每个执行器工作目录中的文件的逗号分隔列表 |
    |
    | driverMemory | 驱动器内存(例如1000M、2G)(默认值:1024M) |
    |
    | driverJavaOptions | 传递给驱动程序的额外Java选项 |
    |
    | driverLibraryPath | 要传递给驱动程序的额外库路径条目 |
    |
    | driverClassPath | 要传递给驱动程序的额外类路径条目。注意,随--jar添的jar会自动包含在类路径中。 |
    |
    | executorMemory | 每个执行器的内存(例如1000M、2G)(默认值:1G) |
    |
    | proxyUser | 提交应用程序时要模拟的用户 |
    |
    | driverCores | 用于驱动程序核心 | 1 |
    | yarnQueue | 用于提交到的YARN队列 | default |
    | numExecutors | 要启动的执行器数 | 2 |
    | principal | 在安全hdfs上运行时用于登录kdc的主体 |
    |
    | keyTab | 包含上述主体的keytab的文件的完整路径。此keytab将通过安全的分布式缓存复制到运行应用程序主服务器的节点,用于定期更新登录票证和委派令牌。 |
    |
    | supervise | 如果给定,则在出现故障时重新启动驱动程序 |
    |
    | kill | 如果给定,则杀死指定的驱动程序 |
    |
    | status | 如果给定,则请求指定的驱动程序的状态 |
    |
    | totalExecutorCores | 所有执行者的核心总数 | 1 |
    | trainingMasterPath | TrainingMaster的路径 |
    |
    | modelPath | 模型路径 |
    |
    | uiUrl | 界面Url |
    |
    | dataSetProvider | 数据集提供者 |
    |
    | jarPath | jar文件路径 |
    |
    | pArgs | spark作业程序参数 | new ArrayList<String>() |
    | modelHistoryUrl | 模型历史Url | null |
    | modelHistoryId | 模型历史Id | null |
    | evalType | 评估类型,可能的值有:evaluation, evaluationbinary, roc, rocbinary, rocmulticlass, regressionevaluation | null |
    | numEpochs | 训练的轮数 | 5 |
    | evalDataSetProviderClass | 评估数据集提供者 | MnistProvider |
    | multiDataSet | 是否为多数据集 | false |
    | modelInstanceId | 模型实例ID | null |
    | doInference | 用指定的模型URI进行推理 | false |
    | outputPath | 进行推理时,保存结果的路径 | null |
    | batchSize | 用于推理的批量大小 | 16 |
    | verbose | 打印调试信息 | false |
    |
    |
    |
    |
    |
    |
    |
    |

    使用你自己的数据
    如果要在自己的数据上运行它,则需要:

    1. 创建一个maven项目。
    2. 添加对nd4j、datavec和deeplearning4j的依赖项。
    3. 创建实现KeyedDataSetProvider接口的唯一类。
    4. 运行mvn package 生成JAR文件。
    5. 使用插件(plugin)选项卡将生成JAR文件上传到SKIL。
    6. 运行一个新的 SkilContext#batchInference 作业。

    依赖关系
    根据项目的不同,需要在Maven项目中添加以下部分或全部依赖项:

    <!-- For datavec --> 
    <dependency>
      <groupId>org.datavec</groupId>
      <artifactId>datavec-api</artifactId>
      <version>${datavec.version}</version>
    </dependency>
    
    <dependency>
      <groupId>org.datavec</groupId>
      <artifactId>datavec-spark_${scala.binary.version}</artifactId>
      <version>${datavec.spark.version}</version>
    </dependency>
    
    <!-- For nd4j native --> 
    <dependency>
      <groupId>org.nd4j</groupId>
      <artifactId>nd4j-native-platform</artifactId>
      <version>${nd4j.version}</version>
    </dependency>
    
    <!-- For nd4j cuda 7.5 --> 
    <dependency>
      <groupId>org.nd4j</groupId>
      <artifactId>nd4j-cuda-7.5-platform</artifactId>
      <version>${nd4j.version}</version>
    </dependency>
    
    <!-- For nd4j cuda 8 --> 
    <dependency>
      <groupId>org.nd4j</groupId>
      <artifactId>nd4j-cuda-8.0-platform</artifactId>
      <version>${nd4j.version}</version>
    </dependency>
    
    <!-- Core DL4J functionality -->
    <dependency>
      <groupId>org.deeplearning4j</groupId>
      <artifactId>deeplearning4j-core</artifactId>
      <version>${dl4j.version}</version>
    </dependency>
    
    <dependency>
      <groupId>org.deeplearning4j</groupId>
      <artifactId>deeplearning4j-nlp</artifactId>
      <version>${dl4j.version}</version>
    </dependency>
    
    <dependency>
      <groupId>org.deeplearning4j</groupId>
      <artifactId>deeplearning4j-zoo</artifactId>
      <version>${dl4j.version}</version>
    </dependency>
    
    image.gif

    附加说明和注意事项
    当你运行一个SkilContext#batchInference作业时,它会从头创建一个“uber jar”,将所有skil jar捆绑在一起,每次你更新或添加一个新插件时,它都会重新生成这个uber jar。创建这个jar需要很长时间。
    推理有两种方法:

    1. 使用模型服务器
    2. 使用批量推理

    当你的数据不太大时,模型服务器是有意义的。或者spark流式的东西。我们的模型服务器中有一个批量分类API,你可以使用它一次运行许多预测。理想的数量是32或64,但这取决于模型的大小。缺点是存在与此相关的网络流量,它可能是大量的网络流量,但只要你可以从Spark作业到SKIL机器或集群执行一个REST调用,就很容易设置。

    使用SkilContext#batchInference你需要在启动skil时正确配置SPARK_HOME。spark集群中的执行器节点需要能够通过主机名(共享DNS服务器)访问skil服务器的IP地址。此外,SPARK_HOME还需要具有正确的Hadoop配置。但是,如果你清除了所有这些障碍,每个执行器将在本地运行推理。因此,除了最初的模型下载,它不会对你的网络添加负担,而且速度最快,因为它是CPU绑定的,而不是I/O绑定的。

    下两个版本的SKIL将使这个过程更加简化。我们还希望增加对多个SPARK_HOME的支持,并根据需要启动云Hadoop集群,如EMRHDinsight

    相关文章

      网友评论

          本文标题:SKIL/工作流程/批量推理

          本文链接:https://www.haomeiwen.com/subject/hbjcgqtx.html