基于Pearson相关的Kernel Ridge Regression实现

背景介绍

Thomas Yeo教授的实验室在研究脑-行为关系时,经常使用基于MATLAB实现的Kernel Ridge Regression (KRR)算法。该方法的特点是使用Pearson相关系数作为核函数。虽然在scikit-learn中也可以实现类似功能,但其在每次超参数选择或随机训练时都会重新计算核矩阵,导致计算时间显著增加。

本文将Yeo实验室的代码改写为Python简化版本,以便更好地适应个人项目需求。

注意:本实现未考虑HCP数据集中家庭或不同站点的影响因素。如需处理这些因素,请参考predictors项目中的实现。

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
from sklearn.base import BaseEstimator, RegressorMixin
import numpy as np
from sklearn.model_selection import KFold
from joblib import Parallel, delayed
from sklearn.metrics import r2_score
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from scipy import stats


def cov_regress(train_covx, train_y, test_covx, test_y):
"""
协变量回归函数:对协变量进行标准化处理并回归,返回残差

参数:
train_covx: 训练集协变量
train_y: 训练集目标变量
test_covx: 测试集协变量
test_y: 测试集目标变量

返回:
train_resy: 训练集残差
test_resy: 测试集残差
"""
# 标准化协变量
scaler = StandardScaler()
train_scaler_covx = scaler.fit_transform(train_covx)

# 线性回归拟合
regressor = LinearRegression()
min_max_scaler = MinMaxScaler(feature_range=(np.min(train_y), np.max(train_y)))

# 计算训练集残差
predy = regressor.fit(train_scaler_covx, train_y).predict(train_scaler_covx)
resy = train_y - predy
train_resy = min_max_scaler.fit_transform(resy.reshape(-1, 1)).reshape(-1)

# 计算测试集残差
test_cov_scaled = scaler.transform(test_covx)
test_resy = test_y - regressor.predict(test_cov_scaled)
test_resy = min_max_scaler.transform(test_resy.reshape(-1, 1)).reshape(-1)

return train_resy, test_resy


def find_optimal_alpha(K, y, alpha_list, bias_flag, k, seed, n_jobs=-1):
"""
寻找最优正则化参数alpha

参数:
K: 核矩阵
y: 目标变量
alpha_list: 候选alpha值列表
bias_flag: 是否包含偏置项
k: 交叉验证折数
seed: 随机种子
n_jobs: 并行任务数

返回:
best_alpha: 最优alpha值
model: 使用最优alpha训练的模型
"""
kfold = KFold(n_splits=k, shuffle=True, random_state=seed)

def get_score(alpha):
scores = []
for train_index, test_index in kfold.split(range(len(y))):
model = CorrRidge(alpha=alpha, bias_flag=bias_flag)
model.fit(K[train_index][:, train_index], y[train_index])
predy = model.predict(K[test_index][:, train_index])
scores.append(r2_score(y[test_index], predy))
return {'alpha': alpha, 'score': np.mean(scores)}

# 并行计算不同alpha的得分
results = Parallel(n_jobs=n_jobs)(delayed(get_score)(alpha) for alpha in alpha_list)
results = sorted(results, key=lambda x: x['score'], reverse=True)
best_alpha = results[0]['alpha']

# 使用最优alpha训练最终模型
model = CorrRidge(alpha=best_alpha, bias_flag=bias_flag)
model.fit(K, y)

return best_alpha, model


def create_outloop_cv_index(sample_num, k, seed=None):
"""
创建外循环交叉验证索引

参数:
sample_num: 样本数量
k: 折数
seed: 随机种子

返回:
生成器,每次返回训练集和测试集索引
"""
outloop = KFold(n_splits=k, shuffle=True, random_state=seed)
for train_index, test_index in outloop.split(range(sample_num)):
yield train_index, test_index


def optim_corr_ridge(K, alpha, y, bias_flag):
"""
实现核岭回归的优化

参数:
K: 核矩阵
alpha: 正则化参数
y: 目标变量
bias_flag: 是否包含偏置项

返回:
a: 权重系数
bias: 偏置项
"""
# 添加正则化并计算 Ridge 回归系数
K_r = K + np.eye(K.shape[0]) * alpha
K_r_inv = np.linalg.inv(K_r)

if bias_flag:
ones = np.ones((K.shape[0], 1))
_x = np.matmul(ones.T, np.matmul(K_r_inv, ones))
_x_inv = np.linalg.inv(_x)
bias = np.matmul(_x_inv, ones.T) @ np.matmul(K_r_inv, y)
a = np.matmul(K_r_inv, (y - bias))
else:
bias = np.zeros((K.shape[0], 1))
a = np.matmul(K_r_inv, y)

return a, bias


class CorrRidge(RegressorMixin, BaseEstimator):
"""
使用Pearson相关作为核函数的岭回归
"""
def __init__(self, alpha=1, bias_flag=True):
"""
初始化

参数:
alpha: 正则化参数
bias_flag: 是否包含偏置项
"""
self.alpha = alpha
self.bias_flag = bias_flag

def fit(self, K, y):
"""
拟合模型

参数:
K: 核矩阵
y: 目标变量

返回:
self: 模型实例
"""
self.a, self.bias = optim_corr_ridge(K, self.alpha, y, self.bias_flag)
return self

def predict(self, K):
"""
预测

参数:
K: 测试样本与训练样本之间的核矩阵

返回:
预测值
"""
return np.matmul(K, self.a) + self.bias


def main(K, Y, cov, alpha_list, bias_flag, k, rng_seeds, subjects, n_jobs=-1, output_dir=None):
"""
主函数,执行完整的训练和评估流程

参数:
K: 核矩阵
Y: 目标变量
cov: 协变量
alpha_list: 候选alpha值列表
bias_flag: 是否包含偏置项
k: 交叉验证折数
rng_seeds: 随机种子数量
subjects: 受试者ID
n_jobs: 并行任务数
output_dir: 输出目录
"""
for seed in range(rng_seeds):
folds = []
for train_index, test_index in create_outloop_cv_index(len(Y), k, seed):
# 处理协变量(如果有)
if cov is not None:
train_y_res, test_y_res = cov_regress(cov[train_index], Y[train_index], cov[test_index], Y[test_index])
else:
train_y_res = Y[train_index]
test_y_res = Y[test_index]

# 寻找最优alpha并训练模型
best_alpha, model = find_optimal_alpha(K[train_index][:, train_index], train_y_res, alpha_list, bias_flag, k, seed, n_jobs)

# 预测并计算相关性
predy = model.predict(K[test_index][:, train_index])
r, p = stats.pearsonr(predy, test_y_res)

# 记录结果
folds.append({
'predy': predy,
'test_y': test_y_res,
'alpha': best_alpha,
'seed': seed,
'subject': subjects[test_index],
'r': r,
'p': p
})

print('Fold: {}, Seed: {}, Alpha: {}, R: {}, P: {}'.format(len(folds), seed, best_alpha, r, p))

# 保存结果(如果指定了输出目录)
if output_dir is not None:
np.savez(output_dir + '/fold_{}_seed_{}.npz'.format(k, seed), folds=folds)


if __name__ == '__main__':
# 测试代码
from sklearn.datasets import make_regression
from sklearn.model_selection import train_test_split
from scipy.stats import pearsonr
import time

# 生成测试数据
X, y = make_regression(n_samples=1000, n_features=30000, noise=0.1)
cov = X[:, :2] # 使用前两个特征作为协变量
K = np.corrcoef(X) # 计算相关系数矩阵作为核矩阵
alpha_list = np.logspace(-3, 3, 10) # 候选alpha值

# 执行主函数并计时
s1 = time.time()
main(K, y, cov, alpha_list, True, 5, 10, np.arange(len(y)), n_jobs=-1)
s2 = time.time()
print('总耗时: {} 秒'.format(s2 - s1))

实现细节说明

  1. CorrRidge类:继承自scikit-learn的BaseEstimator和RegressorMixin,实现了使用Pearson相关作为核函数的岭回归。

  2. 协变量处理cov_regress函数用于移除协变量的影响,返回残差作为新的目标变量。

  3. 超参数优化find_optimal_alpha函数通过k折交叉验证找到最优的正则化参数alpha。

  4. 并行计算:使用joblib的Parallel和delayed进行并行计算,提高效率。

  5. 交叉验证:实现了嵌套交叉验证,外循环用于评估模型性能,内循环用于选择最优alpha。