Solving Linear Regression with Gradient Descent

Principle

Linear Regression Explained in Detail

Derivation of the Linear Regression Formulas
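
A compact summary of what the code below implements (the linked articles cover the full derivation). With the bias and weights stacked into a single vector \theta = [b, W] (the b_W variable in the code), X carrying a prepended column of ones, and m training samples:

J(\theta) = \frac{1}{2m}\,(X\theta - y)^\top (X\theta - y)        (loss minimized by fit_gd / fit_sgd)
\nabla J(\theta) = \frac{1}{m}\,X^\top (X\theta - y)              (batch gradient used by fit_gd)
\theta = (X^\top X)^{-1} X^\top y                                 (normal equation used by fit)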

Code

LinearRegression.py

import numpy as np


class LinearRegression:
    def __init__(self):
        self.b = None    # bias
        self.W = None    # weights
        self.b_W = None  # b_W = [b, W], bias stacked on top of the weights

    def fit(self, x_train, y_train):
        """Solve for the parameters directly with the normal equation."""
        X = np.hstack([np.ones((len(x_train), 1)), x_train])  # fully vectorized: prepend a column of ones to x_train
        self.b_W = np.linalg.inv(X.T.dot(X)).dot(X.T).dot(y_train)
        self.b = self.b_W[0]
        self.W = self.b_W[1:]

    def fit_sgd(self, x_train, y_train, epoch=50, t0=5, t1=50):
        """
        Stochastic gradient descent.
        :param t0, t1: lr = t0 / (t + t1), where t is the step count
        """

        def lr(t):
            return t0 / (t + t1)

        def dJ_sgd(b_W, X_i, y_train_i):
            """Gradient for a single sample."""
            return X_i * (X_i.dot(b_W) - y_train_i)

        def sgd(X, y_train, init_b_W, epoch):
            """Stochastic gradient descent loop."""
            b_W = init_b_W
            m = len(X)
            for cur_epoch in range(epoch):
                indexes = np.random.permutation(m)  # shuffle the sample order
                X = X[indexes, :]
                y_train = y_train[indexes]
                for i in range(m):
                    gradient = dJ_sgd(b_W, X[i], y_train[i])
                    b_W -= lr(cur_epoch * m + i) * gradient

            return b_W

        X = np.hstack([np.ones((len(x_train), 1)), x_train])  # fully vectorized: prepend a column of ones to x_train
        init_b_W = np.random.randn(X.shape[1])
        self.b_W = sgd(X, y_train, init_b_W, epoch)
        self.b = self.b_W[0]
        self.W = self.b_W[1:]

    def fit_gd(self, x_train, y_train, lr=0.01, max_iters=1e4, epsilon=1e-8):
        """Batch gradient descent."""

        def J(b_W, X, y_train):
            """Loss function (mean squared error / 2)."""
            try:
                return (X.dot(b_W) - y_train).T.dot(X.dot(b_W) - y_train) / (2. * len(y_train))
            except:
                return float('inf')

        def dJ(b_W, X, y_train):
            """Gradient of the loss."""
            return X.T.dot(X.dot(b_W) - y_train) / len(y_train)

        def gradient_descent(X, y_train, init_b_W, lr, max_iters, epsilon):
            """Batch gradient descent loop."""
            b_W = init_b_W
            cur_iter = 0
            while cur_iter < max_iters:
                gradient = dJ(b_W, X, y_train)
                last_b_W = b_W.copy()
                b_W -= lr * gradient
                if abs(J(b_W, X, y_train) - J(last_b_W, X, y_train)) < epsilon:
                    break
                cur_iter += 1

            return b_W

        X = np.hstack([np.ones((len(x_train), 1)), x_train])  # fully vectorized: prepend a column of ones to x_train
        init_b_W = np.zeros(X.shape[1])
        self.b_W = gradient_descent(X, y_train, init_b_W, lr, max_iters, epsilon)
        self.b = self.b_W[0]
        self.W = self.b_W[1:]

    def predict(self, x_test):
        X = np.hstack([np.ones((len(x_test), 1)), x_test])
        y_predict = X.dot(self.b_W)

        return y_predict

    def score(self, x_test, y_test):
        X = np.hstack([np.ones((len(x_test), 1)), x_test])
        y_predict = X.dot(self.b_W)

        return self._r2_score(y_test, y_predict)

    def _r2_score(self, y_test, y_predict):
        r2_score = 1 - (np.sum((y_test - y_predict) ** 2)) / np.sum((y_test - np.mean(y_test)) ** 2)
        return r2_score
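
A minimal sanity check for the class above (an addition for illustration; the synthetic data and coefficients are assumptions, not part of the original post): all three fitting methods should recover roughly the same bias and weights on data generated from a known linear model.

import numpy as np
from LinearRegression import LinearRegression

np.random.seed(0)
x = np.random.uniform(0., 2., size=(1000, 1))                  # one feature
y = 4. + 3. * x[:, 0] + np.random.normal(0., 0.1, size=1000)   # true b = 4, W = [3]

for fit_name in ('fit', 'fit_gd', 'fit_sgd'):
    model = LinearRegression()
    getattr(model, fit_name)(x, y)
    print(fit_name, model.b, model.W)  # each should come out close to b = 4, W = [3.]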

test.py

import numpy as np
from sklearn import datasets
from sklearn.model_selection import train_test_split
from LinearRegression import LinearRegression
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import SGDRegressor

np.random.seed(1)

boston = datasets.load_boston()

X = boston.data
y = boston.target
X = X[y < 50.0]
y = y[y < 50.0]

X_train, X_test, y_train, y_test = train_test_split(X, y)

model_1 = LinearRegression()
model_1.fit(X_train, y_train)
print(model_1.score(X_test, y_test)) # 0.771

model_2 = LinearRegression()
model_2.fit_gd(X_train, y_train, lr=0.01, max_iters=1e4)
print(model_2.score(X_test, y_test))  # nan: learning rate too large, does not converge

model_2 = LinearRegression()
model_2.fit_gd(X_train, y_train, lr=0.000001, max_iters=1e4)
print(model_2.score(X_test, y_test))  # 0.208: too few iterations

model_2 = LinearRegression()
model_2.fit_gd(X_train, y_train, lr=0.000001, max_iters=1e6)
print(model_2.score(X_test, y_test)) # 0.683

# Standardize the data
standardScaler = StandardScaler()
standardScaler.fit(X_train)
X_train = standardScaler.transform(X_train)
X_test = standardScaler.transform(X_test)

model_3 = LinearRegression()
model_3.fit_gd(X_train, y_train, lr=0.01, max_iters=1e6)
print(model_3.score(X_test, y_test)) # 0.771

model_3 = LinearRegression()
model_3.fit_sgd(X_train, y_train, epoch=10)
print(model_3.score(X_test, y_test)) # 0.772

# sklearn's built-in SGDRegressor
sgd = SGDRegressor(max_iter=10)
sgd.fit(X_train, y_train)
print(sgd.score(X_test, y_test)) # 0.772
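
One way to see why the unscaled runs need such a tiny learning rate is to inspect how differently the raw features are scaled: with raw features the gradient components differ by orders of magnitude, so lr=0.01 overshoots and diverges. The snippet below is an illustrative addition meant to run at the end of test.py, where np, boston and standardScaler are already defined.

raw = boston.data[boston.target < 50.0]
print(np.ptp(raw, axis=0))                            # raw per-feature ranges differ by orders of magnitude
print(np.ptp(standardScaler.transform(raw), axis=0))  # after standardization the ranges are comparable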

References