1. 说明
数据处理时,可能会遇到数千万以及上亿条数据的情况。一次处理所有数据,会遇到内存不够,计算时间太长等问题。一般的解法是:先拆分,再处理,最后将处理的结果合并(当然数据少的时候不需要这么麻烦)。本文将介绍在单机上,只使用Python如何处理大量数据。
2. 实例
本例是天池大数据竞赛中的“淘宝穿衣搭配”比赛,这是一个新人赛,只要注册参赛,即可下载数据。目标是根据商品信息,专家推荐,用户购物信息,计算出最佳商品组合。
本例中处理的是用户购物信息“表1”:每条记录包含用户号uid,商品号mid,购物时间time。
uid,mid,time
4371603,8,20150418
8034236,8,20150516
6135829,8,20150405
需要统计每个用户都购买了什么物品,即生成“表2”:记录包含用户号uid,商品组合mids。
uid,mids
15 "1795974,1852545,98106,654166"
20 "2639977,79267"
赛题提供了千万级的购物数据,其中含有百万级的用户,全部load到内存再计算生成新的结构,虽然能运行,但内存占用让机器变得非常慢,普通计算只用到单CPU,我的机器用10个小时才处理了200多万条数据,优化之后半小时以内处理完所有数据。下面看看具体实现。
3. 切分数据
(1) 目标
把数据切分成十份,分别存入文件
(2) 代码
user = pd.read_csv("../../data/user_bought_history.txt", sep=" ")
user.columns = ['uid','mid','time']
dur = len(user)/10
ifrom = 0
idx = 0
while ifrom < len(user):
ito = ifrom + dur
data = user[ifrom:ito]
print("from ", ifrom, "to ", (ito-1), "total", len(data))
data.to_csv('../../data/user_bought_' + str(idx) + '.csv', index=False)
ifrom = ito
idx += 1
4. 处理数据
(1) 目标
用多线程方式处理切分后的数据,将表1转换成表2格式
(1) 代码
def do_conv(index):
path = "../../data/user_bought_" + str(index) + ".csv"
if not os.path.exists(path):
return
user = pd.read_csv(path)
user['mid']=user['mid'].astype(str)
grp=user.groupby('uid')
print(index, len(grp))
user_buy_count_data = pd.DataFrame(columns=['uid','mids'])
idx=0
arr_uid=[]
arr_mid=[]
for name, group in grp:
mids = ",".join(group['mid'])
arr_uid.append(name)
arr_mid.append(mids)
if idx % 10000 == 0:
show_info.show_time(str(index) + " : " + str(len(arr_uid)))
idx+=1
user_buy_count_data['uid']=arr_uid
user_buy_count_data['mids']=arr_mid
user_buy_count_data.to_csv("../../data/user_" + str(index) + ".csv", index=False)
if __name__ == '__main__':
param_list = range(0, 11) # 线程参数
pool = threadpool.ThreadPool(3) # 同时最多开3个线程
requests = threadpool.makeRequests(do_conv, param_list)
[pool.putRequest(req) for req in requests]
pool.wait() # 等待所有线程结束
(3) 技术点
i. 统一处理数据格式
从文件中读出的数据默认为int型,用astype函数将整个数据表的mid字段变为str型,相对于每次处理时再转换更节约时间。
ii. 使用groupby
groupby函数将数据按不同的uid划分为成多个表格,groupby还带有多种统计功能,相对于用字典方式统计数据效率高得多。
iii. 多线程
现在的机器都是多核的,能明显提高计算速度。python中提供了几种不同的多线程方式,这时使用了线程池,它可以控制线程的数量,以免本例中太多线程占用大量内存让机器变慢。使用之前需要安装threadpool库。
sudo pip install threadpool
5. 合并数据
(1) 目标
将转换完的数据合并,当同一个user在两个表中同时出现时,将mids累加在一起。
(2) 代码
def do_add(x):
if x.m == 'nan':
return x.mids
if x.mids == 'nan':
return x.m
return str(x.mids) + "," + str(x.m)
def do_merge(data, path):
if not os.path.exists(path):
return data
data.columns = ['uid','m']
ex = pd.read_csv(path)
print(len(data),len(ex))
data = pd.merge(data, ex, how='outer')
data['m']=data['m'].astype(str)
data['mids']=data['mids'].astype(str)
data['mids']=data.apply(do_add, axis=1)
data = data.drop('m',axis=1)
print(data.head())
return data
data = pd.DataFrame(columns=['uid','mids'])
for index in range(0, 11):
data = do_merge(data, "../../data/user_" + str(index) + ".csv")
show_info.show_time("")
print("after merge ", index, "len", len(data))
data.to_csv('../../data/user_all.csv',index=False)
6. 相关工具
(1) top命令
top是linux系统中统计系统资源占用的工具,默认为每秒统计一次,打开后按1键,可看到多核的占用情况。
7. 总结
在特征工程和算法的计算过程中,都可以使用先拆分再组合的方式,但前提是切分数据不会造成数据意义的变化。本文介绍了单机处理大数据的优化方式,下篇将介绍用Hadoop集群方案处理海量数据。
网友评论