本文用 pytorch 实现 LSTM 预测股票价格,这里只处理最高价格。直接上代码,较为简单,就不用文字描述了。主要思路为:用历史 7 填的数据来预测当天的最高价格,所以 LSTM 层的 inputsize 为 1,训练样本的 time steps 为 7。
功能较为简单,完整代码如下:
# coding: utf-8
# 目标:根据历史数据,预测当天股票最高价
# 模块导入
import pandas as pd
import matplotlib.pyplot as plt
import datetime
import torch
import torch.nn as nn
import numpy as np
from torch.utils.data import Dataset, DataLoader
# ### 原始数据获取
import tushare as ts
cons = ts.get_apis()
df = ts.bar('000300', conn=cons, asset='INDEX', start_date='2002-01-01', end_date='')
# 注意历史数据靠前
df = df.sort_index(ascending=True)
df.to_csv('sh.csv')
# 数据预处理
df = pd.read_csv('sh.csv', index_col=0)
df.index = list(map(lambda x:datetime.datetime.strptime(x, '%Y-%m-%d'), df.index))
# 获取数据集
def getData(df, column, train_end=-300, days_before=30, return_all=True, generate_index=False):
'''
读取原始数据,并生成训练样本
df : 原始数据
column : 要处理的列
train_end : 训练集的终点
days_before : 多少天来预测下一天
return_all : 是否返回所有数据,默认 True
generate_index : 是否生成 index
'''
series = df[column].copy()
# 划分数据
# 0 ~ train_end 的为训练数据,但实际上,最后的 n 天只是作为 label
# 而 train 中的 label,可用于 test
train_series, test_series = series[:train_end], series[train_end - days_before:]
# 创建训练集
train_data = pd.DataFrame()
# 通过移位,创建历史 days_before 天的数据
for i in range(days_before):
# 当前数据的 7 天前的数据,应该取 开始到 7 天前的数据; 昨天的数据,应该为开始到昨天的数据,如:
# [..., 1,2,3,4,5,6,7] 昨天的为 [..., 1,2,3,4,5,6]
# 比如从 [2:-7+2],其长度为 len - 7
train_data['c%d' % i] = train_series.tolist()[i: -days_before + i]
# 获取对应的 label
train_data['y'] = train_series.tolist()[days_before:]
# 是否生成 index
if generate_index:
train_data.index = train_series.index[n:]
if return_all:
return train_data, series, df.index.tolist()
return train_data
# 创建 LSTM 层
class LSTM(nn.Module):
def __init__(self):
super(LSTM, self).__init__()
self.lstm = nn.LSTM(
input_size=1, # 输入尺寸为 1,表示一天的数据
hidden_size=64,
num_layers=1,
batch_first=True)
self.out = nn.Sequential(
nn.Linear(64,1))
def forward(self, x):
r_out, (h_n, h_c) = self.lstm(x, None) # None 表示 hidden state 会用全 0 的 state
out = self.out(r_out[:, -1, :]) # 取最后一天作为输出
return out
class TrainSet(Dataset):
def __init__(self, data):
# 定义好 image 的路径
# data 取前多少天的数据, label 取最后一天的数据
self.data, self.label = data[:, :-1].float(), data[:, -1].float()
def __getitem__(self, index):
return self.data[index], self.label[index]
def __len__(self):
return len(self.data)
# 超参数
LR = 0.0001
EPOCH = 100
TRAIN_END=-300
DAYS_BEFORE=7
# 数据集建立
train_data, all_series, df_index = getData(df, 'high', days_before=DAYS_BEFORE, train_end=TRAIN_END)
# 获取所有原始数据
all_series = np.array(all_series.tolist())
# 绘制原始数据的图
plt.figure(figsize=(12,8))
plt.plot(df_index, all_series, label='real-data')
# 归一化,便与训练
train_data_numpy = np.array(train_data)
train_mean = np.mean(train_data_numpy)
train_std = np.std(train_data_numpy)
train_data_numpy = (train_data_numpy - train_mean) / train_std
train_data_tensor = torch.Tensor(train_data_numpy)
# 创建 dataloader
train_set = TrainSet(train_data_tensor)
train_loader = DataLoader(train_set, batch_size=10, shuffle=True)
# 模型训练部分
rnn = LSTM()
if torch.cuda.is_available():
rnn = rnn.cuda()
optimizer = torch.optim.Adam(rnn.parameters(), lr=LR) # optimize all cnn parameters
loss_func = nn.MSELoss()
for step in range(EPOCH):
for tx, ty in train_loader:
if torch.cuda.is_available():
tx = tx.cuda()
ty = ty.cuda()
output = rnn(torch.unsqueeze(tx, dim=2))
loss = loss_func(torch.squeeze(output), ty)
optimizer.zero_grad() # clear gradients for this training step
loss.backward() # back propagation, compute gradients
optimizer.step()
print(step, loss.cpu())
if step % 10:
torch.save(rnn, 'rnn.pkl')
torch.save(rnn, 'rnn.pkl')
# ## 验证
generate_data_train = []
generate_data_test = []
# 测试数据开始的索引
test_start = len(all_series) + TRAIN_END
# 对所有的数据进行相同的归一化
all_series = (all_series - train_mean) / train_std
all_series = torch.Tensor(all_series)
for i in range(DAYS_BEFORE, len(all_series)):
x = all_series[i - DAYS_BEFORE:i]
# 将 x 填充到 (bs, ts, is) 中的 timesteps
x = torch.unsqueeze(torch.unsqueeze(x, dim=0), dim=2)
if torch.cuda.is_available():
x = x.cuda()
y = rnn(x)
if i < test_start:
generate_data_train.append(torch.squeeze(y.cpu()).detach().numpy() * train_std + train_mean)
else:
generate_data_test.append(torch.squeeze(y.cpu()).detach().numpy() * train_std + train_mean)
# 使用全部数据验证
plt.figure(figsize=(12,8))
plt.plot(df_index[DAYS_BEFORE: TRAIN_END], generate_data_train, 'b', label='generate_train', )
plt.plot(df_index[TRAIN_END:], generate_data_test, 'k', label='generate_test')
plt.plot(df_index, all_series.clone().numpy()* train_std + train_mean, 'r', label='real_data')
plt.legend()
plt.show()
# 查看局部
plt.figure(figsize=(10,16))
plt.subplot(2,1,1)
plt.plot(df_index[100 + DAYS_BEFORE: 130 + DAYS_BEFORE], generate_data_train[100: 130], 'b', label='generate_train')
plt.plot(df_index[100 + DAYS_BEFORE: 130 + DAYS_BEFORE], (all_series.clone().numpy()* train_std + train_mean)[100 + DAYS_BEFORE: 130 + DAYS_BEFORE], 'r', label='real_data')
plt.legend()
plt.subplot(2,1,2)
plt.plot(df_index[TRAIN_END + 50: TRAIN_END + 80], generate_data_test[50:80], 'k', label='generate_test')
plt.plot(df_index[TRAIN_END + 50: TRAIN_END + 80], (all_series.clone().numpy()* train_std + train_mean)[TRAIN_END + 50: TRAIN_END + 80], 'r', label='real_data')
plt.legend()
plt.show()
最后全部数据的验证如下:

局部数据验证,可以看出,有部分延迟:

网友评论