美文网首页
pandas使用小结及时间转换及多进程池及多线程池

pandas使用小结及时间转换及多进程池及多线程池

作者: barriers | 来源:发表于2019-08-14 00:11 被阅读0次

    1去重

    1.1unique与drop_duplicates区别

    1.unique只能应用于series或者list,或者dataframe中的一列,不能作用于一整个dataframe,返回的是去重后的数据

    s = Series(['a','b','a','c','b'])
    s.unique()
    

    2.nunique用于统计dataframe中每列的不同值的个数,也可用于series,但不能用于list.返回的是每列不同值的个数.可与groupby结合,统计每个块的不同值的个数。

    data.nunique()
    data['time'].nunique()
    all_user_repay = all_user_repay.groupby(['user_id'])['listing_id'].agg(['nunique']).reset_index()
    

    3.drop_duplicates既可用于series也可用于dataframe

    data.duplicated() # 返回一个布尔型,表示各行是否重复
    s.drop_duplicates() # 会过滤掉重复的行
    df = DataFrame({'水果':['苹果','草莓','苹果'],
               '价格':[3,9,3],
               '数量':[5,6,5]})
    df.drop_duplicates()
    data.drop_duplicates(['k1']) # 对dataframe根据k1列过滤重复项
    

    2python中时间戳与dataframe的转换

    # datetime转时间戳
    time.mktime(time_data.timetuple())
    # 时间戳转datetime
    # 转utc时间
    timeStamp = 1381419600
    dateArray = datetime.datetime.utcfromtimestamp(timeStamp)
    # 转本地时间
    dateArray = datetime.datetime.fromtimestamp(timeStamp)
    # datetime转字符串
    now = datetime.datetime.now()
    otherStyleTime = now.strftime("%Y-%m-%d %H:%M:%S")
    # 字符串转datetime
    datetime.datetime.strptime(otherStyleTime,"%Y-%m-%d %H:%M:%S")
    # pandas中字符串时间转换为datetime类型
    data['end_time'] = pd.to_datetime(data['end_time'], format="%Y-%m-%d %H:%M:%S")
    
    # 将字符型时间转日期型时间在转时间戳 然后对时间戳按每5分钟取整,然后时间戳转日期型时间
    data['DataTime'] = data['DataTime'].map(lambda x: datetime.datetime.fromtimestamp
                               ((time.mktime(datetime.datetime.strptime
                               (x, '%Y-%m-%d %H:%M:%S').
                                timetuple())//300)*300)
                                  + datetime.timedelta(minutes=5))
    

    3面元划分及据类

    3.1面元划分

    # 定义生成dataframe的函数
    def get_pd(num=10):
    data_list = []
    for _ in range(num):
        st_time = datetime.datetime(2019,7,randint(1,31), randint(0,23),randint(0,59),randint(0,59))
        en_time = st_time + datetime.timedelta(minutes=randint(10,59))
        data = [st_time,en_time,randint(100,500),randint(1000,2000)]
        data_list.append(data)
    df = pd.DataFrame(data=data_list,columns=['start','end','lng','lat'])
    return df
    
    # 进行面元划分    
    data = get_pd(100)
    data = data.sort_values('start') # 排序
    data.reset_index(drop=True,inplace=True) # 重设排序后的索引
    # 将时间从开始到结束按每15分钟划分一个间隔
    cu=pd.date_range(data['start'][0],data['start'][99],freq='15Min')
    # 用间隔对数据的start列进行划分
    qu = pd.cut(data.start, cu, right=False)
    # 统计每列的个数,结果会自动倒序对个数进行排序
    qu.value_counts()
    

    pd.date_range(start, end, freq) 生成一个时间段
    freq参数由英文(M D H Min 。。。)、英文数字结合。D表示一天,M表示一月如20D表示20天,5M表示5个月
    pd.bdate_range(end,periods,freq) 根据end时间点开始,以freq为单位,向前生成周期为period的时间序列
    pd.bdate_range(start,periods,freq) 根据start时间点开始,以freq为单位,向后生成周期为period的时间序列
    pd.bdate_range(end='20180101',periods=5,freq='D') # 向前5天,之间间隔一天
    pd.bdate_range(start='20180101',periods=5,freq='D')# 向后5天,之间间隔一天

    3.2聚类

    # datetime转时间戳
    # a = time.mktime(data['start'][99].timetuple())
    data['time'] = data['start'].map(lambda x:time.mktime(x.timetuple()))
    # 利用转换后的时间戳对900整除(即间隔15分钟)
    data['time'] = data['time'].map(lambda x:x//(60*15))
    # 对整个数据按处理后的时间(每隔15分钟)进行分组
    data.groupby(data['time']).count()
    # 对数据的lng列按照处理后的时间进行分组并对统计结果进行倒序排序
    data['lng'].groupby(data['time']).count().sort_values(ascending=False)
    

    4多进程与多线程

    4.1多线程

    # 导入线程池
    from multiprocessing.dummy import Pool as threadpool
    
    # 创建20个线程池
    pool = threadpool(20)
    for detail_data in data:
         # 使用异步形式启动线程池,并传入一个函数和参数
        pool.apply_async(get_detail_json_file, (detail_header,))
    # 关闭线程池
    pool.close()
    # 等待所有线程执行完毕,join必须位于close后
    pool.join()
    

    4.2多进程

    def job(x):
        return x*x
    def work(x):
        print('执行函数完毕')
    
    # 使用进程池
    from multiprocessing import Pool
    pool = Pool(10)
    for i in range(1,7):
        pool.apply_async(job,(i,))
    pool.close()
    pool.join()
    
    # 有返回值的多进程使用
    from multiprocessing import Pool
    pool = Pool(10)
    # res为返回值组成的列表
    res = pool.map(job, range(10))
    
    # 有返回值的多进程的使用
    from multiprocessing import Pool
    pool = Pool(os.cpu_count())
    result = []
    for i in range(3):
        result.append(pool.apply_async(job, (i, )))
    pool.close()
    pool.join()
    data = [i.get() for i in result]  # i.get()提取返回值
    

    多线程计算时,若不同线程直接进行数据交流,只需要把变量设置为全局即可。

    利用进程池中的Queue完成多进程中的通讯

    import multiprocessing
    def write(q):
        print("write启动(%s),父进程为(%s)" % (os.getpid(), os.getppid()))
        for i in "python":
            q.put(i)
    def read(q):
        print("read启动(%s),父进程为(%s)" % (os.getpid(), os.getppid()))
        for i in range(q.qsize()):
            print("read从Queue获取到消息:%s" % q.get(True))
    
    q = multiprocessing.Manager().Queue()
    po = multiprocessing.Pool()
    po.apply_async(write, args=(q,))
    po.apply_async(read, args=(q,))
    po.close()
    po.join()
    

    5条件筛选赋值

    从DF1中选取第A列值为b的行,使他们的第C列的值为D
    DF1.loc[DF1['A']=='b','C'] = 'D'

    a.loc[a['id']==1595,'row'] 
    a.loc[a['id']==1595,['row','col']]
    # 以下两句查找结果相同
    a[(a['row']==30)&(a['col']==1)]['id']
    a.loc[(a['row']==30)&(a['col']==1), 'id']
    # 以下写法会报错
    a.loc[(a['bottom_lng']<=lng<a['top_lng'])&(a['bottom_lat']<=lat<a['top_lat']), 'id']
    # 该写法正确,找出给定经纬度lng,lat在a这个dataframe中的哪个网格中
    a.loc[(a['bottom_lng']<=lng)&(lng<a['top_lng'])&(a['bottom_lat']<=lat)& (lat<a['top_lat']), 'id']
    

    5.1利用apply对多列进行函数操作(按行进行映射)

    apply默认是对列进行操作,故默认是axis=0,如果要通过行对多列操作要设置axis=1
    要对DataFrame的多个列同时进行运算,可以使用apply,例如col3 = col1 + 2 * col2

    # 例一
    df['col3'] = df.apply(lambda x: x['col1'] + 2 * x['col2'], axis=1)
    # 例二
    df = pd.DataFrame ({'a' : np.random.randn(6),
             'b' : ['foo', 'bar'] * 3,
             'c' : np.random.randn(6)})
    def my_test(a, b):
        return a + b
    df['Value'] = df.apply(lambda row: my_test(row['a'], row['c']), axis=1)
    

    例三,通过映射第一个dataframe(data)中的每行,挑选出start_lng和start_lat两列,去另外一个dataframe(json_data)中查找出符合要求的行并返回这一行的id列的数据

    data.apply(lambda x: json_data.loc[(json_data['bottom_lng']
        <=x['start_lng'])&(x['start_lng']<json_data['top_lng'])&
        (json_data['bottom_lat']<=x['start_lat'])&(x['start_lat']
        <json_data['top_lat']), 'id'], axis=1)
    
    # 利用numpy创建一个全为0的矩阵
    df = pd.DataFrame(data=np.zeros((743,8),dtype=np.int32), columns=['N','NE','E','SE','S','SW','W','NW'])
    

    6从多个文件中获取数据并合并成一个文件

    import datetime
    import pandas as pd
    path ='/home/glzt/桌面/滴滴数据/file'
    def get_all_data():
        data = None
        for file in os.listdir('/home/glzt/桌面/滴滴数据/file'):
            data = read_file(data, file)
        data = data.sort_values(by=['start_time', 'end_time'])
        print(data)
        print(data.iloc[0]['start_time'], data.iloc[-1]['start_time'])
        print(data.iloc[0]['end_time'], data.iloc[-1]['end_time'])
        print(len(data))
        return data
    
    def read_file(all_data, file_path):
        # file_path = '/home/glzt/桌面/滴滴数据/file/order_20161104'
        file_path = '/home/glzt/桌面/滴滴数据/file/' + file_path
        data = pd.read_csv(file_path)
        data.columns = ['order', 'start_time', 'end_time', 'start_lng', 'start_lat', 'end_lng', 'end_lat']
        del data['order']
        data['start_time'] = data['start_time'].map(lambda x:datetime.datetime.fromtimestamp(x))
        data['end_time'] = data['end_time'].map(lambda x: datetime.datetime.fromtimestamp(x))
        if all_data is not None:
            data = pd.concat([all_data, data])
        else:
            data = data
        return data
    

    7loc数据筛选

    grids = pd.read_sql(sql, engine)
    points = pd.DataFrame(data=points_list, columns=['lng', 'lat'])
    # 必须在后面接to_numpy(),否则结果是一个n*n的矩阵,而不是1*n的series
    # to_numpy()后接不接[0]看情况,如果有可能不是每个都有值,就不能接,否则会报错,如果每个都有值,就可以接
    points['id'] = points.apply(lambda x: grids.loc[(grids['bottom_lat'] <= x['lat']) & (grids['top_lat'] >= x['lat']) & (grids['bottom_lng'] <= x['lng']) & (grids['top_lng'] >= x['lng']), 'id'].to_numpy(), axis=1)
    print(points)
    points['id'] = points.apply(lambda x: grids.loc[(grids['bottom_lat'] <= x['lat']) & (grids['top_lat'] >= x['lat']) & (grids['bottom_lng'] <= x['lng']) & (grids['top_lng'] >= x['lng']), 'id'].to_numpy()[0], axis=1)
    print(points)
    

    8改列的名字及存为json文件

    data.sort_values(by=['col', 'row'], inplace=True)
    data['grid_id'] = data.apply(lambda x: 140*(x['col'] - 1) + x['row'], axis=1)
    data.rename(columns={'col': 'column'}, inplace=True)
    # orient选择records,则存为常见的列表套字典的数据形式
    data.to_json(os.path.join(DATA_DIR, 'grids.json'), orient='records')

    相关文章

      网友评论

          本文标题:pandas使用小结及时间转换及多进程池及多线程池

          本文链接:https://www.haomeiwen.com/subject/hmoijctx.html