一、Parallelizing an existing collection in your driver program
The first way is to obtain an RDD by parallelizing an existing collection in the driver program:
scala> val data = Array(1, 2, 3, 4, 5)
data: Array[Int] = Array(1, 2, 3, 4, 5)
scala> val distData = sc.parallelize(data)
distData: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:26
As shown above, calling SparkContext's parallelize turns a collection into a ParallelCollectionRDD. We can then run a sum over this RDD:
scala> distData.reduce((a, b) => a + b)
res0: Int = 15
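parallelize also takes an optional second argument that sets how many partitions (slices) the collection is cut into. A minimal sketch, where the value 4 and the name distData4 are just illustrative:
scala> val distData4 = sc.parallelize(data, 4)   // explicitly cut the collection into 4 partitions
scala> distData4.getNumPartitions                // 4
scala> distData4.reduce((a, b) => a + b)         // still sums to 15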
二、Referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.
The second way is to obtain an RDD from an external storage system; this is the approach commonly used in production.
scala> val rdd = sc.textFile("/home/hadoop/soul/data/wordcount.txt")
rdd: org.apache.spark.rdd.RDD[String] = /home/hadoop/soul/data/wordcount.txt MapPartitionsRDD[2] at textFile at <console>:24
or
scala> val rdd = sc.textFile("file///home/hadoop/soul/data/wordcount.txt")
rdd: org.apache.spark.rdd.RDD[String] = file///home/hadoop/soul/data/wordcount.txt MapPartitionsRDD[4] at textFile at <console>:24
or
scala> val rdd = sc.textFile("hdfs://hadoop000:8020/g6/data/wordcount.txt")
rdd: org.apache.spark.rdd.RDD[String] = hdfs://hadoop000:8020/g6/data/wordcount.txt MapPartitionsRDD[6] at textFile at <console>:24
--------------------------------------------------------------------------------------------------
scala> rdd.collect
res1: Array[String] = Array(spark hadoop hadoop, hive hbase hbase, hive hadoop hadoop, hive hadoop hadoop)
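Since the file holds space-separated words, a classic word count can be run directly on this RDD. A minimal sketch (the name wordCounts is illustrative):
scala> val wordCounts = rdd.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
scala> wordCounts.collect   // for the data above: (spark,1), (hive,3), (hbase,2), (hadoop,6); element order may vary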
三、Notes
- If using a path on the local filesystem, the file must also be accessible at the same path on worker nodes. Either copy the file to all workers or use a network-mounted shared file system. In other words, an RDD created from a local path requires the file to exist at that same path on every node, or a mounted shared filesystem must be used.
- All of Spark's file-based input methods, including textFile, support running on directories, compressed files, and wildcards as well. For example, you can use textFile("/my/directory"), textFile("/my/directory/*.txt"), and textFile("/my/directory/*.gz"). See the sketch after this list.
- The textFile method also takes an optional second argument for controlling the number of partitions of the file. By default, Spark creates one partition for each block of the file (blocks being 128MB by default in HDFS), but you can also ask for a higher number of partitions by passing a larger value. Note that you cannot have fewer partitions than blocks.
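A minimal sketch tying the two points above together, reusing the paths from the earlier examples (the *.txt wildcard and the partition count 8 are just illustrative values):
scala> val txtFiles = sc.textFile("/home/hadoop/soul/data/*.txt")                 // wildcard: read every .txt file in the directory
scala> val rdd8 = sc.textFile("hdfs://hadoop000:8020/g6/data/wordcount.txt", 8)   // request at least 8 partitions
scala> rdd8.getNumPartitions                                                      // typically 8 or more, never fewer than the number of HDFS blocks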