Implementing Logistic Regression in Python

Principles and references
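
For reference, the quantities the code below implements, writing θ = [b, W] for the parameter vector and X for the input matrix with a leading column of ones:

\hat{y} = \sigma(X\theta) = \frac{1}{1 + e^{-X\theta}}

J(\theta) = -\frac{1}{m}\sum_{i=1}^{m}\left[ y_i \log \hat{y}_i + (1 - y_i)\log(1 - \hat{y}_i) \right]

\nabla_{\theta} J = \frac{1}{m}\, X^{T}(\hat{y} - y)

The loss J is what `J` in `fit_gd` computes, and the gradient is what `dJ` returns; `dJ_sgd` is the same gradient for a single sample, without the 1/m average.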

Implementing logistic regression

LogisticRegression.py

import numpy as np


class LogisticRegression:

    def __init__(self):
        self.b = None    # bias
        self.W = None    # weights
        self.b_W = None  # b_W = [b, W]

    def _sigmoid(self, x):
        """Sigmoid function."""
        return 1. / (1 + np.exp(-x))

    def fit_sgd(self, x_train, y_train, epoch=50, t0=5, t1=50):
        """
        Stochastic gradient descent.
        :param t0, t1: learning-rate schedule lr = t0 / (iter + t1),
            where iter counts the updates made so far
        """

        def lr(t):
            return t0 / (t + t1)

        def dJ_sgd(b_W, X_i, y_train_i):
            """Gradient on a single sample."""
            y_hat_i = self._sigmoid(X_i.T.dot(b_W))
            return X_i * (y_hat_i - y_train_i)

        def sgd(X, y_train, init_b_W, epoch):
            """Stochastic gradient descent loop."""
            b_W = init_b_W
            m = len(X)
            for cur_epoch in range(epoch):
                indexes = np.random.permutation(m)  # reshuffle the samples
                X = X[indexes, :]
                y_train = y_train[indexes]
                for i in range(m):
                    gradient = dJ_sgd(b_W, X[i], y_train[i])
                    b_W -= lr(cur_epoch * m + i) * gradient
            return b_W

        # prepend a column of ones to x_train so the computation is fully vectorized
        X = np.hstack([np.ones((len(x_train), 1)), x_train])
        init_b_W = np.random.randn(X.shape[1])
        self.b_W = sgd(X, y_train, init_b_W, epoch)
        self.b = self.b_W[0]
        self.W = self.b_W[1:]

    def fit_gd(self, x_train, y_train, lr=0.01, max_iters=1e4, epsilon=1e-8):
        """Batch gradient descent."""

        def J(b_W, X, y_train):
            """Cross-entropy loss."""
            y_hat = self._sigmoid(X.dot(b_W))
            try:
                return -np.sum(y_train * np.log(y_hat)
                               + (1 - y_train) * np.log(1 - y_hat)) / len(y_train)
            except Exception:
                return float('inf')

        def dJ(b_W, X, y_train):
            """Gradient of the loss."""
            return X.T.dot(self._sigmoid(X.dot(b_W)) - y_train) / len(y_train)

        def gradient_descent(X, y_train, init_b_W, lr, max_iters, epsilon):
            """Batch gradient descent loop."""
            b_W = init_b_W
            cur_iter = 0
            while cur_iter < max_iters:
                gradient = dJ(b_W, X, y_train)
                last_b_W = b_W.copy()
                b_W -= lr * gradient
                # stop once the loss no longer decreases noticeably
                if abs(J(b_W, X, y_train) - J(last_b_W, X, y_train)) < epsilon:
                    break
                cur_iter += 1
            return b_W

        # prepend a column of ones to x_train so the computation is fully vectorized
        X = np.hstack([np.ones((len(x_train), 1)), x_train])
        init_b_W = np.zeros(X.shape[1])
        self.b_W = gradient_descent(X, y_train, init_b_W, lr, max_iters, epsilon)
        self.b = self.b_W[0]
        self.W = self.b_W[1:]

    def fit(self, x_train, y_train, lr=0.01, max_iters=1e4, epsilon=1e-8):
        # forward the caller's hyperparameters (the original hard-coded them here)
        self.fit_gd(x_train, y_train, lr=lr, max_iters=max_iters, epsilon=epsilon)

    def predict(self, x_test):
        X = np.hstack([np.ones((len(x_test), 1)), x_test])
        y_proba = self._sigmoid(X.dot(self.b_W))           # predicted probabilities
        y_predict = np.array(y_proba >= 0.5, dtype='int')  # predicted classes
        return y_predict

    def score(self, x_test, y_test):
        y_predict = self.predict(x_test)

        def acc_score(y_test, y_predict):
            """Classification accuracy."""
            return np.sum(y_test == y_predict) / len(y_test)

        return acc_score(y_test, y_predict)
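
Only `fit_gd` is exercised in the rest of the article; below is a minimal smoke test for `fit_sgd` on synthetic data (the dataset and seed here are illustrative, not from the original post):

import numpy as np
from LogisticRegression import LogisticRegression

np.random.seed(666)
X = np.random.normal(0, 1, size=(500, 2))
y = np.array(X[:, 0] + X[:, 1] > 0, dtype='int')  # illustrative, linearly separable labels

log_reg = LogisticRegression()
log_reg.fit_sgd(X, y, epoch=50)  # 50 shuffled passes over the data
print(log_reg.score(X, y))       # should be close to 1.0 on this easy problem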

Using the logistic regression class

Imports

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn import datasets
from LogisticRegression import LogisticRegression  # the class implemented above, not sklearn's
from sklearn.model_selection import train_test_split

from sklearn.preprocessing import PolynomialFeatures
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

Preparing the data

iris = datasets.load_iris()

X = iris.data
y = iris.target

# keep two of the three classes and the first two features for binary classification
indexes = np.argwhere(y < 2).squeeze()
X = X[indexes, :2]
y = y[indexes]

plt.scatter(X[y==0,0], X[y==0,1], color="red", label='0')
plt.scatter(X[y==1,0], X[y==1,1], color="blue", label='1')
plt.legend()
plt.show()

1.png (scatter plot of the two classes)

Training and results

X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=666)
log_reg = LogisticRegression()
log_reg.fit_gd(X_train, y_train)
log_reg.score(X_test, y_test) # 1.0

# plot the decision boundary
x1 = np.arange(4, 8)
x2 = -(log_reg.W[0]*x1 + log_reg.b)/log_reg.W[1]  # decision boundary: w1*x1 + w2*x2 + b = 0
plt.plot(x1, x2)

plt.scatter(X[y==0,0], X[y==0,1], color="red", label='0')
plt.scatter(X[y==1,0], X[y==1,1], color="blue", label='1')
plt.legend(loc='upper left')

plt.show()

2.png (the two classes with the learned linear decision boundary)
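
Where the plotted line comes from: `predict` outputs 1 exactly when the linear score is non-negative, so the boundary is the set of points where w_1 x_1 + w_2 x_2 + b = 0. Solving for x_2 gives

x_2 = -\frac{w_1 x_1 + b}{w_2}

which is the expression used for `x2` in the code above.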

Fitting nonlinear data with polynomial logistic regression

Preparing the data

np.random.seed(666)

X = np.random.normal(0, 1, size=(200, 2))
y = np.array((X[:,0]**2+X[:,1]**2)<1.5, dtype='int')

plt.scatter(X[y==0,0], X[y==0,1])
plt.scatter(X[y==1,0], X[y==1,1])
plt.show()

3.png (two classes separated by a roughly circular boundary)

A function for plotting the decision boundary

from matplotlib.colors import ListedColormap

def plot_decision_boundary(model, axis):
    """Predict the class of every point on a dense grid and color the regions."""
    x0, x1 = np.meshgrid(
        np.linspace(axis[0], axis[1], int((axis[1]-axis[0])*100)).reshape(-1, 1),
        np.linspace(axis[2], axis[3], int((axis[3]-axis[2])*100)).reshape(-1, 1),
    )
    X_new = np.c_[x0.ravel(), x1.ravel()]
    y_predict = model.predict(X_new)
    zz = y_predict.reshape(x0.shape)
    custom_cmap = ListedColormap(['#EF9A9A', '#FFF59D', '#90CAF9'])
    plt.contourf(x0, x1, zz, cmap=custom_cmap)  # filled contours colored by predicted class

Fitting with plain logistic regression

log_reg = LogisticRegression()
log_reg.fit_gd(X, y)
log_reg.score(X, y)  # 0.605, underfitting: a straight line cannot separate a circular region

plot_decision_boundary(log_reg, axis=[-4, 4, -4, 4])
plt.scatter(X[y==0,0], X[y==0,1])
plt.scatter(X[y==1,0], X[y==1,1])
plt.show()

4.png (the linear decision boundary, clearly underfitting the circular data)

Fitting with polynomial logistic regression

# a pipeline that adds polynomial features before the logistic regression
def PolynomialLogisticRegression(degree):
    return Pipeline([
        ('poly', PolynomialFeatures(degree=degree)),
        ('std_scaler', StandardScaler()),
        ('log_reg', LogisticRegression())
    ])

# build a model from the pipeline
poly_log_reg = PolynomialLogisticRegression(degree=3)
poly_log_reg.fit(X, y)
poly_log_reg.score(X, y)  # 0.955

plot_decision_boundary(poly_log_reg, axis=[-4, 4, -4, 4])
plt.scatter(X[y==0,0], X[y==0,1])
plt.scatter(X[y==1,0], X[y==1,1])
plt.show()

5.png (the curved decision boundary learned by the polynomial model)
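
Why the pipeline fixes the underfitting: PolynomialFeatures expands each sample into polynomial terms of the original features, and the logistic regression then learns a linear boundary in that expanded space, which maps back to a curved boundary in the original one. Since the labels above depend on x1**2 + x2**2, the squared terms are exactly what the model needs. A quick look at what the expansion produces (degree 2 for brevity; the article uses degree 3):

import numpy as np
from sklearn.preprocessing import PolynomialFeatures

sample = np.array([[2., 3.]])
poly = PolynomialFeatures(degree=2)
print(poly.fit_transform(sample))
# [[1. 2. 3. 4. 6. 9.]]  ->  1, x1, x2, x1^2, x1*x2, x2^2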