美文网首页
PySpark-装载数据

PySpark-装载数据

作者: NEO_X | 来源:发表于2019-07-05 15:48 被阅读0次

    更多信息https://blue-shadow.top/

    主要函数

    主要是这三个函数:
    parallelize ; textFile ; wholetextFiles

    其他的针对特定格式的文件:

    • binaryFiles(path, minPartitions=None)
    • hadoopFile(path, inputFormatClass, keyClass, valueClass, keyConverter=None, valueConverter=None, conf=None, batchSize=0)
    • pickleFile(name, minPartitions=None)
    • sequenceFile(path, keyClass=None, valueClass=None, keyConverter=None, valueConverter=None, minSplits=None, batchSize=0)[source]

    装载数据的目的为了生成RDD。

    做任何Spark开发前,都先要对spark核心类SparkContext有一个清晰的认识,以下链接来自spark官网的说明。
    SparkContext相关说明

    Spakcontext表示与Spark群集的连接,可用于在该群集上创建RDD和广播变量。所以在进行任何Spark操作前都是需要SparkContext对象

    class pyspark.SparkContext(master=None, appName=None, sparkHome=None, pyFiles=None, environment=None, batchSize=0, serializer=PickleSerializer(), conf=None, gateway=None, jsc=None, profiler_cls=<class 'pyspark.profiler.BasicProfiler'>)

    • Master - 它是连接到的集群的URL。
    • appName - 工作名称。
    • sparkHome - Spark安装目录。
    • pyFiles - 要发送到集群并添加到PYTHONPATH的.zip或.py文件。
    • environment - 工作节点环境变量。
    • batchSize - 表示为单个Java对象的Python对象的数量。设置1以禁用批处理,设置0以根据对象大小自动选择批处理大小,或设置为-1以使用无限批处理大小。
    • serializer - RDD序列化器。
    • conf - L {SparkConf}的一个对象,用于设置所有Spark属性。
    • gateway- 使用现有网关和JVM,否则初始化新JVM。
    • JSC - JavaSparkContext实例。
    • profiler_cls - 用于进行性能分析的一类自定义Profiler(默认为pyspark.profiler.BasicProfiler).

    SparkContext是spark的主要切入点,由于RDD是主要的API,通过sparkcontext来创建和操作RDD ; 对于其他不同的处理场景使用不同的Context。如SqlContext,StreamingContext,hiveContext.这种多个Context的情况在Spark 1.0时代出现。在Spark 2.0中,引入SparkSession作为DataSet和DataFrame API的切入点,SparkSession封装了SparkConf、SparkContext和SQLContext。为了向后兼容,SQLContext和HiveContext也被保存下来
    SparkSession实质上是SQLContext和HiveContext的组合(未来可能还会加上StreamingContext),所以在SQLContext和HiveContext上可用的API在SparkSession上同样是可以使用的。SparkSession内部封装了sparkContext,所以计算实际上是由sparkContext完成的。

    parallelize(c,numSlices=None)

    >>> sc.parallelize([0, 2, 3, 4, 6], 5).glom().collect()
    [[0], [2], [3], [4], [6]]
    >>> sc.parallelize(xrange(0, 6, 2), 5).glom().collect()
    [[], [0], [], [2], [4]]
    

    除了开发原型和测试时,这种方式用得并不多,毕竟这种方式需要把整个数据集先放在一台机器的内存中,针对大数据量无法处理

    textFile(name, minPartitions=None, use_unicode=True)

    从外部读完数据,可以在不同的数据源上进行。

    >>> path = os.path.join(tempdir, "sample-text.txt")
    >>> with open(path, "w") as testFile:
    ...    _ = testFile.write("Hello world!")
    >>> textFile = sc.textFile(path)
    >>> textFile.collect()
    ['Hello world!']
    

    用于读取不同外部数据源的数据,针对不同的数据机构;textFile是主要的获取数据的方法

    wholeTextFiles(path, minPartitions=None, use_unicode=True)

    同时处理整个文件。如果文件足够小,那么可以使用SparkContext.wholeTextFiles()方法,该方法会返回一个pair RDD,其中键是输入文件的文件名

    >>> dirPath = os.path.join(tempdir, "files")
    >>> os.mkdir(dirPath)
    >>> with open(os.path.join(dirPath, "1.txt"), "w") as file1:
    ...    _ = file1.write("1")
    >>> with open(os.path.join(dirPath, "2.txt"), "w") as file2:
    ...    _ = file2.write("2")
    >>> textFiles = sc.wholeTextFiles(dirPath)
    >>> sorted(textFiles.collect())
    [('.../1.txt', '1'), ('.../2.txt', '2')]
    

    用于读取文件内容,进行一次处理;针对小文件可以进行这样的处理,一次性读取

    总结

    主要介绍了数据读取的3种方法,读取的后生成RDD,就可以进行对RDD进行操作。在下一章种进行对RDD的基础操作进行说明。

    脚本示例

    from pyspark import SparkContext
    
    def test_parallelize(sc):
        """
        对parallelize使用
        """
        print('{}{}'.format('parallelize:',sc.parallelize([0, 2, 3, 4, 6], 5).collect()))
    
    def test_textFile(sc):
        """
        对textFile使用
        """
        lines=sc.textFile('E:\py_lab\README.md')
        print('{}{}'.format('textFile -> num of doc lines:',lines.count()))
        print('{}{}'.format('textFile -> doc first line:',lines.first()))
    
    def test_wholeTextFiles(sc):
        """
        对wholeTextFiles使用
        """
        lines=sc.wholeTextFiles('E:\py_lab\README.md')
        print('{}{}'.format('wholeTextFiles -> num of doc lines:',lines.count()) )
        print('{}{}'.format('wholeTextFiles -> doc first line:',lines.first()) )
        
    if __name__=='__main__': 
        sc = SparkContext()
        test_parallelize(sc)
        test_textFile(sc)
        test_wholeTextFiles(sc)
    
    get_data.png

    上一篇:pyspark-前言
    下一篇:pyspark-rdd-操作

    相关文章

      网友评论

          本文标题:PySpark-装载数据

          本文链接:https://www.haomeiwen.com/subject/ncjehctx.html