In [3]:
Copied!
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import yfinance as yf
from sklearn.preprocessing import MinMaxScaler
# PyTorch is the dependency readers are most likely to be missing, so import
# it defensively and choose the compute device up front.
try:
    import torch
    import torch.nn as nn
    from torch.utils.data import DataLoader, TensorDataset
    print(f'PyTorch 版本: {torch.__version__} ✅')
    # Prefer the GPU when available; tensors and the model are moved to DEVICE later.
    DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'
    print(f'使用设备: {DEVICE}')
except ImportError:
    print('⚠️ 请安装 PyTorch: pip install torch')
    # Abort here so later cells that rely on torch are never reached.
    raise SystemExit('跳过本节,请先安装 torch')
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import yfinance as yf
from sklearn.preprocessing import MinMaxScaler
# PyTorch is the dependency readers are most likely to be missing, so import
# it defensively and choose the compute device up front.
try:
    import torch
    import torch.nn as nn
    from torch.utils.data import DataLoader, TensorDataset
    print(f'PyTorch 版本: {torch.__version__} ✅')
    # Prefer the GPU when available; tensors and the model are moved to DEVICE later.
    DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'
    print(f'使用设备: {DEVICE}')
except ImportError:
    print('⚠️ 请安装 PyTorch: pip install torch')
    # Abort here so later cells that rely on torch are never reached.
    raise SystemExit('跳过本节,请先安装 torch')
PyTorch 版本: 2.5.1 ✅ 使用设备: cuda
In [4]:
Copied!
import matplotlib.pyplot as plt

# Configure matplotlib for Chinese labels: SimHei provides CJK glyphs
# (swap in 'Microsoft YaHei' if you prefer), and disabling unicode_minus
# stops the minus sign from rendering as an empty box under CJK fonts.
plt.rcParams.update({
    'font.sans-serif': ['SimHei'],
    'axes.unicode_minus': False,
})
import matplotlib.pyplot as plt

# Configure matplotlib for Chinese labels: SimHei provides CJK glyphs
# (swap in 'Microsoft YaHei' if you prefer), and disabling unicode_minus
# stops the minus sign from rendering as an empty box under CJK fonts.
plt.rcParams.update({
    'font.sans-serif': ['SimHei'],
    'axes.unicode_minus': False,
})
1. 为什么用 LSTM?¶
普通 RNN 存在梯度消失问题,难以捕捉长期依赖。LSTM 通过门机制解决这一问题:
输入门 → 决定哪些新信息进入记忆
遗忘门 → 决定丢弃哪些旧记忆
输出门 → 决定输出哪些隐藏状态
对于价格时序,LSTM 理论上能捕捉几周甚至几个月的模式。但请记住:金融序列的信噪比极低,LSTM 容易过拟合!
2. 数据准备¶
In [5]:
Copied!
# Download daily SPY data from Yahoo Finance (2015-2023).
raw = yf.download('SPY', start='2015-01-01', end='2024-01-01', progress=False)
close = raw['Close'].squeeze().values.reshape(-1, 1)  # (n_days, 1) column vector
# Scale prices to [0, 1].
# NOTE(review): the scaler is fit on the FULL series (train + test), which
# leaks future min/max values into the training data (lookahead bias). For an
# honest evaluation, fit the scaler on the training portion only — TODO confirm.
scaler = MinMaxScaler()
close_scaled = scaler.fit_transform(close)
def make_sequences(data, seq_len=60):
    """Turn a time series into supervised (X, y) sample pairs.

    Each sample X[i] is the window of `seq_len` consecutive observations
    starting at index i, and y[i] is the observation that immediately
    follows that window.

    Args:
        data: array of shape (n, n_features) holding the (scaled) series.
        seq_len: number of past steps used as the model input.

    Returns:
        Tuple (X, y) of numpy arrays with shapes
        (n - seq_len, seq_len, n_features) and (n - seq_len, n_features).
    """
    n_samples = len(data) - seq_len
    windows = [data[start:start + seq_len] for start in range(n_samples)]
    targets = [data[start + seq_len] for start in range(n_samples)]
    return np.array(windows), np.array(targets)
SEQ_LEN = 60  # look-back window: predict day t+1 from the previous 60 days
X, y = make_sequences(close_scaled, SEQ_LEN)
# Chronological split: first 80% train, last 20% test. No shuffling of the
# split itself — that would leak future data into the training set.
split = int(len(X) * 0.8)
X_train, X_test = X[:split], X[split:]
y_train, y_test = y[:split], y[split:]
# Move everything to the chosen device as float32 tensors.
X_train_t = torch.FloatTensor(X_train).to(DEVICE)
X_test_t = torch.FloatTensor(X_test).to(DEVICE)
y_train_t = torch.FloatTensor(y_train).to(DEVICE)
y_test_t = torch.FloatTensor(y_test).to(DEVICE)
train_dataset = TensorDataset(X_train_t, y_train_t)
# shuffle=False keeps batches in time order; since each (X, y) window is
# self-contained, shuffling would also be valid — kept as the author chose.
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=False)
print(f'训练集: X={X_train.shape}, y={y_train.shape}')
print(f'测试集: X={X_test.shape}, y={y_test.shape}')
# Download daily SPY data from Yahoo Finance (2015-2023).
raw = yf.download('SPY', start='2015-01-01', end='2024-01-01', progress=False)
close = raw['Close'].squeeze().values.reshape(-1, 1)  # (n_days, 1) column vector
# Scale prices to [0, 1].
# NOTE(review): the scaler is fit on the FULL series (train + test), which
# leaks future min/max values into the training data (lookahead bias). For an
# honest evaluation, fit the scaler on the training portion only — TODO confirm.
scaler = MinMaxScaler()
close_scaled = scaler.fit_transform(close)
def make_sequences(data, seq_len=60):
    """Turn a time series into supervised (X, y) sample pairs.

    Each sample X[i] is the window of `seq_len` consecutive observations
    starting at index i, and y[i] is the observation that immediately
    follows that window.

    Args:
        data: array of shape (n, n_features) holding the (scaled) series.
        seq_len: number of past steps used as the model input.

    Returns:
        Tuple (X, y) of numpy arrays with shapes
        (n - seq_len, seq_len, n_features) and (n - seq_len, n_features).
    """
    n_samples = len(data) - seq_len
    windows = [data[start:start + seq_len] for start in range(n_samples)]
    targets = [data[start + seq_len] for start in range(n_samples)]
    return np.array(windows), np.array(targets)
SEQ_LEN = 60  # look-back window: predict day t+1 from the previous 60 days
X, y = make_sequences(close_scaled, SEQ_LEN)
# Chronological split: first 80% train, last 20% test. No shuffling of the
# split itself — that would leak future data into the training set.
split = int(len(X) * 0.8)
X_train, X_test = X[:split], X[split:]
y_train, y_test = y[:split], y[split:]
# Move everything to the chosen device as float32 tensors.
X_train_t = torch.FloatTensor(X_train).to(DEVICE)
X_test_t = torch.FloatTensor(X_test).to(DEVICE)
y_train_t = torch.FloatTensor(y_train).to(DEVICE)
y_test_t = torch.FloatTensor(y_test).to(DEVICE)
train_dataset = TensorDataset(X_train_t, y_train_t)
# shuffle=False keeps batches in time order; since each (X, y) window is
# self-contained, shuffling would also be valid — kept as the author chose.
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=False)
print(f'训练集: X={X_train.shape}, y={y_train.shape}')
print(f'测试集: X={X_test.shape}, y={y_test.shape}')
训练集: X=(1763, 60, 1), y=(1763, 1) 测试集: X=(441, 60, 1), y=(441, 1)
3. 构建 LSTM 模型¶
In [6]:
Copied!
class LSTMModel(nn.Module):
    """Stacked LSTM mapping a price window to a one-step-ahead forecast.

    Input:  (batch, seq_len, input_size)
    Output: (batch, output_size)
    """

    def __init__(self, input_size=1, hidden_size=64, num_layers=2,
                 output_size=1, dropout=0.2):
        super().__init__()
        # nn.LSTM applies dropout only BETWEEN layers; passing a nonzero
        # value with a single layer would trigger a warning, so zero it out.
        inter_layer_dropout = dropout if num_layers > 1 else 0
        self.lstm = nn.LSTM(input_size=input_size,
                            hidden_size=hidden_size,
                            num_layers=num_layers,
                            batch_first=True,
                            dropout=inter_layer_dropout)
        self.dropout = nn.Dropout(dropout)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """Forward pass: x of shape (batch, seq_len, input_size)."""
        hidden_seq, _ = self.lstm(x)
        # Only the hidden state of the final time step feeds the regressor.
        last_step = hidden_seq[:, -1, :]
        return self.fc(self.dropout(last_step))
# Instantiate the model on the selected device and report its size.
model = LSTMModel().to(DEVICE)
print(model)
total_params = sum(p.numel() for p in model.parameters())
print(f'\n总参数量: {total_params:,}')
class LSTMModel(nn.Module):
    """Stacked LSTM mapping a price window to a one-step-ahead forecast.

    Input:  (batch, seq_len, input_size)
    Output: (batch, output_size)
    """

    def __init__(self, input_size=1, hidden_size=64, num_layers=2,
                 output_size=1, dropout=0.2):
        super().__init__()
        # nn.LSTM applies dropout only BETWEEN layers; passing a nonzero
        # value with a single layer would trigger a warning, so zero it out.
        inter_layer_dropout = dropout if num_layers > 1 else 0
        self.lstm = nn.LSTM(input_size=input_size,
                            hidden_size=hidden_size,
                            num_layers=num_layers,
                            batch_first=True,
                            dropout=inter_layer_dropout)
        self.dropout = nn.Dropout(dropout)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """Forward pass: x of shape (batch, seq_len, input_size)."""
        hidden_seq, _ = self.lstm(x)
        # Only the hidden state of the final time step feeds the regressor.
        last_step = hidden_seq[:, -1, :]
        return self.fc(self.dropout(last_step))
# Instantiate the model on the selected device and report its size.
model = LSTMModel().to(DEVICE)
print(model)
total_params = sum(p.numel() for p in model.parameters())
print(f'\n总参数量: {total_params:,}')
LSTMModel( (lstm): LSTM(1, 64, num_layers=2, batch_first=True, dropout=0.2) (dropout): Dropout(p=0.2, inplace=False) (fc): Linear(in_features=64, out_features=1, bias=True) ) 总参数量: 50,497
4. 训练模型¶
In [7]:
Copied!
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
# Halve the learning rate once validation loss plateaus for 5 epochs.
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, patience=5, factor=0.5)
EPOCHS = 50
train_losses, val_losses = [], []
for epoch in range(EPOCHS):
    # ---- training ----
    model.train()
    batch_losses = []
    for X_batch, y_batch in train_loader:
        optimizer.zero_grad()
        pred = model(X_batch)
        loss = criterion(pred, y_batch)
        loss.backward()
        nn.utils.clip_grad_norm_(model.parameters(), 1.0)  # gradient clipping against exploding gradients
        optimizer.step()
        batch_losses.append(loss.item())
    train_loss = np.mean(batch_losses)
    train_losses.append(train_loss)
    # ---- validation (full test set in one forward pass) ----
    # NOTE(review): the test set doubles as the validation set, so the LR
    # schedule is tuned on data later used for final evaluation — mild leakage.
    model.eval()
    with torch.no_grad():
        val_pred = model(X_test_t)
        val_loss = criterion(val_pred, y_test_t).item()
    val_losses.append(val_loss)
    scheduler.step(val_loss)
    if (epoch + 1) % 10 == 0:
        print(f'Epoch [{epoch+1:2d}/{EPOCHS}] '
              f'Train Loss: {train_loss:.6f} '
              f'Val Loss: {val_loss:.6f}')
# Plot the loss curves.
fig, ax = plt.subplots(figsize=(10, 4))
ax.plot(train_losses, label='训练损失', linewidth=1.5)
ax.plot(val_losses, label='验证损失', linewidth=1.5)
ax.set_title('LSTM 训练过程', fontsize=13)
ax.set_xlabel('Epoch')
ax.set_ylabel('MSE Loss')
ax.legend()
ax.grid(alpha=0.3)
plt.tight_layout()
plt.show()
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
# Halve the learning rate once validation loss plateaus for 5 epochs.
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, patience=5, factor=0.5)
EPOCHS = 50
train_losses, val_losses = [], []
for epoch in range(EPOCHS):
    # ---- training ----
    model.train()
    batch_losses = []
    for X_batch, y_batch in train_loader:
        optimizer.zero_grad()
        pred = model(X_batch)
        loss = criterion(pred, y_batch)
        loss.backward()
        nn.utils.clip_grad_norm_(model.parameters(), 1.0)  # gradient clipping against exploding gradients
        optimizer.step()
        batch_losses.append(loss.item())
    train_loss = np.mean(batch_losses)
    train_losses.append(train_loss)
    # ---- validation (full test set in one forward pass) ----
    # NOTE(review): the test set doubles as the validation set, so the LR
    # schedule is tuned on data later used for final evaluation — mild leakage.
    model.eval()
    with torch.no_grad():
        val_pred = model(X_test_t)
        val_loss = criterion(val_pred, y_test_t).item()
    val_losses.append(val_loss)
    scheduler.step(val_loss)
    if (epoch + 1) % 10 == 0:
        print(f'Epoch [{epoch+1:2d}/{EPOCHS}] '
              f'Train Loss: {train_loss:.6f} '
              f'Val Loss: {val_loss:.6f}')
# Plot the loss curves.
fig, ax = plt.subplots(figsize=(10, 4))
ax.plot(train_losses, label='训练损失', linewidth=1.5)
ax.plot(val_losses, label='验证损失', linewidth=1.5)
ax.set_title('LSTM 训练过程', fontsize=13)
ax.set_xlabel('Epoch')
ax.set_ylabel('MSE Loss')
ax.legend()
ax.grid(alpha=0.3)
plt.tight_layout()
plt.show()
Epoch [10/50] Train Loss: 0.004590 Val Loss: 0.011013 Epoch [20/50] Train Loss: 0.002400 Val Loss: 0.000541 Epoch [30/50] Train Loss: 0.001631 Val Loss: 0.000501 Epoch [40/50] Train Loss: 0.001542 Val Loss: 0.000744 Epoch [50/50] Train Loss: 0.001455 Val Loss: 0.000522
5. 预测与可视化¶
In [8]:
Copied!
# Predict on the test set (no gradients needed).
model.eval()
with torch.no_grad():
    y_pred_scaled = model(X_test_t).cpu().numpy()
# Invert the [0, 1] scaling back to dollar prices.
y_pred_actual = scaler.inverse_transform(y_pred_scaled)
y_test_actual = scaler.inverse_transform(y_test)
# Dates aligned with the test targets: target i corresponds to raw
# index SEQ_LEN + split + i (each window consumes SEQ_LEN leading days).
test_dates = raw.index[SEQ_LEN + split: SEQ_LEN + split + len(y_test)]
fig, ax = plt.subplots(figsize=(13, 5))
ax.plot(test_dates, y_test_actual, label='真实价格', linewidth=1.5, color='steelblue')
ax.plot(test_dates, y_pred_actual, label='LSTM 预测', linewidth=1.5,
        color='orange', linestyle='--')
ax.set_title('LSTM 预测 vs 真实价格(测试集)', fontsize=13)
ax.set_ylabel('SPY 价格 (USD)')
ax.legend()
ax.grid(alpha=0.3)
plt.tight_layout()
plt.show()
# RMSE in dollar terms on the original price scale.
rmse = np.sqrt(np.mean((y_pred_actual - y_test_actual) ** 2))
print(f'RMSE: {rmse:.4f} USD')
print('⚠️ 注意:价格预测看起来很好,但实际交易更关注方向准确率!')
# Predict on the test set (no gradients needed).
model.eval()
with torch.no_grad():
    y_pred_scaled = model(X_test_t).cpu().numpy()
# Invert the [0, 1] scaling back to dollar prices.
y_pred_actual = scaler.inverse_transform(y_pred_scaled)
y_test_actual = scaler.inverse_transform(y_test)
# Dates aligned with the test targets: target i corresponds to raw
# index SEQ_LEN + split + i (each window consumes SEQ_LEN leading days).
test_dates = raw.index[SEQ_LEN + split: SEQ_LEN + split + len(y_test)]
fig, ax = plt.subplots(figsize=(13, 5))
ax.plot(test_dates, y_test_actual, label='真实价格', linewidth=1.5, color='steelblue')
ax.plot(test_dates, y_pred_actual, label='LSTM 预测', linewidth=1.5,
        color='orange', linestyle='--')
ax.set_title('LSTM 预测 vs 真实价格(测试集)', fontsize=13)
ax.set_ylabel('SPY 价格 (USD)')
ax.legend()
ax.grid(alpha=0.3)
plt.tight_layout()
plt.show()
# RMSE in dollar terms on the original price scale.
rmse = np.sqrt(np.mean((y_pred_actual - y_test_actual) ** 2))
print(f'RMSE: {rmse:.4f} USD')
print('⚠️ 注意:价格预测看起来很好,但实际交易更关注方向准确率!')
RMSE: 7.0842 USD ⚠️ 注意:价格预测看起来很好,但实际交易更关注方向准确率!
6. 方向准确率(更实用的指标)¶
In [9]:
Copied!
# Direction accuracy: compare the sign of the day-over-day change in the
# true series with the sign of the change in the predicted series. A trading
# rule acts on direction, so this is a more honest metric than price RMSE.
true_moves = np.diff(y_test_actual.ravel())
model_moves = np.diff(y_pred_actual.ravel())
dir_accuracy = np.mean(np.sign(true_moves) == np.sign(model_moves))
print(f'方向准确率: {dir_accuracy:.2%}')
print(f'基准(随机猜测): 50%')
if dir_accuracy > 0.52:
    print('\n🟡 模型有微弱的方向预测能力,但需要在更多资产和时段验证!')
else:
    print('\n🔴 方向准确率接近随机,模型没有实用的交易价值。')
    print('这是金融深度学习中极为常见的结果——请不要轻信高价格预测精度!')
# Direction accuracy: compare the sign of the day-over-day change in the
# true series with the sign of the change in the predicted series. A trading
# rule acts on direction, so this is a more honest metric than price RMSE.
true_moves = np.diff(y_test_actual.ravel())
model_moves = np.diff(y_pred_actual.ravel())
dir_accuracy = np.mean(np.sign(true_moves) == np.sign(model_moves))
print(f'方向准确率: {dir_accuracy:.2%}')
print(f'基准(随机猜测): 50%')
if dir_accuracy > 0.52:
    print('\n🟡 模型有微弱的方向预测能力,但需要在更多资产和时段验证!')
else:
    print('\n🔴 方向准确率接近随机,模型没有实用的交易价值。')
    print('这是金融深度学习中极为常见的结果——请不要轻信高价格预测精度!')
方向准确率: 50.45% 基准(随机猜测): 50% 🔴 方向准确率接近随机,模型没有实用的交易价值。 这是金融深度学习中极为常见的结果——请不要轻信高价格预测精度!
🎯 练习¶
- 将 LSTM 的输入特征从「仅价格」扩展到「价格 + RSI + MACD + 成交量」,对比模型预测效果是否改善。
- 尝试用 Transformer(如 torch.nn.Transformer)替换 LSTM 层,观察训练速度和效果的差异。
- 实施 Walk-Forward Validation:将数据分为多段,每段分别训练并在下一段测试,汇总整体夏普比率。
- 将 SEQ_LEN 从 60 改为 20 / 120,观察预测精度的变化。
下一节 → 05_cross_validation.ipynb
In [ ]:
Copied!