美文网首页
2020年美国新冠肺炎疫情数据分析案例总结

2020年美国新冠肺炎疫情数据分析案例总结

作者: 胖波波玻璃球 | 来源:发表于2020-08-05 09:44 被阅读0次

    本案例出自于厦门大学数据库实验室,原采用的方法是PySpark, 在此基础之上,我们通过spark-sql、zeppelin及可视化的方式加以改进。

    一 数据集说明

    数据集来自数据网站Kaggle的美国新冠肺炎疫情数据集,该数据集以数据表us-counties.csv组织,其中包含了美国发现首例新冠肺炎确诊病例至今(2020-05-19)的相关数据。数据字典如下:

    字段名称          字段含义                             例子
    date        日期                            2020/1/21;2020/1/22;etc
    county      区县(州的下一级单位)               Snohomish;
    state       州                                  Washington
    cases       截止该日期该区县的累计确诊人数        1,2,3…
    deaths      截止该日期该区县的累计死亡人数        1,2,3…
    

    二 使用Zeppelin对数据进行分析

    1 导入数据并注册为临时表

    把数据存放到HDFS文件系统中,用到的方法是:

    ./bin/hdfs dfs -put /home/hadoop/us-counties.txt /user/hadoop
    

    这里,我们可以把文件放到本地(本地指Linux)或者上传到hdfs皆可,为了表现不同,我们从本地直接导入文件,代码如下:

    import org.apache.spark.sql.types._
    ## 定义schema
    val fields = Array(StructField("date", StringType,true), 
                       StructField("county", StringType,true), 
                       StructField("state", StringType,true), 
                       StructField("cases", IntegerType,true), 
                       StructField("deaths", IntegerType,true))      
    val schema = StructType(fields)
    val file = ""file:///home..." ## 本地使用file:///
    val df = spark.read.option("header", "true").schema(schema).csv(file)
    df.show(5)
    # 注册为临时表
    df.createOrReplaceTempView("usinfo")
    

    2 计算每日的累计确诊病例数和死亡数

    我们这里直接运用了spark-sql的方法,代码如下:

    import org.apache.spark.sql.functions._
    %sql
    select date, sum(cases) as `累积确诊`, sum(deaths) as `累积死亡`
    from usinfo
    group by date
    order by date asc
    

    结果导入HDFS数据库存储,方法如下:

    val df1 = result1.withColumnRenamed("sum(cases)","cases").withColumnRenamed("sum(deaths)","deaths")
    df1.printSchema
    df1.repartition(1).write.json("file:///.../result1.json") 
    

    具体结果如下图:


    累积.png

    3 计算每日较昨日的新增确诊病例数和死亡病例数

    这里注意,我们需要把上面的结果再注册一下临时表,方便下面使用spark-sql,在sql中我们使用了自连接的方法,最后查询的结果保存了到HDFS上,如下:

    ## 注册为临时表供下一步使用
    df1.createOrReplaceTempView("ustotal")
    ## sql自连接
    val df2 = spark.sql("""
            select t1.date,t1.cases-t2.cases as caseIncrease,t1.deaths-t2.deaths as deathIncrease 
            from ustotal t1,ustotal t2 
            where t1.date = date_add(t2.date,1)
        """)
    # 再保存到HDFS
    df2.repartition(1).write.json("file:///.../result2.json"
    

    结果如下图:


    新增.png

    4 统计截止5.19日 美国各州的累计确诊人数和死亡人数

    操作类似:

    val df3 = spark.sql("""
                 select date,state,sum(cases) as totalCases,sum(deaths) as totalDeaths,round(sum(deaths)/sum(cases),4) as deathRate 
                 from usinfo  
                 where date = '2020-05-19'
                 group by date,state
               """)
    # 再保存到HDFS
    df3.repartition(1).write.json("file:///.../result3.json"
    

    展示如下:


    累计人数.png

    5 统计截止5.19全美各州的病死率

    val df4 = spark.sql("""
            select 1 as sign,date,'USA' as state,round(sum(totalDeaths)/sum(totalCases),4) as deathRate 
            from eachStateInfo 
            group by date 
            union 
            select 2 as sign,date,state,deathRate from eachStateInfo
        """)
    # 再保存到HDFS
    df4.repartition(1).write.json("file:///.../result4.json"
    

    展示结果:


    各州死亡率.png

    相关文章

      网友评论

          本文标题:2020年美国新冠肺炎疫情数据分析案例总结

          本文链接:https://www.haomeiwen.com/subject/peisrktx.html