美文网首页
Spark实现基础的数据统计

Spark实现基础的数据统计

作者: 一只小哈 | 来源:发表于2016-08-03 21:32 被阅读1588次

工作中,我们常常会遇到数据统计的需求,如果数据在DB里,那么写SQL即可。但是如果DB满足不了需求,那我们就需要统计日志,但是对于高度服务化的互联网应用,基本都是分布式的服务,日志基本也都是分布式的,这意味着Shell 和 Python统计的方式,弊端很多,首先数据聚合就是很大的问题。
所幸,公司有Hadoop平台,以前做日志的统计都是在Hadoop上写Mapreduce,但是后来发现,对于简单的日志统计和过滤,写一个Mapreduce成本确实很高,同时编程模型不是很灵活,实现一个并行或者多次迭代的场景的数据统计确实比较麻烦。所以果断投入了Spark的怀抱。
今天碰见一个需求,需要对日志中的用户注册渠道数据进行统计,因为需要统计很多天的数据,同时渠道也有很多,写MapReduce还是比较复杂,主要还是惧怕代码量和反复打包上传。
所以用Spark简单的实现了一个统计场景,难度不高于经典“Word Count”,写出来只为记录自己学习的点滴:

import pyspark as sp;
from pyspark import SparkConf;
import time;
def countReg():
    spConf = SparkConf()
    context = sp.SparkContext(appName="regCount")
    regFile = context.textFile("/home/xxx/xxx/xxx/*");
    firstFileterd = regFile.filter(lambda a: "reg_mobile" in a.encode("UTF-8") or "reg_email" in a.encode("UTF-8") or "reg_dynamic_mobile" in a.encode("UTF-8")).cache();

    filterMobileFile = firstFileterd.filter(lambda a: "reg_mobile" in a.encode("UTF-8") and a.encode("UTF-8").split("\001")[0]=="reg_mobile");
    filterEmailFile = firstFileterd.filter(lambda a: "reg_email" in a.encode("UTF-8") and a.encode("UTF-8").split("\001")[0]=="reg_email");
    dynamicMobileFile = firstFileterd.filter(lambda a: "reg_dynamic_mobile" in a.encode("UTF-8") and a.encode("UTF-8").split("\001")[3]=="null");

    mobileCollect = filterMobileFile.map(mapfunc).reduceByKey(lambda a,b:a+b).sortBy(lambda m:m[0]).collect()
    emailCollect = filterEmailFile.map(mapfunc).reduceByKey(lambda a,b:a+b).sortBy(lambda m:m[0]).collect()
    dynamicMobileCollect = dynamicMobileFile.map(mapfuncForDynamic).reduceByKey(lambda a,b:a+b).sortBy(lambda m:m[0]).collect()
    print("-------------------------------mobile---------------------------------->\n")
    for line in mobileCollect:
        print(line)
    print("--------------------------------email--------------------------------->\n")
    for line in emailCollect:
        print(line)
    print("---------------------------------dynamic_mobile-------------------------------->\n")
    for line in dynamicMobileCollect:
        print(line)

def mapfunc(line):
    timeStamp = line.encode("UTF-8").split("\001")[-1]
    if(timeStamp!='' and timeStamp!=None):
        timeStamp = long(timeStamp)/1000
        timeRaw = time.localtime(timeStamp)
        datestr = time.strftime("%Y-%m-%d",timeRaw)
        return (datestr,1)
    else:
        return ("error",1)

def mapfuncForDynamic(line):
    timeStamp = line.encode("UTF-8").split("\001")[-2]
    if(timeStamp!='' and timeStamp!=None):
        timeStamp = long(timeStamp)/1000
        timeRaw = time.localtime(timeStamp)
        datestr = time.strftime("%Y-%m-%d",timeRaw)
        return (datestr,1)
    else:
        return ("error",1)

if __name__=="__main__":
    countReg();

上述是对三个来源的注册数据进行统计,虽然不难,但是能说明Spark比Hadoop的好处:

  1. 代码简单,编程比较灵活
  2. 脚本开发,不用反复打包
  3. MaprReduce实现上述场景要么起三个Job,要么重写Partitioner,分配到不同的Reducer。Spark实现起来就比较简单了。

因为之前比较习惯Python开发,但是对于数据开发方向的话,Python还是比较不合适的,对各种存储支持的不是很好,对SparkStreaming支持的也不是很好。所以以后准备学习Scala去开发Spark,毕竟是Spark的构建语言。

相关文章

网友评论

      本文标题:Spark实现基础的数据统计

      本文链接:https://www.haomeiwen.com/subject/slbwjttx.html