
CS190 Scalable Machine Learning

Author: 简简单单书写 | Published 2015-08-03 17:29

    Tags: Spark ML


    RDDs

    • Two types of operations: transformations and actions
    • Transformations are lazy (not computed immediately)
    • A transformed RDD is executed only when an action runs on it (see the sketch below)
    • Persist (cache) RDDs in memory or on disk
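
    A minimal sketch of this lazy behavior (assuming an existing SparkContext named sc, as in the examples below): the map call only records the transformation, and nothing runs until the collect action is invoked.

    >>> rdd = sc.parallelize([1, 2, 3, 4])
    >>> doubled = rdd.map(lambda x: x * 2)  # lazy: only the recipe is recorded
    >>> doubled.collect()                   # action: triggers the actual computation
    [2, 4, 6, 8]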

    Working with RDDs

    • Create an RDD from a data source: parallelize, textFile
    • Apply transformations to an RDD: map, filter
    • Apply actions to an RDD: collect, count (a quick sketch of all three steps follows)
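
    A minimal sketch, assuming sc is the SparkContext and "data.txt" is a hypothetical placeholder file:

    >>> rdd = sc.parallelize([1, 2, 3, 4])        # create from a Python collection
    >>> lines = sc.textFile("data.txt")           # create from a data source
    >>> evens = rdd.filter(lambda x: x % 2 == 0)  # transformation (lazy)
    >>> evens.count()                             # action
    2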

    Some Transformations

    map(func): return a new distributed dataset formed by passing each element of the source through a function func
    filter(func): return a new dataset formed by selecting those elements of the source on which func returns true
    distinct([numTasks]): return a new dataset that contains the distinct elements of the source dataset
    flatMap(func): similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item)

    Example:

    >>> rdd = sc.parallelize([1, 2, 3, 4])
    >>> rdd.map(lambda x: x * 2)
    RDD: [1, 2, 3, 4] → [2, 4, 6, 8]
    >>> rdd.filter(lambda x: x % 2 == 0)
    RDD: [1, 2, 3, 4] → [2, 4]
    >>> rdd2 = sc.parallelize([1, 4, 2, 2, 3])
    >>> rdd2.distinct()
    RDD: [1, 4, 2, 2, 3] → [1, 4, 2, 3]

    >>> rdd = sc.parallelize([1, 2, 3])
    >>> rdd.map(lambda x: [x, x+5])
    RDD: [1, 2, 3] → [[1, 6], [2, 7], [3, 8]]
    >>> rdd.flatMap(lambda x: [x, x+5])
    RDD: [1, 2, 3] → [1, 6, 2, 7, 3, 8]
    
    

    Spark Actions

    • Cause Spark to execute the recipe that transforms the source data
    • Mechanism for getting results out of Spark

    reduce(func): aggregate the dataset's elements using a function func; func takes two arguments and returns one, and is commutative and associative so that it can be computed correctly in parallel
    take(n): return an array with the first n elements
    collect(): return all the elements as an array. WARNING: make sure the result will fit in the driver program
    takeOrdered(n, key=func): return the first n elements in ascending order, or as ordered by the optional key function

    Example:

    >>> rdd = sc.parallelize([1, 2, 3])
    >>> rdd.reduce(lambda a, b: a * b)
    Value: 6  # (1 * 2 * 3)
    >>> rdd.take(2)
    Value: [1, 2]  # as list
    >>> rdd.collect()
    Value: [1, 2, 3]  # as list
    >>> rdd = sc.parallelize([5, 3, 1, 2])
    >>> rdd.takeOrdered(3, lambda s: -1 * s)
    Value: [5, 3, 2]  # as list
    

    Using count() and cache() together:

    lines = sc.textFile("...", 4)
    lines.cache()  # save, don't recompute!
    comments = lines.filter(isComment)  # isComment: user-defined predicate, True for comment lines
    print lines.count(), comments.count()
    

    Spark Program Lifecycle

    1. Create RDDs from external data or parallelize a collection in your driver program
    2. Lazily transform them into new RDDs
    3. cache() some RDDs for reuse
    4. Perform actions to execute parallel computation and produce results (see the end-to-end sketch below)
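
    A minimal end-to-end sketch of these four steps (the file name "README.md" and the word-count logic are illustrative assumptions, not part of the original notes):

    # 1. Create an RDD from external data
    lines = sc.textFile("README.md", 4)
    # 2. Lazily transform it into new RDDs
    words = lines.flatMap(lambda line: line.split(" "))
    # 3. cache() an RDD for reuse
    words.cache()
    # 4. Perform actions to execute the parallel computation and produce results
    print words.count()
    print words.distinct().count()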

    Key-Value RDDs

    Key-value transformations:
    reduceByKey(func): return a new distributed dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V, V) => V
    sortByKey(): return a new dataset of (K, V) pairs sorted by keys in ascending order
    groupByKey(): return a new dataset of (K, Iterable<V>) pairs

    Warning: be careful when using groupByKey(). It may move a large amount of data across the network, and the resulting per-key lists can be very large, which can exhaust a worker's memory (see the comparison sketch after the examples below).

    >>> rdd = sc.parallelize([(1, 2), (3, 4), (3, 6)])
    >>> rdd.reduceByKey(lambda a, b: a + b)
    RDD: [(1, 2), (3, 4), (3, 6)] → [(1, 2), (3, 10)]
    >>> rdd2 = sc.parallelize([(1, 'a'), (2, 'c'), (1, 'b')])
    >>> rdd2.sortByKey()
    RDD: [(1, 'a'), (2, 'c'), (1, 'b')] → [(1, 'a'), (1, 'b'), (2, 'c')]
    >>> rdd2 = sc.parallelize([(1, 'a'), (2, 'c'), (1, 'b')])
    >>> rdd2.groupByKey()
    RDD: [(1, 'a'), (2, 'c'), (1, 'b')] → [(1, ['a', 'b']), (2, ['c'])]
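
    To make the warning above concrete, here is a small sketch (names are illustrative) computing the same per-key sum two ways: reduceByKey combines values locally on each partition before any data is shuffled, while groupByKey first moves every value for a key across the network.

    >>> pairs = sc.parallelize([(1, 2), (3, 4), (3, 6)])
    >>> pairs.reduceByKey(lambda a, b: a + b).collect()  # combines per partition, then shuffles
    [(1, 2), (3, 10)]
    >>> pairs.groupByKey().mapValues(sum).collect()      # ships all values for a key, then sums
    [(1, 2), (3, 10)]
    # the output order of collect() is not guaranteed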
    

    pySpark Shared Variables

    Broadcast Variables
    » Efficiently send large, read-only value to all workers
    » Saved at workers for use in one or more Spark operations
    » Like sending a large, read-only lookup table to all the nodes

    # Country code lookup for HAM radio call signs.
    # Look up the locations of the call signs in the RDD contactCounts.
    # We load a table mapping call sign prefixes to country codes to support this lookup.
    # (loadCallSignTable() and lookupCountry() are helper functions defined elsewhere.)
    signPrefixes = sc.broadcast(loadCallSignTable())

    def processSignCount(sign_count, signPrefixes):
        country = lookupCountry(sign_count[0], signPrefixes.value)
        count = sign_count[1]
        return (country, count)

    countryContactCounts = (contactCounts
                            .map(lambda sign_count: processSignCount(sign_count, signPrefixes))
                            .reduceByKey(lambda x, y: x + y))
    

    Accumulators
    » Aggregate values from workers back to driver
    » Only driver can access value of accumulator
    » For tasks, accumulators are write-only
    » Use to count errors seen in RDD across workers

    # Counting empty lines
    file = sc.textFile(inputFile)
    # Create an Accumulator[Int] initialized to 0
    blankLines = sc.accumulator(0)

    def extractCallSigns(line):
        global blankLines  # make the global accumulator accessible
        if line == "":
            blankLines += 1
        return line.split(" ")

    callSigns = file.flatMap(extractCallSigns)
    callSigns.count()  # flatMap is lazy: run an action so the accumulator actually gets updated
    print "Blank lines: %d" % blankLines.value
    

    --
    More references:
    Introduction to Big Data with Apache Spark
    pySpark documentation
    pyspark-pictures
