美文网首页
Spark-装载数据

Spark-装载数据

作者: NEO_X | 来源:发表于2019-05-22 16:57 被阅读0次

主要函数

主要是这三个函数,还有另外的方式是从现有的RDD中的生成,主要为transformation函数,其返回RDD
parallelize ; textFile ; wholetextFiles

其他的针对特定格式的文件:

  • binaryFiles(path, minPartitions=None)
  • hadoopFile(path, inputFormatClass, keyClass, valueClass, keyConverter=None, valueConverter=None, conf=None, batchSize=0)
  • pickleFile(name, minPartitions=None)
  • sequenceFile(path, keyClass=None, valueClass=None, keyConverter=None, valueConverter=None, minSplits=None, batchSize=0)[source]

装载数据的目的为了生成RDD。

做任何Spark开发前,都先要对spark核心类SparkContext有一个清晰的认识,以下链接来自spark官网的说明。
SparkContext相关说明

Spakcontext表示与Spark群集的连接,可用于在该群集上创建RDD和广播变量。所以在进行任何Spark操作前都是需要SparkContext对象

class pyspark.SparkContext(master=None, appName=None, sparkHome=None, pyFiles=None, environment=None, batchSize=0, serializer=PickleSerializer(), conf=None, gateway=None, jsc=None, profiler_cls=<class 'pyspark.profiler.BasicProfiler'>)

  • Master - 它是连接到的集群的URL。
  • appName - 工作名称。
  • sparkHome - Spark安装目录。
  • pyFiles - 要发送到集群并添加到PYTHONPATH的.zip或.py文件。
  • environment - 工作节点环境变量。
  • batchSize - 表示为单个Java对象的Python对象的数量。设置1以禁用批处理,设置0以根据对象大小自动选择批处理大小,或设置为-1以使用无限批处理大小。
  • serializer - RDD序列化器。
  • conf - L {SparkConf}的一个对象,用于设置所有Spark属性。
  • gateway- 使用现有网关和JVM,否则初始化新JVM。
  • JSC - JavaSparkContext实例。
  • profiler_cls - 用于进行性能分析的一类自定义Profiler(默认为pyspark.profiler.BasicProfiler).

SparkContext是spark的主要切入点,由于RDD是主要的API,通过sparkcontext来创建和操作RDD ; 对于其他不同的处理场景使用不同的Context。如SqlContext,StreamingContext,hiveContext.这种多个Context的情况在Spark 1.0时代出现。在Spark 2.0中,引入SparkSession作为DataSet和DataFrame API的切入点,SparkSession封装了SparkConf、SparkContext和SQLContext。为了向后兼容,SQLContext和HiveContext也被保存下来
SparkSession实质上是SQLContext和HiveContext的组合(未来可能还会加上StreamingContext),所以在SQLContext和HiveContext上可用的API在SparkSession上同样是可以使用的。SparkSession内部封装了sparkContext,所以计算实际上是由sparkContext完成的。

parallelize(c,numSlices=None)

>>> sc.parallelize([0, 2, 3, 4, 6], 5).glom().collect()
[[0], [2], [3], [4], [6]]
>>> sc.parallelize(xrange(0, 6, 2), 5).glom().collect()
[[], [0], [], [2], [4]]

除了开发原型和测试时,这种方式用得并不多,毕竟这种方式需要把整个数据集先放在一台机器的内存中,针对大数据量无法处理

textFile(name, minPartitions=None, use_unicode=True)

从外部读完数据,可以在不同的数据源上进行。

>>> path = os.path.join(tempdir, "sample-text.txt")
>>> with open(path, "w") as testFile:
...    _ = testFile.write("Hello world!")
>>> textFile = sc.textFile(path)
>>> textFile.collect()
['Hello world!']

用于读取不同外部数据源的数据,针对不同的数据机构;textFile是主要的获取数据的方法

wholeTextFiles(path, minPartitions=None, use_unicode=True)

同时处理整个文件。如果文件足够小,那么可以使用SparkContext.wholeTextFiles()方法,该方法会返回一个pair RDD,其中键是输入文件的文件名

>>> dirPath = os.path.join(tempdir, "files")
>>> os.mkdir(dirPath)
>>> with open(os.path.join(dirPath, "1.txt"), "w") as file1:
...    _ = file1.write("1")
>>> with open(os.path.join(dirPath, "2.txt"), "w") as file2:
...    _ = file2.write("2")
>>> textFiles = sc.wholeTextFiles(dirPath)
>>> sorted(textFiles.collect())
[('.../1.txt', '1'), ('.../2.txt', '2')]

用于读取文件内容,进行一次处理;针对小文件可以进行这样的处理,一次性读取

总结

主要介绍了数据读取的3种方法,读取的后生成RDD,就可以进行对RDD进行操作。在下一章种进行对RDD的基础操作进行说明。

脚本示例

from pyspark import SparkContext

def test_parallelize(sc):
    """
    对parallelize使用
    """
    print('{}{}'.format('parallelize:',sc.parallelize([0, 2, 3, 4, 6], 5).collect()))

def test_textFile(sc):
    """
    对textFile使用
    """
    lines=sc.textFile('E:\py_lab\README.md')
    print('{}{}'.format('textFile -> num of doc lines:',lines.count()))
    print('{}{}'.format('textFile -> doc first line:',lines.first()))

def test_wholeTextFiles(sc):
    """
    对wholeTextFiles使用
    """
    lines=sc.wholeTextFiles('E:\py_lab\README.md')
    print('{}{}'.format('wholeTextFiles -> num of doc lines:',lines.count()) )
    print('{}{}'.format('wholeTextFiles -> doc first line:',lines.first()) )
    
if __name__=='__main__': 
    sc = SparkContext()
    test_parallelize(sc)
    test_textFile(sc)
    test_wholeTextFiles(sc)
get_data.png

相关文章

  • Spark-装载数据

    主要函数 主要是这三个函数,还有另外的方式是从现有的RDD中的生成,主要为transformation函数,其返回...

  • 初始装载

    在Hive中装载维度表。 初始装载 在数据仓库可以使用前,需要装载历史数据。这些历史数据是导入进数据仓库的第一个数...

  • hive库装载和导出数据总结

    把数据装载到hive库一般有两种,一种是通过etl抽取数据然后装载数据到hive,一种是文件装载到hive库,导...

  • spark-天池O2O竞赛

    地址转移到 : spark-天池O2O竞赛

  • Spark-数据倾斜问题

    以wordcount为例,假设某一单词数量很大,在map阶段形成k-v对,进入reduce阶段时,会产生分区间数据...

  • 大数据的一些名词概念

    etl: 数据的抽取(Extract), 数据清洗(Transform), 装载(Load), 业务系统的数据经...

  • PySpark-装载数据

    更多信息https://blue-shadow.top/ 主要函数 主要是这三个函数:parallelize ; ...

  • Hive 系列 - DML数据操作

    1 数据导入 1.1 向表中装载数据(Load) 语法 ​ (1)load data:表示加载数据 ​ (...

  • 使用LOAD工具导致DB2进入备份暂挂、复原暂挂、装入暂挂及完整

    为了加快数据的装载速度,许多DBA们喜欢使用load工具进行数据的迁移和装载,但是load工具带来便利性的同时也为...

  • Oracle 指令总结

    数据库 装载数据库:Startup open 卸载数据库:Shutdown immediate 服务 数据库服务:...

网友评论

      本文标题:Spark-装载数据

      本文链接:https://www.haomeiwen.com/subject/rgbuzqtx.html