分布式ETL
ETL代表提取、转换和加载。它是机器学习问题中数据准备和预处理的一个常见工作流程。ETL是从数据源中提取或拉取数据,将其转换为可用形式,然后将其加载到模型/数据库中进行训练/分析。
SKIL中的分布式ETL是指在spark集群上以分布式模式对提取的数据进行转换。
使用Spark集群
要使分布式ETL工作,你需要在后端有一个Spark集群,并且需要一个客户机,一个包含“SparkContext”中有关作业调优的所有元数据的驱动程序。以下是Spark集群中不同部件之间的连接方式。
Spark集群组件(图片来源:https://spark.apache.org/docs/latest/cluster-overview.html)
这些组件可以列为:
Spark worker:实际执行任务的集群节点。
Spark master:工作节点的资源管理器。可以是独立的,Apache Meos或Hadoop YARN。
Spark driver:客户机应用程序,从Spark master请求资源并在工作节点上执行任务。
Zeppelin提供了自己的SparkContext,每次在Spark解释器内进行一次性配置后,你都可以方便地使用它。
什么是SparkContext?
SparkContext是Spark功能的主要入口点。SparkContext表示与spark集群的连接,可用于在该集群上创建RDD、累加器和广播变量。
每个JVM只能有一个SparkContext处于活动状态。
使用Zeppelin的SparkContext
你可以在Spark解释器设置中配置Zeppelin的SparkContext。下图显示了配置的外观。
Zeppelin SparkContext 设置
你必须配置以下参数::
master: spark 集群的URL(spark、yarn或meos资源管理器)
spark.executor.memory: 供执行者使用的内存(例如“1g”或“1024m”等)
args: Spark应用程序的其他参数。
spark.cores.max: 要使用的最大核心数。
你也可以与其他资源管理器一起尝试此设置。此页描述使用不同的资源管理器,通过一组Docker镜象详细描述了它。
例子
我们可以通过变量sc访问Zeppelin的“SparkContext”。已经设置的SparkContext将包含所有集群信息和调整参数。你可以以类似于在转换数据时如何使用本地配置的SparkContext的方式使用它。
/* 执行CSV转换 */
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.api.java.JavaSparkContext
import org.datavec.api.writable.Writable
import org.datavec.api.records.reader.impl.csv.CSVRecordReader
import org.datavec.spark.transform.misc.StringToWritablesFunction
// 从 zeppelin 的SparkContext (sc)中创建JavaSparkContext
val jsc = JavaSparkContext.fromSparkContext(sc)
//把数据读取为JavaRDD[String]
val stringData = jsc.textFile(filename)
val rr = new CSVRecordReader()
val parsedInputData = stringData.filter((line: String) => !
//把Strings转换为Writables列表
(line.isEmpty())).toJavaRDD().map(new StringToWritablesFunction(rr));
//执行转换过程
val processedData = SparkTransformExecutor.execute(parsedInputData, tp)
image.gif
这将在配置的Spark集群上提供的数据文件路径(文件名)上执行给定的转换过程(TP)。
网友评论