004 The PySpark equivalent of the Scala Spark implementation (see 0

Author: 逸章 | Published 2019-11-30 15:14
    >>> from decimal import Decimal
    >>> acTransList = ["SB10001,1000", "SB10002,1200", "SB10003,8000", "SB10004,400", "SB10005,300", "SB10006,10000", "SB10007,500", "SB10008,56", "SB10009,30","SB10010,7000", "CR10001,7000", "SB10002,-10"]
    >>> acTransRDD = sc.parallelize(acTransList)
    >>> goodTransRecords = acTransRDD.filter(lambda trans: Decimal(trans.split(",")[1]) > 0).filter(lambda trans: trans.split(",")[0].startswith('SB'))
    >>> highValueTransRecords = goodTransRecords.filter(lambda trans: Decimal(trans.split(",")[1]) > 1000)
    >>> badAmountLambda = lambda trans: Decimal(trans.split(",")[1]) <= 0
    >>> badAcNoLambda = lambda trans: not trans.split(",")[0].startswith('SB')
    >>> badAmountRecords = acTransRDD.filter(badAmountLambda)
    >>> badAccountRecords = acTransRDD.filter(badAcNoLambda)
    >>> badTransRecords = badAmountRecords.union(badAccountRecords)
    >>> acTransRDD.collect()
    ['SB10001,1000', 'SB10002,1200', 'SB10003,8000', 'SB10004,400', 'SB10005,300', 'SB10006,10000', 'SB10007,500', 'SB10008,56', 'SB10009,30', 'SB10010,7000', 'CR10001,7000', 'SB10002,-10']
    >>> goodTransRecords.collect()
    ['SB10001,1000', 'SB10002,1200', 'SB10003,8000', 'SB10004,400', 'SB10005,300', 'SB10006,10000', 'SB10007,500', 'SB10008,56', 'SB10009,30', 'SB10010,7000']
    >>> highValueTransRecords.collect()
    ['SB10002,1200', 'SB10003,8000', 'SB10006,10000', 'SB10010,7000']
    >>> badAccountRecords.collect()
    ['CR10001,7000']
    >>> badAmountRecords.collect()
    ['SB10002,-10']
    >>> badTransRecords.collect()
    ['SB10002,-10', 'CR10001,7000']
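
The filters above keep only records whose amount parses to a positive `Decimal` and whose account number starts with `SB`; `union` then merges the two kinds of bad records. The same per-record predicates can be sketched in plain Python, without Spark, on a subset of the sample data (a local illustration, not the Spark execution itself):

```python
from decimal import Decimal

# Subset of the transaction records used in the session above
acTransList = ["SB10001,1000", "SB10002,1200", "CR10001,7000", "SB10002,-10"]

# Same predicates as the RDD filter() calls
good = [t for t in acTransList
        if Decimal(t.split(",")[1]) > 0 and t.split(",")[0].startswith("SB")]
bad_amount = [t for t in acTransList if Decimal(t.split(",")[1]) <= 0]
bad_account = [t for t in acTransList if not t.split(",")[0].startswith("SB")]

# union() equivalent: concatenation, duplicates preserved
bad = bad_amount + bad_account

print(good)  # ['SB10001,1000', 'SB10002,1200']
print(bad)   # ['SB10002,-10', 'CR10001,7000']
```

Note that `Decimal` is used instead of `float` so that monetary amounts are compared exactly, matching the session above.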
    >>> sumAmounts = goodTransRecords.map(lambda trans: Decimal(trans.split(",")[1])).reduce(lambda a,b : a+b)
    >>> maxAmount = goodTransRecords.map(lambda trans: Decimal(trans.split(",")[1])).reduce(lambda a,b : a if a > b else b)
    >>> sumAmounts
    Decimal('28486')
    >>> maxAmount
    Decimal('10000')
    >>> minAmount = goodTransRecords.map(lambda trans: Decimal(trans.split(",")[1])).reduce(lambda a,b : a if a < b else b)
    >>> minAmount
    Decimal('30')
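
`RDD.reduce` folds the elements pairwise with a binary function, which should be commutative and associative since Spark applies it per partition and then across partitions. The same aggregations can be reproduced locally with `functools.reduce` over a subset of the amounts (a minimal sketch, not the distributed computation):

```python
from decimal import Decimal
from functools import reduce

# Parse amounts from a subset of the records, as map() does above
amounts = [Decimal(t.split(",")[1])
           for t in ["SB10001,1000", "SB10002,1200", "SB10009,30"]]

# Same binary lambdas as the RDD reduce() calls
total = reduce(lambda a, b: a + b, amounts)
largest = reduce(lambda a, b: a if a > b else b, amounts)
smallest = reduce(lambda a, b: a if a < b else b, amounts)

print(total, largest, smallest)  # 2230 1200 30
```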
    >>> combineAllElements = acTransRDD.flatMap(lambda trans: trans.split(","))
    >>> combineAllElements.collect()
    ['SB10001', '1000', 'SB10002', '1200', 'SB10003', '8000', 'SB10004', '400', 'SB10005', '300', 'SB10006', '10000', 'SB10007', '500', 'SB10008', '56', 'SB10009', '30', 'SB10010', '7000', 'CR10001', '7000', 'SB10002', '-10']
    >>> allGoodAccountNos = combineAllElements.filter(lambda trans: trans.startswith('SB'))
    >>> allGoodAccountNos.distinct().collect()
    ['SB10007', 'SB10010', 'SB10003', 'SB10006', 'SB10002', 'SB10005', 'SB10009', 'SB10001', 'SB10004', 'SB10008']
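
`flatMap` splits each record and flattens the resulting lists into a single RDD of fields; `filter` plus `distinct` then yields the unique `SB` account numbers. Locally the same pipeline is a nested comprehension plus a set (sorted here for a deterministic result, whereas the order of Spark's `distinct()` output is not guaranteed, as the session above shows):

```python
# Subset of the records, including a duplicate account number
records = ["SB10001,1000", "SB10002,1200", "SB10002,-10"]

# flatMap() equivalent: split each record and flatten the fields
flat = [field for rec in records for field in rec.split(",")]

# filter() + distinct() equivalent: keep SB fields, deduplicate with a set
accounts = sorted({f for f in flat if f.startswith("SB")})

print(flat)      # ['SB10001', '1000', 'SB10002', '1200', 'SB10002', '-10']
print(accounts)  # ['SB10001', 'SB10002']
```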

Original link: https://www.haomeiwen.com/subject/lnuowctx.html