任务:
1. 基础数据是Wikipedia Page Counts. (>1TB compressed, available since 2008)。
2. 字段包含
1) project (i.e. “en”, “fr”, etc, which is usually a language)
2) title of the page (uri), urlencoded
3) number of requests
4) size of the content returned
3. 文件名是日期+时间(详细到小时, 例如2008013101是2008年1月31日1点到2点的记录)
4. 目标是
1) 统计每天被请求最多的10个英文wikipedia页( top 10 pages by the number of requests per day in English Wikipedia)
2) 支持任意单词搜索,以完成"比较一下关于'myspace'的页面总请求数与关于'facebook'的页面总请求数"这样的任务。
单独用MySQL处理的流程如下
1. 解压缩文件(>10TB),把数据加载到临时表中,并根据文件名追加日期字段。
2. 聚合后插入最终表。
3. 解码title(使用UDF)
那么,整个 过程需要多长时间呢?
According to my calculations it should take > 1 month to do the whole pipeline for 6 years of data (this time does not include the uncompress time and does not include the load time depreciation as the table get bigger and bigger and indexes need to be updated). There are a lots of things we can do here to speed it up of course, i.e., load into different MySQL instances, load into MEMORY table first, then group by into InnoDB, etc.
比较快速的做法是使用pyspark操作,代码如下
(包含的功能是读文件,对url的访问次数按天累加,然后更新的MySQL)
The script took about an hour on 3 boxes to process 1 month of data and load theaggregated datato MySQL (single instance). We can estimate that to load all 6 years (aggregated) to MySQL is ~3 days.
因为使用了Spark RDD Transformation,处理时间从原来的1个月缩短到3天。
其他的性能改善还包括
1) group_res.write.jdbc(url=mysql_url, table=”wikistats.wikistats_by_day_spark”, mode=”append”)
Spark是在使用多线程的方式更新数据库。
2) group_res.saveAsParquetFile("/ssd/wikistats_parquet_bydate/mydate=" + mydate)
这是在把sparkRDD存为parquet file (it can be saved to a directory to HDFS)。parquet file的作用是替代mysql做查询。从下面的截图可以看出,使用mysql查询10个访问最多的网页需要1小时22分钟,使用parquet file时需要20多分钟。
mysql parquet原帖地址:https://www.percona.com/blog/2015/10/07/using-apache-spark-mysql-data-analysis/
网友评论