美文网首页
spark programming

spark programming

作者: xncode | 来源:发表于2017-08-11 15:19 被阅读0次

    driver进程用于运行用户的主程序,然后在集群的机子上分布执行并行操作。

    概念

    RDD

    RDD resilient distributed dataset,是分布在集群节点中的各数据元素分片的集合,可被并行地操作。

    RDD是通过读取hdfs中的文件或是通过已经存在的集合转换。

    shared variables

    在分布式执行时,传递的是变量的复制,如果需要在任务之间共享的:

    broadcast variables

    accumulators

    连接

    SparkContext是用于告知Spark如何连接到集群中

    conf = SparkConf().setAppName(appName)
    # 但是首先得创建一个SparkConf
    # 可以在此处直接调用setmaster设置运行方式 但是一般会在运行时通过参数设置
    sc = SparkContext(conf=conf)
    

    如果使用的是shell,则已经有了创建好的SparkContext sc来使用,不能再次创建。可在运行是加--py-file、--packages、--repositories来添加python依赖。

    RDD

    创建的两种方式

    parallelized collections

    data = [1, 2, 3, 4, 5]
    distData = sc.parallelize(data)
    # 会根据集群的配置情况自动分片
    # 然后复制到各节点来形成分布式的数据集 可以并行地操作
    

    值得注意的是parallelize可接受第二个参数来设置分片的数量

    parallelize(data, 10)
    

    external dataset

    distFile = sc.textFile("data.txt")
    # 读取text文件 可以使用hdfs s3n的uri
    # 如果使用的是本地文件路径 需要所有worker的对应路径上都有
    # 支持文件路径 文件名称通配符 压缩 
    textFile("/my/directory")
    textFile("/my/directory/*.txt")
    textFile("/my/directory/*.gz")
    

    textfile也可以接受第二个参数声明文件分片大小 默认是128MB

    除了textfile外还可以使用

    wholeTextFiles可以读取路径下的所有文件作为键值对返回(一般是处理目录下包含多个小文件的情况)
    
    saveAsPickleFile pickleFile 可以按python的Pickle方式存取 默认的batch大小是10
    
    rdd.saveAsSequenceFile()
    sc.sequenceFile()
    sequenceFile和HDFS
    

    operation

    有两种操作类型:

    变换:从已存在的dataset中创建出来
    动作:通过一定的操作计算后的返回值

    basic

    lines = sc.textFile("")
    lineLength = lines.map(lambda s: len(s))
    lineLength.persist()
    totalLength = lineLength.reduce(lambda a, b: a+b)
    

    传递

    lambda
    本地函数(作用域内定义的函数)
    全局函数

    虽然说可以传递类的方法,但是这样会传递整个对象。如果用到了类,最好是把使用到的类中的东西接出来到局部变量中然后传递。

    作用域

    如果定义了一个函数,然后通过rdd的foreach传递运行该函数,如果在函数中引用的是driver的全局变量,则可能会有问题。

    在调用分布式函数之前,spark会计算该任务的作用域,即必须对执行器可见的变量和方法,然后把该作用域序列化并传递给各个执行器。

    传递给执行器的是一份复制的变量,每个执行器操作的是他自己的变量,所以driver中的全局变量不变。

    但是如果是在本地运行的同时是在一个jvm中,那么全局变量可能是会被修改的。但是应该是accumulater来实现这一功能。

    对于变量的打印,如果在集群模式下运行,打印的输出是各个节点。正确的方式是先调用collect方法来收集到本地。如果只想看一些元素,可以调用take

    变换

    map(func) 把func作用于rdd中的每个元素 返回
    filter(func) 返回func为true的元素
    flatmap(func) func的返回值是 seq,把func作用到rdd中的每个元素
    mapPartitions(func)
    mapPartitionsWithIndex(func)
    sample(withReplacement, fraction, seed) 抽样
    union(rdd) 合集
    intersection(rdd) 交集
    distinct([numTasks])
    groupByKey reduceByKey aggregateByKey SortByKey
    join
    cogroup
    cartesian
    pipe
    coalesce
    repartition
    repartitionAndSortWithinPartitions

    动作

    reduce(func) func接受两个参数然后返回一个值
    collect()
    count()
    first()
    take(n)
    takeSample(withReplacement, num, seed)
    takeOrdered(n, [ordering])
    saveAsTextFile(path)
    saveAsSequenceFile(path)
    saveAsObjectFile(path)
    countByKey()
    foreach(func)

    shuffle operation

    是spark重新分布数据的机制 通常会触发执行器和机器的数据复制,是一个耗时、复杂的动作,包含:repartition coalesce groupByKey reduceByKey cogroup join

    RDD持久化

    MEOMORY_ONLY
    MEMORY_AND_DISK
    MEMORY_ONLY_SER
    MEMORY_AND_DISK_SER
    DISK_ONLY
    MEMORY_ONLY_2 MEMORY_AND_DISK_2...

    spark会在一些shuffle操作时自动持久化,例如reducebykey

    可以显式调用unpersist

    Shared Variable

    Broadcast Variables

    broadcast = sc.broadcast([1, 2, 3])
    broadcast.value
    

    Accumulators

    只有driver可以读取accumulator的数据,其他执行器只能加
    

    相关文章

      网友评论

          本文标题:spark programming

          本文链接:https://www.haomeiwen.com/subject/cqnlrxtx.html