In [1]:
Copied!
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import yfinance as yf
from scipy import stats
# Default figure size for every plot in this notebook (width, height in inches).
plt.rcParams.update({'figure.figsize': (12, 5)})
# Confirmation message so the reader can see the setup cell finished.
print('库加载完成 ✅')
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import yfinance as yf
from scipy import stats
# Default figure size for every plot in this notebook (width, height in inches).
plt.rcParams.update({'figure.figsize': (12, 5)})
# Confirmation message so the reader can see the setup cell finished.
print('库加载完成 ✅')
库加载完成 ✅
In [2]:
Copied!
import matplotlib.pyplot as plt
# 1. Pick a font that can render the Chinese titles/labels used below.
#    matplotlib tries the entries in order, so listing several candidates keeps
#    the notebook readable on machines where SimHei is not installed.
#    (Platform notes are typical defaults -- verify on your own system.)
plt.rcParams['font.sans-serif'] = [
    'SimHei',               # typically available on Windows
    'Microsoft YaHei',      # typically available on Windows
    'PingFang SC',          # typically available on macOS
    'Heiti TC',             # typically available on macOS
    'Noto Sans CJK SC',     # common on Linux
    'WenQuanYi Micro Hei',  # common on Linux
    'DejaVu Sans',          # matplotlib's bundled default as a last resort
]
# 2. With a CJK font active, the Unicode minus sign may be missing from the
#    font and show up as an empty box; fall back to the ASCII hyphen instead.
plt.rcParams['axes.unicode_minus'] = False
import matplotlib.pyplot as plt
# 1. Pick a font that can render the Chinese titles/labels used below.
#    matplotlib tries the entries in order, so listing several candidates keeps
#    the notebook readable on machines where SimHei is not installed.
#    (Platform notes are typical defaults -- verify on your own system.)
plt.rcParams['font.sans-serif'] = [
    'SimHei',               # typically available on Windows
    'Microsoft YaHei',      # typically available on Windows
    'PingFang SC',          # typically available on macOS
    'Heiti TC',             # typically available on macOS
    'Noto Sans CJK SC',     # common on Linux
    'WenQuanYi Micro Hei',  # common on Linux
    'DejaVu Sans',          # matplotlib's bundled default as a last resort
]
# 2. With a CJK font active, the Unicode minus sign may be missing from the
#    font and show up as an empty box; fall back to the ASCII hyphen instead.
plt.rcParams['axes.unicode_minus'] = False
1. 量化因子分类¶
| 类型 | 示例因子 | 说明 |
|---|---|---|
| 动量因子 | 20日/60日收益率 | 强者恒强 |
| 反转因子 | 5日/短期收益率 | 短期超跌反弹 |
| 波动率因子 | 历史波动率、ATR | 低波动溢价 |
| 技术因子 | RSI、MACD、布林带 | 技术分析信号 |
| 成交量因子 | 量比、换手率 | 资金流向 |
| 基本面因子 | PE、PB、ROE | 价值/质量 |
本节重点介绍技术因子的构建与筛选。
2. 构建因子库¶
In [3]:
Copied!
# Download daily bars for the whole universe in ONE request and slice the
# fields we need. (Two identical downloads doubled the network cost and could,
# in principle, return slightly different snapshots for prices and volume.)
# NOTE(review): whether 'Close' is split/dividend adjusted depends on the
# installed yfinance version's `auto_adjust` default -- confirm if it matters.
tickers = ['AAPL', 'MSFT', 'GOOGL', 'AMZN', 'TSLA',
           'JPM', 'BAC', 'XOM', 'JNJ', 'PG']
raw = yf.download(tickers, start='2018-01-01', end='2024-01-01',
                  progress=False)
prices = raw['Close'].dropna()   # keep only dates where every ticker has a close
volume = raw['Volume'].dropna()  # same filter for volume

# Build one factor frame per ticker, then stack them into a long panel.
all_factors = []
for ticker in tickers:
    p = prices[ticker].dropna()
    v = volume[ticker].dropna()
    df = pd.DataFrame(index=p.index)

    # Momentum / short-term reversal: trailing 5-, 20- and 60-bar returns.
    df['mom_5d'] = p.pct_change(5)
    df['mom_20d'] = p.pct_change(20)
    df['mom_60d'] = p.pct_change(60)

    # Volatility: 20-bar std of 1-bar returns, annualised with sqrt(252).
    df['vol_20d'] = p.pct_change().rolling(20).std() * np.sqrt(252)

    # RSI from exponentially smoothed gains/losses (com=13 <=> alpha = 1/14).
    delta = p.diff()
    gain = delta.clip(lower=0).ewm(com=13).mean()
    loss = (-delta.clip(upper=0)).ewm(com=13).mean()
    df['rsi_14'] = 100 - 100 / (1 + gain / loss)

    # Bollinger position: 0 = lower band (ma - 2*std), 1 = upper band
    # (ma + 2*std). Values leave [0, 1] whenever price breaks out of the band.
    ma20 = p.rolling(20).mean()
    std20 = p.rolling(20).std()
    df['bb_pct'] = (p - (ma20 - 2 * std20)) / (4 * std20)

    # Volume relative to its own 20-bar average.
    df['vol_ratio'] = v / v.rolling(20).mean()
    # Price relative to the 20-bar moving average (reuses ma20 from above).
    df['price_ma_ratio'] = p / ma20 - 1

    # Target: return over the NEXT 20 bars, i.e. p[t+20] / p[t] - 1.
    df['fwd_ret_20d'] = p.pct_change(20).shift(-20)
    df['ticker'] = ticker
    all_factors.append(df)

# Rows inside the warm-up window (start) or without a forward return (end)
# contain NaN and are dropped here.
factors = pd.concat(all_factors).dropna()
feature_cols = ['mom_5d', 'mom_20d', 'mom_60d', 'vol_20d', 'rsi_14',
                'bb_pct', 'vol_ratio', 'price_ma_ratio']
print(f'因子面板形状: {factors.shape}')
print(f'样本数: {len(factors)}')
# Download daily bars for the whole universe in ONE request and slice the
# fields we need. (Two identical downloads doubled the network cost and could,
# in principle, return slightly different snapshots for prices and volume.)
# NOTE(review): whether 'Close' is split/dividend adjusted depends on the
# installed yfinance version's `auto_adjust` default -- confirm if it matters.
tickers = ['AAPL', 'MSFT', 'GOOGL', 'AMZN', 'TSLA',
           'JPM', 'BAC', 'XOM', 'JNJ', 'PG']
raw = yf.download(tickers, start='2018-01-01', end='2024-01-01',
                  progress=False)
prices = raw['Close'].dropna()   # keep only dates where every ticker has a close
volume = raw['Volume'].dropna()  # same filter for volume

# Build one factor frame per ticker, then stack them into a long panel.
all_factors = []
for ticker in tickers:
    p = prices[ticker].dropna()
    v = volume[ticker].dropna()
    df = pd.DataFrame(index=p.index)

    # Momentum / short-term reversal: trailing 5-, 20- and 60-bar returns.
    df['mom_5d'] = p.pct_change(5)
    df['mom_20d'] = p.pct_change(20)
    df['mom_60d'] = p.pct_change(60)

    # Volatility: 20-bar std of 1-bar returns, annualised with sqrt(252).
    df['vol_20d'] = p.pct_change().rolling(20).std() * np.sqrt(252)

    # RSI from exponentially smoothed gains/losses (com=13 <=> alpha = 1/14).
    delta = p.diff()
    gain = delta.clip(lower=0).ewm(com=13).mean()
    loss = (-delta.clip(upper=0)).ewm(com=13).mean()
    df['rsi_14'] = 100 - 100 / (1 + gain / loss)

    # Bollinger position: 0 = lower band (ma - 2*std), 1 = upper band
    # (ma + 2*std). Values leave [0, 1] whenever price breaks out of the band.
    ma20 = p.rolling(20).mean()
    std20 = p.rolling(20).std()
    df['bb_pct'] = (p - (ma20 - 2 * std20)) / (4 * std20)

    # Volume relative to its own 20-bar average.
    df['vol_ratio'] = v / v.rolling(20).mean()
    # Price relative to the 20-bar moving average (reuses ma20 from above).
    df['price_ma_ratio'] = p / ma20 - 1

    # Target: return over the NEXT 20 bars, i.e. p[t+20] / p[t] - 1.
    df['fwd_ret_20d'] = p.pct_change(20).shift(-20)
    df['ticker'] = ticker
    all_factors.append(df)

# Rows inside the warm-up window (start) or without a forward return (end)
# contain NaN and are dropped here.
factors = pd.concat(all_factors).dropna()
feature_cols = ['mom_5d', 'mom_20d', 'mom_60d', 'vol_20d', 'rsi_14',
                'bb_pct', 'vol_ratio', 'price_ma_ratio']
print(f'因子面板形状: {factors.shape}')
print(f'样本数: {len(factors)}')
因子面板形状: (14290, 10) 样本数: 14290
3. 信息系数 IC(Information Coefficient)¶
$$IC = \text{Spearman}\left(\text{因子值}_t,\ \text{下期收益}_{t+n}\right)$$
- $|IC| > 0.05$:因子有实用价值
- 月度 IC 序列:ICIR = mean(IC) / std(IC),$|ICIR| > 0.5$ 为稳定因子(负向因子同样适用,故取绝对值)
In [4]:
Copied!
def compute_ic(factors_df, feature_cols, target='fwd_ret_20d', freq='ME'):
    """Compute a per-period Spearman information coefficient (IC) per factor.

    Rows are bucketed with ``pd.Grouper(freq=freq, level=0)`` on the frame's
    datetime index. Every row falling into a bucket is pooled, and the
    Spearman rank correlation between each factor column and ``target`` is
    computed on that pooled sample. If the frame holds several dates per
    bucket, the result therefore mixes all of those dates together.

    Parameters
    ----------
    factors_df : pandas.DataFrame
        Frame whose index level 0 is datetime-like and which contains the
        columns named in ``feature_cols`` plus ``target``.
    feature_cols : sequence of str
        Factor columns to evaluate.
    target : str, default 'fwd_ret_20d'
        Column holding the forward return to correlate against.
    freq : str, default 'ME'
        Pandas offset alias used for bucketing.

    Returns
    -------
    pandas.DataFrame
        One row per retained bucket (index named ``'date'``), one column per
        factor. A cell is NaN when the factor or the target is constant within
        the bucket. Buckets with fewer than 5 rows are skipped; if no bucket
        qualifies, an empty frame with the factor columns is returned.
    """
    feature_cols = list(feature_cols)
    ic_records = []
    for date, group in factors_df.groupby(pd.Grouper(freq=freq, level=0)):
        # Too few observations for a meaningful rank correlation.
        if len(group) < 5:
            continue
        # A constant target has no rank ordering, so every IC is undefined.
        target_is_constant = group[target].nunique() < 2
        row = {'date': date}
        for col in feature_cols:
            if target_is_constant or group[col].nunique() < 2:
                row[col] = np.nan
                continue
            # spearmanr ranks its inputs itself; pre-ranking is unnecessary.
            ic_val, _ = stats.spearmanr(group[col], group[target])
            row[col] = ic_val
        ic_records.append(row)

    if not ic_records:
        # Without this guard, set_index('date') raises KeyError on an empty frame.
        return pd.DataFrame(columns=feature_cols,
                            index=pd.DatetimeIndex([], name='date'),
                            dtype=float)
    return pd.DataFrame(ic_records).set_index('date')
# Per-period IC for every factor in the panel.
ic_df = compute_ic(factors, feature_cols)

# Collapse each factor's IC series into a few summary statistics.
ic_mean = ic_df.mean()
ic_std = ic_df.std()
hit_rate = (ic_df.abs() > 0.05).mean()  # share of periods with |IC| above 0.05
ic_summary = pd.DataFrame({
    'Mean IC': ic_mean,
    'Std IC': ic_std,
    'ICIR': ic_mean / ic_std,
    '|IC| > 0.05 比例': hit_rate,
}).round(3)

# Strongest factors first, judged by the magnitude of the (rounded) mean IC.
rank_order = ic_summary['Mean IC'].abs().sort_values(ascending=False).index
ic_summary = ic_summary.reindex(rank_order)

print('因子 IC 汇总(按 |Mean IC| 排序):')
print(ic_summary.to_string())
def compute_ic(factors_df, feature_cols, target='fwd_ret_20d', freq='ME'):
    """Compute a per-period Spearman information coefficient (IC) per factor.

    Rows are bucketed with ``pd.Grouper(freq=freq, level=0)`` on the frame's
    datetime index. Every row falling into a bucket is pooled, and the
    Spearman rank correlation between each factor column and ``target`` is
    computed on that pooled sample. If the frame holds several dates per
    bucket, the result therefore mixes all of those dates together.

    Parameters
    ----------
    factors_df : pandas.DataFrame
        Frame whose index level 0 is datetime-like and which contains the
        columns named in ``feature_cols`` plus ``target``.
    feature_cols : sequence of str
        Factor columns to evaluate.
    target : str, default 'fwd_ret_20d'
        Column holding the forward return to correlate against.
    freq : str, default 'ME'
        Pandas offset alias used for bucketing.

    Returns
    -------
    pandas.DataFrame
        One row per retained bucket (index named ``'date'``), one column per
        factor. A cell is NaN when the factor or the target is constant within
        the bucket. Buckets with fewer than 5 rows are skipped; if no bucket
        qualifies, an empty frame with the factor columns is returned.
    """
    feature_cols = list(feature_cols)
    ic_records = []
    for date, group in factors_df.groupby(pd.Grouper(freq=freq, level=0)):
        # Too few observations for a meaningful rank correlation.
        if len(group) < 5:
            continue
        # A constant target has no rank ordering, so every IC is undefined.
        target_is_constant = group[target].nunique() < 2
        row = {'date': date}
        for col in feature_cols:
            if target_is_constant or group[col].nunique() < 2:
                row[col] = np.nan
                continue
            # spearmanr ranks its inputs itself; pre-ranking is unnecessary.
            ic_val, _ = stats.spearmanr(group[col], group[target])
            row[col] = ic_val
        ic_records.append(row)

    if not ic_records:
        # Without this guard, set_index('date') raises KeyError on an empty frame.
        return pd.DataFrame(columns=feature_cols,
                            index=pd.DatetimeIndex([], name='date'),
                            dtype=float)
    return pd.DataFrame(ic_records).set_index('date')
# Per-period IC for every factor in the panel.
ic_df = compute_ic(factors, feature_cols)

# Collapse each factor's IC series into a few summary statistics.
ic_mean = ic_df.mean()
ic_std = ic_df.std()
hit_rate = (ic_df.abs() > 0.05).mean()  # share of periods with |IC| above 0.05
ic_summary = pd.DataFrame({
    'Mean IC': ic_mean,
    'Std IC': ic_std,
    'ICIR': ic_mean / ic_std,
    '|IC| > 0.05 比例': hit_rate,
}).round(3)

# Strongest factors first, judged by the magnitude of the (rounded) mean IC.
rank_order = ic_summary['Mean IC'].abs().sort_values(ascending=False).index
ic_summary = ic_summary.reindex(rank_order)

print('因子 IC 汇总(按 |Mean IC| 排序):')
print(ic_summary.to_string())
因子 IC 汇总(按 |Mean IC| 排序):
Mean IC Std IC ICIR |IC| > 0.05 比例
rsi_14 -0.119 0.245 -0.485 0.841
price_ma_ratio -0.118 0.248 -0.475 0.826
bb_pct -0.115 0.221 -0.523 0.812
vol_20d 0.098 0.298 0.328 0.913
mom_20d -0.093 0.259 -0.360 0.884
mom_5d -0.088 0.205 -0.429 0.768
mom_60d -0.040 0.301 -0.131 0.884
vol_ratio 0.006 0.170 0.035 0.754
In [5]:
Copied!
# Running sum of the period ICs for the four factors with the largest
# |Mean IC| (ic_summary is already ordered that way).
fig, ax = plt.subplots(figsize=(13, 5))
top_factors = list(ic_summary.index[:4])
for name in top_factors:
    # Periods where the IC is undefined (NaN) contribute nothing to the sum.
    running_ic = ic_df[name].fillna(0).cumsum()
    ax.plot(ic_df.index, running_ic.to_numpy(), label=name, linewidth=1.5)

ax.axhline(0, color='black', linewidth=0.8)  # zero reference line
ax.set_title('Top 因子 IC 累积和(持续上升=稳定正向因子)', fontsize=13)
ax.legend()
ax.grid(alpha=0.3)
plt.tight_layout()
plt.show()
# Running sum of the period ICs for the four factors with the largest
# |Mean IC| (ic_summary is already ordered that way).
fig, ax = plt.subplots(figsize=(13, 5))
top_factors = list(ic_summary.index[:4])
for name in top_factors:
    # Periods where the IC is undefined (NaN) contribute nothing to the sum.
    running_ic = ic_df[name].fillna(0).cumsum()
    ax.plot(ic_df.index, running_ic.to_numpy(), label=name, linewidth=1.5)

ax.axhline(0, color='black', linewidth=0.8)  # zero reference line
ax.set_title('Top 因子 IC 累积和(持续上升=稳定正向因子)', fontsize=13)
ax.legend()
ax.grid(alpha=0.3)
plt.tight_layout()
plt.show()
4. 因子相关性与多重共线性¶
In [6]:
Copied!
# Pairwise Pearson correlation between the factor columns.
factor_corr = factors[feature_cols].corr()

# Heatmap of the lower triangle only; the strict upper triangle (k=1) is
# masked because it mirrors the lower one.
fig, ax = plt.subplots(figsize=(8, 6))
upper_mask = np.triu(np.ones(factor_corr.shape, dtype=bool), k=1)
sns.heatmap(
    factor_corr,
    mask=upper_mask,
    annot=True,
    fmt='.2f',
    cmap='coolwarm',
    center=0,
    square=True,
    linewidths=0.5,
    ax=ax,
)
ax.set_title('因子相关性矩阵', fontsize=13)
plt.tight_layout()
plt.show()

# Collect every unordered factor pair whose |correlation| exceeds 0.6.
high_corr = []
for pos, first in enumerate(feature_cols):
    for second in feature_cols[pos + 1:]:
        pair_corr = factor_corr.loc[first, second]
        if abs(pair_corr) > 0.6:
            high_corr.append((first, second, pair_corr))

if not high_corr:
    print('✅ 无高度相关因子对')
else:
    print('⚠️ 高度相关的因子对(|corr| > 0.6,可能冗余):')
    # Strongest relationships first.
    for first, second, pair_corr in sorted(high_corr, key=lambda item: abs(item[2]), reverse=True):
        print(f' {first} ↔ {second}: {pair_corr:.2f}')
# Pairwise Pearson correlation between the factor columns.
factor_corr = factors[feature_cols].corr()

# Heatmap of the lower triangle only; the strict upper triangle (k=1) is
# masked because it mirrors the lower one.
fig, ax = plt.subplots(figsize=(8, 6))
upper_mask = np.triu(np.ones(factor_corr.shape, dtype=bool), k=1)
sns.heatmap(
    factor_corr,
    mask=upper_mask,
    annot=True,
    fmt='.2f',
    cmap='coolwarm',
    center=0,
    square=True,
    linewidths=0.5,
    ax=ax,
)
ax.set_title('因子相关性矩阵', fontsize=13)
plt.tight_layout()
plt.show()

# Collect every unordered factor pair whose |correlation| exceeds 0.6.
high_corr = []
for pos, first in enumerate(feature_cols):
    for second in feature_cols[pos + 1:]:
        pair_corr = factor_corr.loc[first, second]
        if abs(pair_corr) > 0.6:
            high_corr.append((first, second, pair_corr))

if not high_corr:
    print('✅ 无高度相关因子对')
else:
    print('⚠️ 高度相关的因子对(|corr| > 0.6,可能冗余):')
    # Strongest relationships first.
    for first, second, pair_corr in sorted(high_corr, key=lambda item: abs(item[2]), reverse=True):
        print(f' {first} ↔ {second}: {pair_corr:.2f}')
⚠️ 高度相关的因子对(|corr| > 0.6,可能冗余): rsi_14 ↔ bb_pct: 0.89 mom_20d ↔ price_ma_ratio: 0.85 rsi_14 ↔ price_ma_ratio: 0.81 bb_pct ↔ price_ma_ratio: 0.80 mom_5d ↔ price_ma_ratio: 0.76 mom_20d ↔ rsi_14: 0.75 mom_5d ↔ bb_pct: 0.68 mom_20d ↔ bb_pct: 0.62 mom_5d ↔ rsi_14: 0.61
5. 因子分组回测(分层测试)¶
将股票按因子值分成 5 组,看各组的平均下期收益是否单调。
In [7]:
Copied!
# Factor with the largest |Mean IC| (first row of the sorted IC summary).
best_factor = ic_summary.index[0]

# Within each month, split the pooled observations into five equal-sized
# groups by factor value and record each group's mean forward return.
quintile_returns = []
for date, group in factors.groupby(pd.Grouper(freq='ME', level=0)):
    if len(group) < 5:
        continue  # cannot form five groups
    group = group.copy()
    # Bin on ranks rather than raw values: with tied factor values pd.qcut can
    # hit duplicate bin edges and raise ValueError. method='first' makes every
    # rank unique; ranking is monotone, so untied data are grouped the same way.
    factor_rank = group[best_factor].rank(method='first')
    group['quintile'] = pd.qcut(factor_rank, 5, labels=[1, 2, 3, 4, 5])
    for q, qgroup in group.groupby('quintile', observed=True):
        quintile_returns.append({'date': date, 'quintile': int(q),
                                 'avg_fwd_ret': qgroup['fwd_ret_20d'].mean()})

qdf = pd.DataFrame(quintile_returns)
# Average the monthly group returns over the whole sample.
group_means = qdf.groupby('quintile')['avg_fwd_ret'].mean()

fig, ax = plt.subplots(figsize=(7, 4))
colors = ['red', 'salmon', 'gray', 'lightgreen', 'green']
# Bars are drawn in percent; group_means itself stays a fraction.
bars = ax.bar(group_means.index, group_means.values * 100, color=colors, alpha=0.85)
ax.axhline(0, color='black', linewidth=0.8)
ax.set_xlabel('因子分组(1=最低,5=最高)')
ax.set_ylabel('平均20日远期收益率 (%)')
ax.set_title(f'因子分层测试: {best_factor}', fontsize=13)
for bar, val in zip(bars, group_means.values):
    height = bar.get_height()
    is_up = height >= 0
    # Put the label just beyond the bar's tip: above positive bars and below
    # negative ones, so it never overlaps the bar itself.
    ax.text(bar.get_x() + bar.get_width() / 2,
            height + (0.01 if is_up else -0.01),
            f'{val:.2%}', ha='center',
            va='bottom' if is_up else 'top', fontsize=10)
plt.tight_layout()
plt.show()
print('收益单调性越强,因子选股能力越稳定!')
# Factor with the largest |Mean IC| (first row of the sorted IC summary).
best_factor = ic_summary.index[0]

# Within each month, split the pooled observations into five equal-sized
# groups by factor value and record each group's mean forward return.
quintile_returns = []
for date, group in factors.groupby(pd.Grouper(freq='ME', level=0)):
    if len(group) < 5:
        continue  # cannot form five groups
    group = group.copy()
    # Bin on ranks rather than raw values: with tied factor values pd.qcut can
    # hit duplicate bin edges and raise ValueError. method='first' makes every
    # rank unique; ranking is monotone, so untied data are grouped the same way.
    factor_rank = group[best_factor].rank(method='first')
    group['quintile'] = pd.qcut(factor_rank, 5, labels=[1, 2, 3, 4, 5])
    for q, qgroup in group.groupby('quintile', observed=True):
        quintile_returns.append({'date': date, 'quintile': int(q),
                                 'avg_fwd_ret': qgroup['fwd_ret_20d'].mean()})

qdf = pd.DataFrame(quintile_returns)
# Average the monthly group returns over the whole sample.
group_means = qdf.groupby('quintile')['avg_fwd_ret'].mean()

fig, ax = plt.subplots(figsize=(7, 4))
colors = ['red', 'salmon', 'gray', 'lightgreen', 'green']
# Bars are drawn in percent; group_means itself stays a fraction.
bars = ax.bar(group_means.index, group_means.values * 100, color=colors, alpha=0.85)
ax.axhline(0, color='black', linewidth=0.8)
ax.set_xlabel('因子分组(1=最低,5=最高)')
ax.set_ylabel('平均20日远期收益率 (%)')
ax.set_title(f'因子分层测试: {best_factor}', fontsize=13)
for bar, val in zip(bars, group_means.values):
    height = bar.get_height()
    is_up = height >= 0
    # Put the label just beyond the bar's tip: above positive bars and below
    # negative ones, so it never overlaps the bar itself.
    ax.text(bar.get_x() + bar.get_width() / 2,
            height + (0.01 if is_up else -0.01),
            f'{val:.2%}', ha='center',
            va='bottom' if is_up else 'top', fontsize=10)
plt.tight_layout()
plt.show()
print('收益单调性越强,因子选股能力越稳定!')
收益单调性越强,因子选股能力越稳定!
🎯 练习¶
- 计算「价格/52周高点」因子的 IC,与 `mom_60d` 比较。
- 增加更多股票(如标普500全部成分股),IC 结果会更稳定吗?
- 用分层测试检验「低波动率因子」:波动率最低组的股票表现是否优于高波动率组?
下一节 → 02_ml_prediction.ipynb
In [ ]:
Copied!