从本地的 data frames 来创建 SparkDataFrames
从 Data Sources(数据源)创建 SparkDataFrame
从 Hive tables 来创建 SparkDataFrame
应用 User-Defined Function(UDF 用户自定义函数)
Run a given function on a large dataset usingdapplyordapplyCollect
SparkR 是一个 R package, 它提供了一个轻量级的前端以从 R 中使用 Apache Spark. 在 Spark 2.2.0 中, SparkR 提供了一个分布式的 data frame, 它实现了像 selection, filtering, aggregation etc 一系列所支持的操作.(dplyr与 R data frames 相似) ), 除了可用于海量数据上之外. SparkR 还支持使用 MLlib 来进行分布式的 machine learning(机器学习).
SparkDataFrame 是一个分布式的, 将数据映射到有名称的 colums(列)的集合. 在概念上 相当于关系数据库中的table表或 R 中的 data frame,但在该引擎下有更多的优化. SparkDataFrames 可以从各种来源构造,例如: 结构化的数据文件,Hive 中的表,外部数据库或现有的本地 R data frames.
All of the examples on this page use sample data included in R or the Spark distribution and can be run using the./bin/sparkRshell.
SparkR 的入口点是SparkSession, 它会连接您的 R 程序到 Spark 集群中. 您可以使用sparkR.session来创建SparkSession, 并传递诸如应用程序名称, 依赖的任何 spark 软件包等选项, 等等. 此外,还可以通过SparkSession来与SparkDataFrames一起工作。 如果您正在使用sparkRshell,那么SparkSession应该已经被创建了,你不需要再调用sparkR.session.
sparkR.session()
您可以从 RStudio 中来启动 SparkR. 您可以从 RStudio, R shell, Rscript 或者 R IDEs 中连接你的 R 程序到 Spark 集群中去. 要开始, 确保已经在环境变量中设置好 SPARK_HOME (您可以检测下Sys.getenv), 加载 SparkR package, 并且像下面一样调用sparkR.session. 它将检测 Spark 的安装, 并且, 如果没有发现, 它将自动的下载并且缓存起来. 当然,您也可以手动的运行install.spark.
为了调用sparkR.session, 您也可以指定某些 Spark driver 的属性. 通常哪些应用程序属性和运行时环境不能以编程的方式来设置, 这是因为 driver 的 JVM 进程早就已经启动了, 在这种情况下 SparkR 会帮你做好准备. 要设置它们, 可以像在sparkConfig参数中的其它属性一样传递它们到sparkR.session()中去.
if(nchar(Sys.getenv("SPARK_HOME"))<1){Sys.setenv(SPARK_HOME="/home/spark")}library(SparkR,lib.loc=c(file.path(Sys.getenv("SPARK_HOME"),"R","lib")))sparkR.session(master="local[*]",sparkConfig=list(spark.driver.memory="2g"))
下面的 Spark driver 属性可以 从 RStudio 的 sparkR.session 的 sparkConfig 中进行设置:
Property Name<(属性名称)Property group(属性分组)spark-submitequivalent
spark.masterApplication Properties--master
spark.yarn.keytabApplication Properties--keytab
spark.yarn.principalApplication Properties--principal
spark.driver.memoryApplication Properties--driver-memory
spark.driver.extraClassPathRuntime Environment--driver-class-path
spark.driver.extraJavaOptionsRuntime Environment--driver-java-options
spark.driver.extraLibraryPathRuntime Environment--driver-library-path
有了一个SparkSession之后, 可以从一个本地的 R data frame,Hive 表, 或者其它的data sources中来创建SparkDataFrame应用程序.
从本地的 data frames 来创建 SparkDataFrames
要创建一个 data frame 最简单的方式是去转换一个本地的 R data frame 成为一个 SparkDataFrame. 我们明确的使用as.DataFrame或createDataFrame并且经过本地的 R data frame 中以创建一个 SparkDataFrame. 例如, 下面的例子基于 R 中已有的faithful来创建一个SparkDataFrame.
df<-as.DataFrame(faithful)# 展示第一个 SparkDataFrame 的内容head(df)## eruptions waiting##1 3.600 79##2 1.800 54##3 3.333 74
从 Data Sources(数据源)创建 SparkDataFrame
SparkR 支持通过SparkDataFrame接口对各种 data sources(数据源)进行操作. 本节介绍使用数据源加载和保存数据的常见方法. 您可以查看 Spark Sql 编程指南的specific options部分以了解更多可用于内置的 data sources(数据源)内容.
从数据源创建 SparkDataFrames 常见的方法是read.df. 此方法将加载文件的路径和数据源的类型,并且将自动使用当前活动的 SparkSession. SparkR 天生就支持读取 JSON, CSV 和 Parquet 文件, 并且通过可靠来源的软件包第三方项目, 您可以找到 Avro 等流行文件格式的 data source connectors(数据源连接器). 可以用spark-submit或sparkR命令指定--packages来添加这些包, 或者在交互式 R shell 或从 RStudio 中使用sparkPackages参数初始化SparkSession.
sparkR.session(sparkPackages="com.databricks:spark-avro_2.11:3.0.0")
We can see how to use data sources using an example JSON input file. Note that the file that is used here isnota typical JSON file. Each line in the file must contain a separate, self-contained valid JSON object. For more information, please seeJSON Lines text format, also called newline-delimited JSON. As a consequence, a regular multi-line JSON file will most often fail.
我们可以看看如何使用 JSON input file 的例子来使用数据源. 注意, 这里使用的文件是not一个经典的 JSON 文件. 文件中的每行都必须包含一个单独的,独立的有效的JSON对象
people<-read.df("./examples/src/main/resources/people.json","json")head(people)## age name##1 NA Michael##2 30 Andy##3 19 Justin# SparkR 自动从 JSON 文件推断出 schema(模式)printSchema(people)# root# |-- age: long (nullable = true)# |-- name: string (nullable = true)# 同样, 使用 read.json 读取多个文件people<-read.json(c("./examples/src/main/resources/people.json","./examples/src/main/resources/people2.json"))
该 data sources API 原生支持 CSV 格式的 input files(输入文件). 要了解更多信息请参阅 SparkRread.dfAPI 文档.
df<-read.df(csvPath,"csv",header="true",inferSchema="true",na.strings="NA")
该 data sources API 也可用于将 SparkDataFrames 存储为多个 file formats(文件格式). 例如, 我们可以使用write.df把先前的示例的 SparkDataFrame 存储为一个 Parquet 文件.
write.df(people,path="people.parquet",source="parquet",mode="overwrite")
从 Hive tables 来创建 SparkDataFrame
您也可以从 Hive tables(表)来创建 SparkDataFrames. 为此,我们需要创建一个具有 Hive 支持的 SparkSession,它可以访问 Hive MetaStore 中的 tables(表). 请注意, Spark 应该使用Hive support来构建,更多细节可以在SQL 编程指南中查阅.
sparkR.session()sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")# Queries can be expressed in HiveQL.results<-sql("FROM src SELECT key, value")# results is now a SparkDataFramehead(results)## key value## 1 238 val_238## 2 86 val_86## 3 311 val_311
SparkDataFrames 支持一些用于结构化数据处理的 functions(函数). 这里我们包括一些基本的例子,一个完整的列表可以在API文档中找到:
# Create the SparkDataFramedf<-as.DataFrame(faithful)# 获取关于 SparkDataFrame 基础信息df## SparkDataFrame[eruptions:double, waiting:double]# Select only the "eruptions" columnhead(select(df,df$eruptions))## eruptions##1 3.600##2 1.800##3 3.333# You can also pass in column name as stringshead(select(df,"eruptions"))# Filter the SparkDataFrame to only retain rows with wait times shorter than 50 minshead(filter(df,df$waiting<50))## eruptions waiting##1 1.750 47##2 1.750 47##3 1.867 48
SparkR data frames 支持一些常见的, 用于在 grouping(分组)数据后进行 aggregate(聚合)的函数. 例如, 我们可以在faithfuldataset 中计算waiting时间的直方图, 如下所示.
# We use the `n` operator to count the number of times each waiting time appearshead(summarize(groupBy(df,df$waiting),count=n(df$waiting)))## waiting count##1 70 4##2 67 1##3 69 2# We can also sort the output from the aggregation to get the most common waiting timeswaiting_counts<-summarize(groupBy(df,df$waiting),count=n(df$waiting))head(arrange(waiting_counts,desc(waiting_counts$count)))## waiting count##1 78 15##2 83 14##3 81 13
SparkR 还提供了一些可以直接应用于列进行数据处理和 aggregatation(聚合)的函数. 下面的例子展示了使用基本的算术函数.
# Convert waiting time from hours to seconds.# Note that we can assign this to a new column in the same SparkDataFramedf$waiting_secs<-df$waiting*60head(df)## eruptions waiting waiting_secs##1 3.600 79 4740##2 1.800 54 3240##3 3.333 74 4440
应用 User-Defined Function(UDF 用户自定义函数)
在 SparkR 中, 我们支持几种 User-Defined Functions:
Run a given function on a large dataset usingdapplyordapplyCollect
应用一个 function(函数)到SparkDataFrame的每个 partition(分区). 应用于SparkDataFrame每个 partition(分区)的 function(函数)应该只有一个参数, 它中的data.frame对应传递的每个分区. 函数的输出应该是一个data.frame. Schema 指定生成的SparkDataFramerow format. 它必须匹配返回值的data types.
# Convert waiting time from hours to seconds.# Note that we can apply UDF to DataFrame.schema<-structType(structField("eruptions","double"),structField("waiting","double"),structField("waiting_secs","double"))df1<-dapply(df,function(x){x<-cbind(x,x$waiting*60)},schema)head(collect(df1))## eruptions waiting waiting_secs##1 3.600 79 4740##2 1.800 54 3240##3 3.333 74 4440##4 2.283 62 3720##5 4.533 85 5100##6 2.883 55 3300
像dapply那样, 应用一个函数到SparkDataFrame的每个分区并且手机返回结果. 函数的输出应该是一个data.frame. 但是, 不需要传递 Schema. 注意, 如果运行在所有分区上的函数的输出不能 pulled(拉)到 driver 的内存中过去, 则dapplyCollect会失败.
# Convert waiting time from hours to seconds.# Note that we can apply UDF to DataFrame and return a R's data.frameldf<-dapplyCollect(df,function(x){x<-cbind(x,"waiting_secs"=x$waiting*60)})head(ldf,3)## eruptions waiting waiting_secs##1 3.600 79 4740##2 1.800 54 3240##3 3.333 74 4440
Run a given function on a large dataset grouping by input column(s) and usinggapplyorgapplyCollect(在一个大的 dataset 上通过 input colums(输入列)来进行 grouping(分组)并且使用gapplyorgapplyCollect来运行一个指定的函数)
应用给一个函数到SparkDataFrame的每个 group. 该函数被应用到SparkDataFrame的每个 group, 并且应该只有两个参数: grouping key 和 Rdata.frame对应的 key. 该 groups 从SparkDataFrame的 columns(列)中选择. 函数的输出应该是data.frame. Schema 指定生成的SparkDataFramerow format. 它必须在 Sparkdata types 数据类型的基础上表示 R 函数的输出 schema(模式). 用户可以设置返回的data.frame列名.
# Determine six waiting times with the largest eruption time in minutes.schema<-structType(structField("waiting","double"),structField("max_eruption","double"))result<-gapply(df,"waiting",function(key,x){y<-data.frame(key,max(x$eruptions))},schema)head(collect(arrange(result,"max_eruption",decreasing=TRUE)))## waiting max_eruption##1 64 5.100##2 69 5.067##3 71 5.033##4 87 5.000##5 63 4.933##6 89 4.900
像gapply那样, 将函数应用于SparkDataFrame的每个分区,并将结果收集回 R data.frame. 函数的输出应该是一个data.frame. 但是,不需要传递 schema(模式). 请注意,如果在所有分区上运行的 UDF 的输出无法 pull(拉)到 driver 的内存, 那么gapplyCollect可能会失败.
# Determine six waiting times with the largest eruption time in minutes.result<-gapplyCollect(df,"waiting",function(key,x){y<-data.frame(key,max(x$eruptions))colnames(y)<-c("waiting","max_eruption")y})head(result[order(result$max_eruption,decreasing=TRUE),])## waiting max_eruption##1 64 5.100##2 69 5.067##3 71 5.033##4 87 5.000##5 63 4.933##6 89 4.900
类似于本地 R 中的lapply,spark.lapply在元素列表中运行一个函数,并使用 Spark 分发计算. 以类似于doParallel或lapply的方式应用于列表的元素. 所有计算的结果应该放在一台机器上. 如果不是这样, 他们可以像df < - createDataFrame(list)这样做, 然后使用dapply.
# Perform distributed training of multiple models with spark.lapply. Here, we pass# a read-only list of arguments which specifies family the generalized linear model should be.families<-c("gaussian","poisson")train<-function(family){model<-glm(Sepal.Length~Sepal.Width+Species,iris,family=family)summary(model)}# Return a list of model's summariesmodel.summaries<-spark.lapply(families,train)# Print the summary of each modelprint(model.summaries)
A SparkDataFrame can also be registered as a temporary view in Spark SQL and that allows you to run SQL queries over its data. Thesqlfunction enables applications to run SQL queries programmatically and returns the result as aSparkDataFrame.
# Load a JSON filepeople<-read.df("./examples/src/main/resources/people.json","json")# Register this SparkDataFrame as a temporary view.createOrReplaceTempView(people,"people")# SQL statements can be run by using the sql methodteenagers<-sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")head(teenagers)## name##1 Justin
SparkR 现支持下列机器学习算法:
spark.logit:逻辑回归 Logistic Regression
spark.survreg:加速失败时间生存模型 Accelerated Failure Time (AFT) Survival Model
spark.glmorglm:广义线性模型 Generalized Linear Model (GLM)
spark.randomForest:随机森林 for回归and分类
spark.gaussianMixture:高斯混合模型 (GMM)
spark.kstest:柯尔莫哥洛夫-斯米尔诺夫检验
SparkR 底层实现使用 MLlib 来训练模型. 有关示例代码,请参阅MLlib用户指南的相应章节. 用户可以调用summary输出拟合模型的摘要, 利用模型对数据进行预测, 并且使用write.ml/read.ml来 保存/加载拟合的模型 . SparkR 支持对模型拟合使用部分R的公式运算符, 包括 ‘~’, ‘.’, ‘:’, ‘+’, 和 ‘-‘.
下面的例子展示了SparkR如何 保存/加载 机器学习模型.
training<-read.df("data/mllib/sample_multiclass_classification_data.txt",source="libsvm")# Fit a generalized linear model of family "gaussian" with spark.glmdf_list<-randomSplit(training,c(7,3),2)gaussianDF<-df_list[[1]]gaussianTestDF<-df_list[[2]]gaussianGLM<-spark.glm(gaussianDF,label~features,family="gaussian")# Save and then load a fitted MLlib modelmodelPath<-tempfile(pattern="ml",fileext=".tmp")write.ml(gaussianGLM,modelPath)gaussianGLM2<-read.ml(modelPath)# Check model summarysummary(gaussianGLM2)# Check model predictiongaussianPredictions<-predict(gaussianGLM2,gaussianTestDF)head(gaussianPredictions)unlink(modelPath)
Find full example code at "examples/src/main/r/ml/ml.R" in the Spark repo.
RSpark
bytebyte
integerinteger
floatfloat
doubledouble
numericdouble
characterstring
stringstring
binarybinary
rawbinary
logicalboolean
POSIXcttimestamp
POSIXlttimestamp
Datedate
arrayarray
listarray
envmap
SparkR 支持 Structured Streaming API (测试阶段). Structured Streaming 是一个 构建于SparkSQL引擎之上的易拓展、可容错的流式处理引擎. 更多信息请参考 R APIStructured Streaming Programming Guide
当在R中加载或引入(attach)一个新package时, 可能会发生函数名冲突,一个函数掩盖了另一个函数
下列函数是被SparkR所掩盖的:
被掩盖函数如何获取
covinpackage:statsstats::cov(x, y = NULL, use = "everything",
method = c("pearson", "kendall", "spearman"))
filterinpackage:statsstats::filter(x, filter, method = c("convolution", "recursive"),
sides = 2, circular = FALSE, init)
sampleinpackage:basebase::sample(x, size, replace = FALSE, prob = NULL)
由于SparkR的一部分是在dplyr软件包上建模的,因此SparkR中的某些函数与dplyr中同名. 根据两个包的加载顺序, 后加载的包会掩盖先加载的包的部分函数. 在这种情况下, 可以在函数名前指定包名前缀, 例如:SparkR::cume_dist(x)ordplyr::cume_dist(x).
你可以在 R 中使用search()检查搜索路径
在Spark 1.6.0 之前, 写入模式默认值为append. 在 Spark 1.6.0 改为error匹配 Scala API.
SparkSQL 将R 中的NA转换为null,反之亦然.
table方法已经移除并替换为tableToDF.
类DataFrame已改名为SparkDataFrame避免名称冲突.
Spark的SQLContext和HiveContext已经过时并替换为SparkSession. 相应的摒弃sparkR.init()而通过调用sparkR.session()来实例化SparkSession. 一旦实例化完成, 当前的SparkSession即可用于SparkDataFrame 操作(注释:spark2.0开始所有的driver实例通过sparkSession来进行构建).
sparkR.session不支持sparkExecutorEnv参数.要为executors设置环境,请使用前缀”spark.executorEnv.VAR_NAME”设置Spark配置属性,例如”spark.executorEnv.PATH”, -sqlContext不再需要下列函数:createDataFrame,as.DataFrame,read.json,jsonFile,read.parquet,parquetFile,read.text,sql,tables,tableNames,cacheTable,uncacheTable,clearCache,dropTempTable,read.df,loadDF,createExternalTable.
registerTempTable方法已经过期并且替换为createOrReplaceTempView.
dropTempTable方法已经过期并且替换为dropTempView.
scSparkContext 参数不再需要下列函数:setJobGroup,clearJobGroup,cancelJobGroup
join不再执行笛卡尔积计算, 使用crossJoin来进行笛卡尔积计算.
createDataFrame和as.DataFrame添加numPartitions参数. 数据分割时, 分区位置计算已经与scala计算相一致.
方法createExternalTable已经过期并且替换为createTable. 可以调用这两种方法来创建外部或托管表. 已经添加额外的 catalog 方法.
默认情况下,derby.log现在已保存到tempdir()目录中. 当实例化SparkSession且选项enableHiveSupport 为TRUE,会创建derby.log .
更正spark.lda错误设置优化器的bug.
更新模型概况输出coefficientsasmatrix. 更新的模型概况包括spark.logit,spark.kmeans,spark.glm.spark.gaussianMixture的模型概况已经添加对数概度(log-likelihood)loglik.
原文地址: http://spark.apachecn.org/docs/cn/2.2.0/sparkr.html
网页地址: http://spark.apachecn.org/
github: https://github.com/apachecn/spark-doc-zh(觉得不错麻烦给个 Star,谢谢!~)
网友评论