1去重
1.1unique与drop_duplicates区别
1.unique只能应用于series或者list,或者dataframe中的一列,不能作用于一整个dataframe,返回的是去重后的数据
s = Series(['a','b','a','c','b'])
s.unique()
2.nunique用于统计dataframe中每列的不同值的个数,也可用于series,但不能用于list.返回的是每列不同值的个数.可与groupby结合,统计每个块的不同值的个数。
data.nunique()
data['time'].nunique()
all_user_repay = all_user_repay.groupby(['user_id'])['listing_id'].agg(['nunique']).reset_index()
3.drop_duplicates既可用于series也可用于dataframe
data.duplicated() # 返回一个布尔型,表示各行是否重复
s.drop_duplicates() # 会过滤掉重复的行
df = DataFrame({'水果':['苹果','草莓','苹果'],
'价格':[3,9,3],
'数量':[5,6,5]})
df.drop_duplicates()
data.drop_duplicates(['k1']) # 对dataframe根据k1列过滤重复项
2python中时间戳与dataframe的转换
# datetime转时间戳
time.mktime(time_data.timetuple())
# 时间戳转datetime
# 转utc时间
timeStamp = 1381419600
dateArray = datetime.datetime.utcfromtimestamp(timeStamp)
# 转本地时间
dateArray = datetime.datetime.fromtimestamp(timeStamp)
# datetime转字符串
now = datetime.datetime.now()
otherStyleTime = now.strftime("%Y-%m-%d %H:%M:%S")
# 字符串转datetime
datetime.datetime.strptime(otherStyleTime,"%Y-%m-%d %H:%M:%S")
# pandas中字符串时间转换为datetime类型
data['end_time'] = pd.to_datetime(data['end_time'], format="%Y-%m-%d %H:%M:%S")
# 将字符型时间转日期型时间在转时间戳 然后对时间戳按每5分钟取整,然后时间戳转日期型时间
data['DataTime'] = data['DataTime'].map(lambda x: datetime.datetime.fromtimestamp
((time.mktime(datetime.datetime.strptime
(x, '%Y-%m-%d %H:%M:%S').
timetuple())//300)*300)
+ datetime.timedelta(minutes=5))
3面元划分及据类
3.1面元划分
# 定义生成dataframe的函数
def get_pd(num=10):
data_list = []
for _ in range(num):
st_time = datetime.datetime(2019,7,randint(1,31), randint(0,23),randint(0,59),randint(0,59))
en_time = st_time + datetime.timedelta(minutes=randint(10,59))
data = [st_time,en_time,randint(100,500),randint(1000,2000)]
data_list.append(data)
df = pd.DataFrame(data=data_list,columns=['start','end','lng','lat'])
return df
# 进行面元划分
data = get_pd(100)
data = data.sort_values('start') # 排序
data.reset_index(drop=True,inplace=True) # 重设排序后的索引
# 将时间从开始到结束按每15分钟划分一个间隔
cu=pd.date_range(data['start'][0],data['start'][99],freq='15Min')
# 用间隔对数据的start列进行划分
qu = pd.cut(data.start, cu, right=False)
# 统计每列的个数,结果会自动倒序对个数进行排序
qu.value_counts()
pd.date_range(start, end, freq) 生成一个时间段
freq参数由英文(M D H Min 。。。)、英文数字结合。D表示一天,M表示一月如20D表示20天,5M表示5个月
pd.bdate_range(end,periods,freq) 根据end时间点开始,以freq为单位,向前生成周期为period的时间序列
pd.bdate_range(start,periods,freq) 根据start时间点开始,以freq为单位,向后生成周期为period的时间序列
pd.bdate_range(end='20180101',periods=5,freq='D') # 向前5天,之间间隔一天
pd.bdate_range(start='20180101',periods=5,freq='D')# 向后5天,之间间隔一天
3.2聚类
# datetime转时间戳
# a = time.mktime(data['start'][99].timetuple())
data['time'] = data['start'].map(lambda x:time.mktime(x.timetuple()))
# 利用转换后的时间戳对900整除(即间隔15分钟)
data['time'] = data['time'].map(lambda x:x//(60*15))
# 对整个数据按处理后的时间(每隔15分钟)进行分组
data.groupby(data['time']).count()
# 对数据的lng列按照处理后的时间进行分组并对统计结果进行倒序排序
data['lng'].groupby(data['time']).count().sort_values(ascending=False)
4多进程与多线程
4.1多线程
# 导入线程池
from multiprocessing.dummy import Pool as threadpool
# 创建20个线程池
pool = threadpool(20)
for detail_data in data:
# 使用异步形式启动线程池,并传入一个函数和参数
pool.apply_async(get_detail_json_file, (detail_header,))
# 关闭线程池
pool.close()
# 等待所有线程执行完毕,join必须位于close后
pool.join()
4.2多进程
def job(x):
return x*x
def work(x):
print('执行函数完毕')
# 使用进程池
from multiprocessing import Pool
pool = Pool(10)
for i in range(1,7):
pool.apply_async(job,(i,))
pool.close()
pool.join()
# 有返回值的多进程使用
from multiprocessing import Pool
pool = Pool(10)
# res为返回值组成的列表
res = pool.map(job, range(10))
# 有返回值的多进程的使用
from multiprocessing import Pool
pool = Pool(os.cpu_count())
result = []
for i in range(3):
result.append(pool.apply_async(job, (i, )))
pool.close()
pool.join()
data = [i.get() for i in result] # i.get()提取返回值
多线程计算时,若不同线程直接进行数据交流,只需要把变量设置为全局即可。
利用进程池中的Queue完成多进程中的通讯
import multiprocessing
def write(q):
print("write启动(%s),父进程为(%s)" % (os.getpid(), os.getppid()))
for i in "python":
q.put(i)
def read(q):
print("read启动(%s),父进程为(%s)" % (os.getpid(), os.getppid()))
for i in range(q.qsize()):
print("read从Queue获取到消息:%s" % q.get(True))
q = multiprocessing.Manager().Queue()
po = multiprocessing.Pool()
po.apply_async(write, args=(q,))
po.apply_async(read, args=(q,))
po.close()
po.join()
5条件筛选赋值
从DF1中选取第A列值为b的行,使他们的第C列的值为D
DF1.loc[DF1['A']=='b','C'] = 'D'
a.loc[a['id']==1595,'row']
a.loc[a['id']==1595,['row','col']]
# 以下两句查找结果相同
a[(a['row']==30)&(a['col']==1)]['id']
a.loc[(a['row']==30)&(a['col']==1), 'id']
# 以下写法会报错
a.loc[(a['bottom_lng']<=lng<a['top_lng'])&(a['bottom_lat']<=lat<a['top_lat']), 'id']
# 该写法正确,找出给定经纬度lng,lat在a这个dataframe中的哪个网格中
a.loc[(a['bottom_lng']<=lng)&(lng<a['top_lng'])&(a['bottom_lat']<=lat)& (lat<a['top_lat']), 'id']
5.1利用apply对多列进行函数操作(按行进行映射)
apply默认是对列进行操作,故默认是axis=0,如果要通过行对多列操作要设置axis=1
要对DataFrame的多个列同时进行运算,可以使用apply,例如col3 = col1 + 2 * col2
# 例一
df['col3'] = df.apply(lambda x: x['col1'] + 2 * x['col2'], axis=1)
# 例二
df = pd.DataFrame ({'a' : np.random.randn(6),
'b' : ['foo', 'bar'] * 3,
'c' : np.random.randn(6)})
def my_test(a, b):
return a + b
df['Value'] = df.apply(lambda row: my_test(row['a'], row['c']), axis=1)
例三,通过映射第一个dataframe(data)中的每行,挑选出start_lng和start_lat两列,去另外一个dataframe(json_data)中查找出符合要求的行并返回这一行的id列的数据
data.apply(lambda x: json_data.loc[(json_data['bottom_lng']
<=x['start_lng'])&(x['start_lng']<json_data['top_lng'])&
(json_data['bottom_lat']<=x['start_lat'])&(x['start_lat']
<json_data['top_lat']), 'id'], axis=1)
# 利用numpy创建一个全为0的矩阵
df = pd.DataFrame(data=np.zeros((743,8),dtype=np.int32), columns=['N','NE','E','SE','S','SW','W','NW'])
6从多个文件中获取数据并合并成一个文件
import datetime
import pandas as pd
path ='/home/glzt/桌面/滴滴数据/file'
def get_all_data():
data = None
for file in os.listdir('/home/glzt/桌面/滴滴数据/file'):
data = read_file(data, file)
data = data.sort_values(by=['start_time', 'end_time'])
print(data)
print(data.iloc[0]['start_time'], data.iloc[-1]['start_time'])
print(data.iloc[0]['end_time'], data.iloc[-1]['end_time'])
print(len(data))
return data
def read_file(all_data, file_path):
# file_path = '/home/glzt/桌面/滴滴数据/file/order_20161104'
file_path = '/home/glzt/桌面/滴滴数据/file/' + file_path
data = pd.read_csv(file_path)
data.columns = ['order', 'start_time', 'end_time', 'start_lng', 'start_lat', 'end_lng', 'end_lat']
del data['order']
data['start_time'] = data['start_time'].map(lambda x:datetime.datetime.fromtimestamp(x))
data['end_time'] = data['end_time'].map(lambda x: datetime.datetime.fromtimestamp(x))
if all_data is not None:
data = pd.concat([all_data, data])
else:
data = data
return data
7loc数据筛选
grids = pd.read_sql(sql, engine)
points = pd.DataFrame(data=points_list, columns=['lng', 'lat'])
# 必须在后面接to_numpy(),否则结果是一个n*n的矩阵,而不是1*n的series
# to_numpy()后接不接[0]看情况,如果有可能不是每个都有值,就不能接,否则会报错,如果每个都有值,就可以接
points['id'] = points.apply(lambda x: grids.loc[(grids['bottom_lat'] <= x['lat']) & (grids['top_lat'] >= x['lat']) & (grids['bottom_lng'] <= x['lng']) & (grids['top_lng'] >= x['lng']), 'id'].to_numpy(), axis=1)
print(points)
points['id'] = points.apply(lambda x: grids.loc[(grids['bottom_lat'] <= x['lat']) & (grids['top_lat'] >= x['lat']) & (grids['bottom_lng'] <= x['lng']) & (grids['top_lng'] >= x['lng']), 'id'].to_numpy()[0], axis=1)
print(points)
8改列的名字及存为json文件
data.sort_values(by=['col', 'row'], inplace=True)
data['grid_id'] = data.apply(lambda x: 140*(x['col'] - 1) + x['row'], axis=1)
data.rename(columns={'col': 'column'}, inplace=True)
# orient选择records,则存为常见的列表套字典的数据形式
data.to_json(os.path.join(DATA_DIR, 'grids.json'), orient='records')
网友评论