spark

作者: hehehehe | 来源:发表于2021-07-16 10:53 被阅读0次

    https://www.cnblogs.com/redhat0019/p/8665491.html

    1. 获取SparkSession
    from pyspark.sql import SparkSession
    spark = SparkSession.builder.config(conf = SparkConf()).getOrCreate()
    
    1. 获取SparkContext
    1. 获取sparkSession:  se = SparkSession.builder.config(conf = SparkConf()).getOrCreate()
    1. 获取sparkContext:  sc = se.sparkContext
    2. 获取sqlContext:    sq = SparkSession.builder.getOrCreate()
    3. 获取DataFrame:     df = sqlContext.createDataFrame(userRows)
    
    line1 = sc.textFile("hdfs://192.168.88.128:9000/hello.txt")
    rawData = sc.textFile("hdfs://192.168.88.128:9000/data/sanxi/sanxi/*.gz")   获取sanxi文件夹下所有.gz的文件
    rawData = sc.textFile("file:///data/sanxi2/*.gz")     spark 读取本地文件
    
    1. filter 使用方法
    1. 过滤包含指定字符的RDD
      line2 = line1.filter(lambda x : "a" in x) 
    2. 接收一个函数,   将满足该函数的元素放入新的RDD中
       def hasHWTC1AC5C088(line):
        return "HWTC1AC5C088" in line
       lines2 = lines.filter(hasHWTC1AC5C088("HWTC1AC5C088"))  #将函数传入filter中
    3. RDD 删除第一条数据
       header = abc.first()
       df1 = abc.filter(lambda x:x != header)
    
    1. map 和 flatMap 使用方法 将 lambda 函数做用在每一条记录上
    1)line2 = line1.map(lambda x: x.split(" "))
     2)line3 = line1.map(lambda x: x+"abc")  #对原数据进行任意操作,  将结果再放回给原数据
    
      3)line4 = line1.map(lambda x: (x, 1))  将原始数据改为 key-value形式,  key为原数据,  value为 1
    
     4)line2.flatMap(lambda line: line.split(" "))  #  
    
     
    
     5)map 与 flatMap 的区别(通常用来统计单词个数示例,  必须使用flatMap来进行拆分单词)
    
       map 具有分层,   就是每一行数据作为你一层来处理  ,  结果为: 
    
        [[u'extends', u'Object'], [u'implements', u'scala.Serializable']]   
    
       flatMap 不具有分层,   
    
        [u'extends', u'Object', u'implements', u'scala.Serializable']
    
      6)map 获取前3列数据   下例中:  [:3]  表示从开头到第三个数据项,   如果是[3:]  就表示从第三项到最后
    
        Rdd.map(lambda x: x.split(" ")[:3]) 结果:[[u'a', u'1', u'3'], [u'b', u'2', u'4'], [u'd', u'3', u'4']]
    
      ALS 训练数据---获取指定列数据
    
        ratingsRdd = rawRatings.map(lambda x:(x[0],x[1],x[2])  结果为:
    
          [(u'196', u'242', u'3'), (u'186', u'302', u'3'), (u'22', u'377', u'1')]
    
      7) 类型转换
    
        Rdd.map(lambda x: float(x[0]))    将第一个字段转换为 float 类型
    
      8) 删除所有的 "" 号  replace(替换),   下列意思是将" 替换成空
    
        df2 = df1.map(lambda x:x.replace("\"",""))
    
      9) df2 = RDD.map(lambda x: (x[0],float(x[1]),float(x[2])))   设置一个 key 对应 多个value,  
    
        df3 = df2.filter(lambda keyValue: keyValue[0] > 2)    操作key
    
         df3 = df2.filter(lambda keyValue: keyValue[1] > 2)   操作第一个value
    
         df3 = df2.filter(lambda keyValue: keyValue[2] > 2)    操作第二个value
    
    1. RDD 类型数据 的查询方式
    print(abc)    打印当前对象
    type(Rdd)    获取当前对象类型
    RDD.collect()    将RDD转换为数组,  结果格式为:([u'{"name":"Michael"}', u'{"name":"Andy", "age":30}', u'{"name":"Justin", "age":19}'])
    RDD.count()    查看内容条数
    Rdd.printSchema()   查看rdd 列
    
    1. RDD转换操作 rdd转list
    1) list = RDD.collect()
    2) list转RDD
            RDD = sc.parallelize(list)
    3) RDD 调用 map 函数
      (1)  RDD1 = RDD2.map(lambda x: x+1)    #使用匿名函数操作每条数据  map(lambda x: x.split(","))字符串截取,map(lambda x: "abc"+x) 重组字符串
      (2)  RDD2 = RDD1.map(addOne)   #使用具名函数来操作每条数据(具名函数就是单独定义一个函数来处理数据) 如下:
          def addOne(x):
              return x.split(",")   
          print(lines.map(addOne).collect())   #调用具名函数
    4. RDD 调用 filter 函数
      1) intRdd.filter(lambda x: x>5)    #对数字类型的 RDD 进行筛选 intRdd.filter(lambda x: x>5 and x <40)  and 表示 并且 的意思,  or 表示 或 的意思
      2) stringRdd.filter(lambda x: "abc" in x)    #筛选包含  abc  的数据
    4. RDD 删除 重复 元素
      1) intRdd.distinct()     #去重
    5. 随机将一个 RDD 通过指定比例 分为 2 个RDD
      1) sRdd = stringRdd.randomSplit([0.4,0.6])    将 stringRdd 以4:6 分为2个 RDD,  获取其中一个 RDD 的方法为: sRdd[0]
    6. RDD 中 groupBy  分组计算
      1) gRdd = intRdd.groupBy(lambda x: x<2)   #将会分为2组,  访问第一粗:  print(sorted(gRdd[0][1])),  方位第二组:print(sorted(gRdd[1][1]))
      2) 分组并且取别名:  gRdd = intRdd.groupBy(lambda x: "a" if(x < 2) else "b"),   
        (1)获取第一组信息: print(gRdd[0][0], sorted(gRdd[0][1]))
        (2) 获取第二组信息: print(gRdd[1][0], sorted(gRdd[1][1]))   其中,   前半部分 gRdd[1][0] 表示获取别名 a
    
    1. 使用 union 进行并集运算, intersection 进行并集运算
    1) intRdd1.union(intRdd2)     如: intRdd1 为 1, 3, 1   intRdd2 为 1, 2, 3, 4  则结果为: 1,3,1,1,2,3,4
    2) intRdd1.intersection(intRdd2)   计算 2 个RDD 的交集
    3)intRdd3.subtract(intRdd1)   计算 2 个 Rdd 的差集,   此例表示  intRdd3中有, 但在intRdd1中没有
    4)intRdd1.cartesian(intRdd2)  计算 笛卡尔积
    
    1. RDD 动作运算
    [1] 读取元素  
      1) first()  查看RDD 第一条数据
      2) take(2)  获取第二条数据
      3) takeOrdered(3)   从小到大排序取出前 3 条数据
      4) intRdd3.takeOrdered(6,key=lambda x: -x)  从大道小排序, 取出前6条数据
    [2] 统计功能
      1) intRdd1.stats()   统计 intRdd1, 结果为:(count: 5, mean: 5.0, stdev: 2.82842712475, max: 9, min: 1)
         mean表示平均值,  stdev 表示标准差
      2)intRdd3.min() 最新值, 
      3)intRdd3.max() 最大值
      4)intRdd3.stdev()  标准差
      5)intRdd3.count()  数据条数
      6)intRdd3.sum()    求和
      7)intRdd3.mean()   平均值
    
    1. RDD key-value 基本转换运算
    1)kvRdd1 = sc.parallelize([(1, 4),(2, 5),(3, 6),(4, 7)])  创建RDD key-value 源数据
      结果为:  [(1, 4), (2, 5), (3, 6), (4, 7)]
    2)kvRdd1.keys()    获取全部 key 的值
    3)kvRdd1.values()   获取全部 values 的值
    4)kvRdd1.filter(lambda keyValue: keyValue[0] > 2)  过滤 key > 2 的数据
    5)kvRdd1.filter(lambda keyValue: keyValue[1] >5)   过滤 value > 5 的数据
    6)kvRdd1.mapValues(lambda x: x*x)  对每一条 value 进行运算
    7)kvRdd1.sortByKey()  按照 key 从小到大 进行排序
    8)kvRdd1.sortByKey(ascending=False)  按照 key 从大到小进行排序
    9)kvRdd3.reduceByKey(lambda x, y:x+y)   将 key 相同的键的值合并相加
    
    1. 多个 RDD key-value 的转换运算
    1) join
      intK1 = sc.parallelize([(1,5),(2,6),(3,7),(4,8),(5,9)])
      intK2 = sc.parallelize([(3,30),(2,20),(6,60)])
      intK1.join(intK2)   join结果为:
      [(2, (6, 20)), (3, (7, 30))]
    2)leftJoin
      intK1.leftOuterJoin(intK2).collect()   leftJoin结果为:
      [(2, (6, 20)), (4, (8, None)), (1, (5, None)), (3, (7, 30)), (5, (9, None))]
    3)rightJoin    rightJoin 结果为:
      intK1.rithtOuterJoin(intK2).collect()
      [(2, (6, 20)), (6, (None, 60)), (3, (7, 30))]
    4)subtractByKey    从 intK1 中删除 与 intK2 相同 key-value
      intK1.subtractByKey(intK2)    结果为:
      [(4, 8), (1, 5), (5, 9)] 
    
    1. key-value 动作 运算
    1) intK1.first()    获取第一项数据
    2) intK1.collect()  获取所有项数据
    3) intK1.take(2)    获取前二项数据
    4) intK1.first()[0] 获取第一项数据的 key
    5) intK1.first()[1] 获取第一项数据的 value
      例如:  一条记录结果为 [(2, (6, 20)), (4, (8, None)), (1, (5, None)), (3, (7, 30)), (5, (9, None))](leftJoin结果)
      想要获取第一条记录的 6 ,  可以使用: intK1.leftOuterJoin(intK2).first()[1][0]  [1] 表示获取第一条记录的value, [0] 表示
      从 value 中再获取第一项值 6
    6) intK3.countByKey()   计算 RDD 中每一个 Key 值得项数, 例如
      [(1, 2), (2, 3), (2, 5), (2, 8), (5, 10)]   源数据
      defaultdict(<type 'int'>, {1: 1, 2: 1, 3: 1, 4: 1, 5: 1})   结果值
    7) KV = intK3.collectAsMap()   将 key-value 转换为 key-value的字典 
      {1: 2, 2: 8, 5: 10}   结果为
      例如,  如果要获取 8 这个value,   就使用  KV[2]  就可以获取得到
    8) intK3.lookup(2)   查找 key 为 2 的所有value 值, 如果想要再进行统计计算,   就将结果再进行转换为 RDD 进行统计计算
    9) 广播变量
      1> kvFrult = sc.parallelize([(1, "apple"),(2, "orange"),(3, "grape")])   创建key-value 对照表
      2> fruitMap = kvFrult.collectAsMap()      转换为 map 字典
      3> bcFruitMap = sc.broadcast(fruitMap)    创建广播变量
      4> fruitIds = sc.parallelize([2,4,1,3])   创建编号 RDD
      5> fruitNames = fruitIds.map(lambda x: bcFruitMap.value[x])    使用 bcFruitMap.value 进行转换 从而获取编号对应的名称
    10) 通过累加器来计算总和
      intRdd = sc.parallelize([1,2,44,2,11,22]) 源数据
      total = sc.accumulator(0.0)  定义一个double类型的累加器,   来计算总和
      num = sc.accumulator(0)  定义一个int类型的累加器,   来计算数量
      intRdd.foreach(lambda l: [total.add(l), num.add(1)])  通过foreach 循环来统计
      total.value   获取总和
      num.value   获取个数
      avg = total.vaue/num.value    获取平均值
    11) RDD 持久化
      1.书221 页面,  设置持久化等级列表
      2.intRdd1.persist()  设置持久化
      2.intRdd1.persist(StorageLevel.MEMORY_AND_DISK)  设置存储等级
      4.intRdd1.is_cached  查看是否持久化
    12) RDD.saveAsTextFile("hdfs://192.168.88.128:9000/data/result.txt")  将结果保存成文件
    

    相关文章

      网友评论

          本文标题:spark

          本文链接:https://www.haomeiwen.com/subject/cwgvpltx.html