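A problem I ran into at work.
The K-line (candlestick) data pulled from Binance has gaps. Exchange upgrades, attacks, trading halts and similar events mean that even the exchange's own data is incomplete, so the K-line data has to be cleaned before it can be used.
I wrote a function to detect and fill in the missing data.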
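The main idea: first use timestamp to find the rows after which data is missing (broken), then go through broken one row at a time and fill in each missing stretch. Open, high, low and close of every filled candle are set to the last close before the gap, and volume, number of trades and similar fields are set to 0. One thing to watch out for: every fill changes the number of rows in df, so the index values recorded in broken beforehand no longer point at the right rows and must have an offset added.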
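To make the detection step and the offset concrete, here is a minimal sketch on toy data. The 1-minute interval, the timestamps in seconds and the names `missing_after` and `position_after_fill` are assumptions made for the illustration only.

```python
import pandas as pd

# Toy 1-minute candles, timestamps in seconds.
# Candles 120 and 180 are missing, and so are 360 and 420.
df = pd.DataFrame({
    "timestamp": [0, 60, 240, 300, 480],
    "close": [10.0, 10.5, 10.2, 10.8, 11.0],
})
step = 60  # candle width in seconds (assumed)

# Distance from each row to the next one, in candles; 1 means no gap.
time_diff = df["timestamp"].diff().shift(-1) / step
missing_after = (time_diff - 1).fillna(0).astype(int)

# Rows that are followed by a gap, with the number of candles to insert.
broken = missing_after[missing_after > 0]

# Rows inserted for earlier gaps push later rows down, so the position of
# each broken row in the growing frame is its old index plus that offset.
offset = broken.cumsum().shift(1, fill_value=0)
position_after_fill = broken.index.to_numpy() + offset.to_numpy()

print(broken.to_dict())
print(position_after_fill.tolist())
```

The second gap starts at index 3 in the original frame, but once two rows have been inserted for the first gap that same row sits at position 5, which is why the index alone cannot be trusted.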
from datetime import datetime

import pandas


def fix_data(args, df_kline):
    """
    Data cleaning: fill in missing candles.
    :param args: {
        'exchange': , 'coin': ,
        'begin_date': ,
        'end_date': ,
        'interval':
    }
    :param df_kline: K-line data DataFrame
    :return: df, the cleaned DataFrame
    """
    delta_interval = {
        'min_1': 1, 'min_5': 5, 'min_15': 15, 'min_30': 30,
        'hour_1': 60, 'hour_2': 120, 'hour_4': 240, 'hour_6': 360,
        'hour_8': 480, 'hour_12': 720,
        'day_1': 1440, 'day_3': 4320, 'week_1': 10080,
    }
    # Candle width in seconds; 'timestamp' is assumed to be in seconds too.
    step = delta_interval[args['interval']] * 60
    # Positional 0..n-1 index, so that index + offset below stays valid.
    df = df_kline.copy().reset_index(drop=True)

    # time_diff: distance from each row to the next, in candles (1 = no gap).
    df['time_diff'] = (df['timestamp'].shift(-1) - df['timestamp']) / step
    df['time_diff'] = df['time_diff'].fillna(1)
    # Only rows followed by at least one whole missing candle need filling.
    broken = df[df['time_diff'] >= 2]

    offset = 0  # rows inserted so far; shifts every later position
    for index, row in broken.iterrows():
        above = df.loc[:index + offset]
        below = df.loc[index + offset + 1:]
        length = int(row['time_diff']) - 1
        offset += length
        filler = []
        _row = row.copy()  # work on a copy, never on the iterated row
        for _ in range(length):
            # Flat candle at the last close before the gap, with no trades.
            _row['open'] = _row['close']
            _row['high'] = _row['close']
            _row['low'] = _row['close']
            _row['number_of_trades'] = 0
            _row['quote_asset_volume'] = 0
            _timestamp = int(_row['timestamp'] + step)
            _row['timestamp'] = _timestamp
            _row['time'] = datetime.fromtimestamp(_timestamp).strftime('%Y-%m-%d %H:%M:%S')
            _row['volume'] = 0
            _row['time_diff'] = 1
            filler.append(_row.copy())
        # DataFrame.append was removed in pandas 2.0; concat replaces it.
        df = pandas.concat([above, pandas.DataFrame(filler), below], ignore_index=True)
    df = df.drop(columns=['time_diff'])
    return df
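For comparison, the same kind of fill can be done without a loop or an offset by reindexing onto a complete timestamp grid. This is only a sketch of an alternative, not the function above: `fill_gaps_reindex`, the `is_filled` column and the reduced column set (`open`, `high`, `low`, `close`, `volume`) are hypothetical, and it assumes timestamps in seconds that are sorted, unique and aligned to the candle width.

```python
import numpy as np
import pandas as pd


def fill_gaps_reindex(df, step):
    """Hypothetical helper: fill missing candles on a regular grid.

    df   -- candles with a 'timestamp' column in seconds (sorted, unique)
    step -- candle width in seconds
    """
    ts = df["timestamp"]
    # Every timestamp that should exist between the first and last candle.
    grid = np.arange(ts.iloc[0], ts.iloc[-1] + step, step)
    out = df.set_index("timestamp").reindex(grid)
    is_filled = ~out.index.isin(ts)

    # Flat candles: carry the last close forward, then copy it to O/H/L.
    out["close"] = out["close"].ffill()
    for col in ("open", "high", "low"):
        out[col] = out[col].fillna(out["close"])
    # Nothing traded in a filled candle.
    out["volume"] = out["volume"].fillna(0)
    out["is_filled"] = is_filled
    return out.rename_axis("timestamp").reset_index()


# Toy 1-minute data with candles 120 and 180 missing.
candles = pd.DataFrame({
    "timestamp": [0, 60, 240],
    "open": [1.0, 2.0, 3.0],
    "high": [1.6, 2.6, 3.6],
    "low": [0.9, 1.9, 2.9],
    "close": [1.5, 2.5, 3.5],
    "volume": [10.0, 20.0, 30.0],
})
filled_df = fill_gaps_reindex(candles, 60)
print(filled_df)
```

Because the whole grid is built up front, no row positions move while filling, so the offset bookkeeping disappears. The trade-off is that any candle whose timestamp is not on the grid would be dropped by the reindex, so this only fits data that is already aligned.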