美文网首页
操练代码之LSTM

操练代码之LSTM

作者: 万州客 | 来源:发表于2024-08-19 13:37 被阅读0次

得不到股票数据,接口过期了,先记录代码

一,代码

# 使用LSTM神经网络,对股票进行预测

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

import numpy as np
import tushare as ts
from tqdm import tqdm

import torch.utils.data as Data
import matplotlib
import matplotlib.pyplot as plt

from copy import deepcopy as copy


# 获取数据
class GetData:
    """Download, min-max normalise and window daily stock data.

    Attributes:
        stock_id: Stock code passed to ``ts.get_hist_data``.
        save_path: CSV path the normalised table is written to.
        data: Normalised DataFrame with the columns
            ``open, close, high, low, volume`` (``None`` until loaded).
        close_min / close_max: Range of the *raw* closing price, kept so
            that normalised closing prices can be mapped back to prices.
    """

    def __init__(self, stock_id, save_path):
        self.stock_id = stock_id
        self.save_path = save_path
        self.data = None
        # Filled in by get_data(); None until the data has been downloaded.
        self.close_min = None
        self.close_max = None

    def get_data(self):
        """Fetch the history, normalise every column to [0, 1] and save it.

        Raises:
            RuntimeError: If the data source returns nothing.
        """
        raw = ts.get_hist_data(self.stock_id)
        if raw is None:
            # NOTE(review): presumably what an expired/failed API call
            # yields - confirm against the tushare version in use.
            raise RuntimeError(
                'no data returned for stock {}'.format(self.stock_id))
        # Reverse the row order (presumably newest-first -> oldest-first;
        # confirm against the tushare documentation).
        raw = raw.iloc[::-1]
        raw = raw[['open', 'close', 'high', 'low', 'volume']]
        # The label is the closing price, so the de-normalisation range must
        # come from the 'close' column (the original read 'volume' here).
        self.close_min = raw['close'].min()
        self.close_max = raw['close'].max()
        self.data = raw.apply(
            lambda col: (col - col.min()) / (col.max() - col.min()))
        self.data.to_csv(self.save_path)

    def process_data(self, n, train_size=500):
        """Build sliding-window samples and split them into train/test.

        Each feature is ``n`` consecutive rows; its label is the normalised
        closing price of the row that immediately follows the window.

        Args:
            n: Window length in rows.
            train_size: Number of leading samples used for training; the
                remainder becomes the test set.

        Returns:
            ``(train_x, test_x, train_y, test_y)`` as Python lists.
        """
        if self.data is None:
            self.get_data()
        rows = self.data.values.tolist()
        closes = self.data['close'].values
        # A window starting at i needs row i + n to exist for its label.
        starts = range(len(rows) - n)
        feature = [rows[i:i + n] for i in starts]
        label = [closes[i + n] for i in starts]

        train_x = feature[:train_size]
        test_x = feature[train_size:]
        train_y = label[:train_size]
        test_y = label[train_size:]

        return train_x, test_x, train_y, test_y


# 搭建LSTM模型
class Model(nn.Module):
    """Single LSTM layer followed by a linear head producing one value.

    Args:
        n: Number of input features per time step.
    """

    def __init__(self, n):
        super().__init__()
        self.lstm_layer = nn.LSTM(input_size=n, hidden_size=256, batch_first=True)
        self.linear_layer = nn.Linear(in_features=256, out_features=1, bias=True)

    def forward(self, x):
        """Map a batch-first input sequence to one prediction per row.

        Only the final hidden state of the LSTM is used; the per-step
        outputs and the cell state are discarded.
        """
        _, (last_hidden, _) = self.lstm_layer(x)
        # The final hidden state is 3-D; merge its first two axes so the
        # linear layer sees one 256-wide row per (layer, sample) pair.
        layers, batch, hidden = last_hidden.shape
        flat_hidden = last_hidden.reshape(layers * batch, hidden)
        return self.linear_layer(flat_hidden)


# 训练模型,计算损失等
def train_module(epoch, train_data_loader, test_data_loader):
    """Train the global ``model`` with early stopping on the test loss.

    Relies on the module-level names ``model``, ``loss_func``,
    ``optimizer`` and ``early_stop``. The best-scoring weights are written
    to ``./lstm_.pth`` when training finishes or stops early.

    Args:
        epoch: Maximum number of epochs.
        train_data_loader: Iterable of ``(x, y)`` training batches.
        test_data_loader: Iterable of ``(x, y)`` evaluation batches; it is
            only used to score the model, never to update it.
    """
    best_model = None
    train_loss = 0
    test_loss = 0
    best_loss = float('inf')
    epoch_cnt = 0  # consecutive epochs without improvement
    for epoch_idx in range(epoch):
        total_train_loss = 0.0
        total_train_num = 0
        total_test_loss = 0.0
        total_test_num = 0

        model.train()
        for x, y in tqdm(train_data_loader, desc='Epoch: {}|Train Losss:{}|TestLoss:{}'.format(
            epoch_idx, train_loss, test_loss
        )):
            x_num = len(x)
            # The model emits (batch, 1); drop the trailing axis so it lines
            # up with y instead of being broadcast to (batch, batch).
            p = model(x).squeeze(1)
            loss = loss_func(p, y)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            # Assumes loss_func averages over the batch (the nn.MSELoss
            # default), so weight by batch size before dividing by samples.
            total_train_loss += loss.item() * x_num
            total_train_num += x_num

        train_loss = total_train_loss / total_train_num

        # Evaluation only: no gradients and no optimizer steps, otherwise
        # the model would be fitted on the data it is scored against.
        model.eval()
        with torch.no_grad():
            for x, y in test_data_loader:
                x_num = len(x)
                p = model(x).squeeze(1)
                loss = loss_func(p, y)
                total_test_loss += loss.item() * x_num
                total_test_num += x_num

        test_loss = total_test_loss / total_test_num

        if best_loss > test_loss:
            best_loss = test_loss
            best_model = copy(model)
            epoch_cnt = 0
        else:
            # Count up; the original counted down so the stop never fired.
            epoch_cnt += 1

        if epoch_cnt > early_stop:
            break

    # Always persist the best weights, not only on an early stop, so the
    # checkpoint exists for whoever loads it afterwards.
    if best_model is not None:
        torch.save(best_model.state_dict(), './lstm_.pth')


# 测试模型
def test_model(test_data_loader_, n_features=5, model_path='./lstm_.pth'):
    """Load saved weights and evaluate them on the given batches.

    Uses the module-level ``loss_func`` and the ``Model`` class.

    Args:
        test_data_loader_: Iterable of ``(x, y)`` batches.
        n_features: Input feature count the checkpoint was trained with.
        model_path: Path of the saved ``state_dict``.

    Returns:
        ``(pred, label, test_loss)``: flat lists of predictions and targets
        plus the loss averaged over all samples.
    """
    pred = []
    label = []
    model_ = Model(n_features)
    model_.load_state_dict(torch.load(model_path))
    model_.eval()
    total_test_loss = 0.0
    total_test_num = 0

    # Inference only, so skip gradient tracking.
    with torch.no_grad():
        for x, y in test_data_loader_:
            x_num = len(x)
            # Drop the trailing size-1 axis so the prediction matches y's
            # shape instead of being broadcast inside the loss.
            p = model_(x).squeeze(1)
            loss = loss_func(p, y)
            # Assumes loss_func averages over the batch (the nn.MSELoss
            # default); weight by batch size to get a per-sample mean.
            total_test_loss += loss.item() * x_num
            total_test_num += x_num
            pred.extend(p.tolist())
            label.extend(y.tolist())
    test_loss = total_test_loss / total_test_num
    return pred, label, test_loss

# 绘制折线图
def plot_img(data, pred):
    """Plot actual values against predictions and show the figure.

    The prediction is drawn in green and the actual series in blue. Every
    fifth index additionally gets a short red segment: the predicted
    three-step movement re-anchored at the actual value of that index.

    Args:
        data: Sequence of actual values.
        pred: Sequence of predicted values.
    """
    plt.rcParams['font.sans-serif'] = ['SimHei']
    plt.figure(figsize=(12, 7))
    plt.plot(range(len(pred)), pred, color='green')
    plt.plot(range(len(data)), data, color='blue')
    # Bound by the shorter series so data[i] can never run out of range.
    usable = min(len(pred), len(data))
    for i in range(0, usable - 3, 5):
        price = [data[i] + pred[j] - pred[i] for j in range(i, i + 3)]
        plt.plot(range(i, i + 3), price, color='red')
    plt.xticks(fontproperties='Times New Roman', size='15')
    plt.yticks(fontproperties='Times New Roman', size='15')
    plt.xlabel('日期', fontsize=18)
    # The vertical axis shows prices; the original repeated the date label.
    plt.ylabel('价格', fontsize=18)
    plt.show()


if __name__ == '__main__':
    # Hyper-parameters. `early_stop` is read as a global by train_module.
    days_num, fea = 5, 5
    epoch, batch_size, early_stop = 20, 20, 5

    # `model` is read as a global by train_module.
    model = Model(fea)

    # Build the windowed samples and turn all four splits into float tensors.
    GD = GetData(stock_id='000001', save_path='./data.csv')
    splits = GD.process_data(days_num)
    x_train, x_test, y_train, y_test = (
        torch.tensor(part).float() for part in splits
    )

    train_data_loader = DataLoader(
        TensorDataset(x_train, y_train), batch_size=batch_size)
    test_data_loader = DataLoader(
        TensorDataset(x_test, y_test), batch_size=batch_size)

    # `loss_func` and `optimizer` are read as globals by the functions above.
    loss_func = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    train_module(epoch, train_data_loader, test_data_loader)
    p, y, test_loss = test_model(test_data_loader)

    # Undo the min-max scaling using the range recorded on GD.
    pred = [ele * (GD.close_max - GD.close_min) + GD.close_min for ele in p]
    data = [ele * (GD.close_max - GD.close_min) + GD.close_min for ele in y]

    plot_img(data, pred)
    print('模型损失:', test_loss)

相关文章

网友评论

      本文标题:操练代码之LSTM

      本文链接:https://www.haomeiwen.com/subject/lapmkjtx.html