美文网首页
SKIL/工作流程/分布式ETL

SKIL/工作流程/分布式ETL

作者: hello风一样的男子 | 来源:发表于2019-04-19 17:14 被阅读0次

    分布式ETL

    ETL代表提取、转换和加载。它是机器学习问题中数据准备和预处理的一个常见工作流程。ETL是从数据源中提取或拉取数据,将其转换为可用形式,然后将其加载到模型/数据库中进行训练/分析。

    SKIL中的分布式ETL是指在spark集群上以分布式模式对提取的数据进行转换。

    使用Spark集群
    要使分布式ETL工作,你需要在后端有一个Spark集群,并且需要一个客户机,一个包含“SparkContext”中有关作业调优的所有元数据的驱动程序。以下是Spark集群中不同部件之间的连接方式。

    Spark Cluster Components (Image taken from: https://spark.apache.org/docs/latest/cluster-overview.html) image.gif ​

    Spark集群组件(图片来源:https://spark.apache.org/docs/latest/cluster-overview.html

    这些组件可以列为:
    Spark worker:实际执行任务的集群节点。
    Spark master:工作节点的资源管理器。可以是独立的,Apache Meos或Hadoop YARN。
    Spark driver:客户机应用程序,从Spark master请求资源并在工作节点上执行任务。

    Zeppelin提供了自己的SparkContext,每次在Spark解释器内进行一次性配置后,你都可以方便地使用它。

    什么是SparkContext?
    SparkContext是Spark功能的主要入口点。SparkContext表示与spark集群的连接,可用于在该集群上创建RDD、累加器和广播变量。
    每个JVM只能有一个SparkContext处于活动状态。
    使用Zeppelin的SparkContext
    你可以在Spark解释器设置中配置Zeppelin的SparkContext。下图显示了配置的外观。

    Zeppelin SparkContext settings image.gif ​

    Zeppelin SparkContext 设置

    你必须配置以下参数::

    master: spark 集群的URL(spark、yarn或meos资源管理器)
    spark.executor.memory: 供执行者使用的内存(例如“1g”或“1024m”等)
    args: Spark应用程序的其他参数。
    spark.cores.max: 要使用的最大核心数。

    你也可以与其他资源管理器一起尝试此设置。此页描述使用不同的资源管理器,通过一组Docker镜象详细描述了它。
    例子
    我们可以通过变量sc访问Zeppelin的“SparkContext”。已经设置的SparkContext将包含所有集群信息和调整参数。你可以以类似于在转换数据时如何使用本地配置的SparkContext的方式使用它。

    /* 执行CSV转换 */
    import org.apache.spark.api.java.JavaRDD
    import org.apache.spark.api.java.JavaSparkContext
    import org.datavec.api.writable.Writable
    import org.datavec.api.records.reader.impl.csv.CSVRecordReader
    import org.datavec.spark.transform.misc.StringToWritablesFunction
    // 从 zeppelin 的SparkContext (sc)中创建JavaSparkContext
    val jsc = JavaSparkContext.fromSparkContext(sc) 
    //把数据读取为JavaRDD[String]
    val stringData = jsc.textFile(filename) 
    
    val rr = new CSVRecordReader()
    val parsedInputData = stringData.filter((line: String) => !
    //把Strings转换为Writables列表
    (line.isEmpty())).toJavaRDD().map(new StringToWritablesFunction(rr)); 
    //执行转换过程
    val processedData = SparkTransformExecutor.execute(parsedInputData, tp) 
    
    image.gif

    这将在配置的Spark集群上提供的数据文件路径(文件名)上执行给定的转换过程(TP)。

    相关文章

      网友评论

          本文标题:SKIL/工作流程/分布式ETL

          本文链接:https://www.haomeiwen.com/subject/rnjcgqtx.html