线性回归代码实战

引入库函数

1
2
3
4
### 引入库函数  
import torch
import matplotlib.pyplot as plt # 作图
import random # 产生随机数

在这一步中,我们引入了三个库:torch 是 PyTorch 深度学习框架,用于构建和训练模型;matplotlib.pyplot 用于绘制图形,方便我们可视化数据和模型结果;random 用于生成随机数,在后续的数据处理中会用到。

生成数据集

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
### 生成数据集 输入真实 w 和 b以及数据量data_num
def generateDataSet(w, b, data_num):
x = torch.normal(0, 1, (data_num, len(w))) # std: 0, mean: 1, shape()
y = torch.matmul(x, w) + b # matmul函数实现矩阵相乘

# 添加噪声
noise = torch.normal(0, 0.01, y.shape)
y += noise

return x, y

# 定义真实 w 和 b
w_true = torch.tensor([8.1, 2, 2, 4])
b_true = torch.tensor(1.1)
DataSetNum = 500

# 生成训练集
X, Y = generateDataSet(w_true, b_true, DataSetNum)

# 可视化分享
plt.scatter(X[:, 3], Y, 1)
plt.show()

这里我们定义了一个函数 generateDataSet 用于生成数据集。首先,我们使用 torch.normal 函数生成均值为 0,标准差为 1 的正态分布随机数作为输入特征 x,其形状为 (data_num, len(w))。然后,通过矩阵乘法 torch.matmul 计算 y 的值,即 y = x * w + b。接着,为了模拟真实数据中的噪声,我们给 y 加上一个均值为 0,标准差为 0.01 的正态分布噪声。最后,返回生成的 x 和 y

分小批次采集数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 取数函数, Data: 自变量, labal: 标签, batchsize: 批量大小
def getData(data, label, batch_size):
length = len(label)
indices = list(range(length))

# 打乱数据, 取数据时要打乱数据
random.shuffle(indices)

for each in range(0, length, batch_size):
get_indices = indices[each: each + batch_size]
get_data = data[get_indices]
get_labal = label[get_indices]
# print(f"batch form {get_indices[0]} to {get_indices[-1]}")
# return 直接终止函数, yield 有存档点的返回数据
yield get_data, get_labal

# 测试获取数据是否分批
batch_size = 16
# for batch_x, batch_y in getData(X, Y, batch_size):
# print(batch_x, batch_y)

getData 函数用于分小批次采集数据。首先,我们获取标签的长度,生成一个包含所有索引的列表 indices,并使用 random.shuffle 函数打乱这些索引。然后,通过循环每次取出 batch_size 个索引,根据这些索引从数据和标签中取出对应的小批次数据,使用 yield 关键字返回,这样可以实现数据的分批迭代

定义模型

1
2
3
4
# 定义模型
def model(x, w, b):
y_pred = torch.matmul(x, w) + b
return y_pred

模型定义非常简单,就是一个线性模型 y_pred = x * w + b,其中 x 是输入特征,w 是权重,b 是偏置。

定义损失函数

1
2
3
# 定义损失函数
def maeloss(y_pred, y):
return sum(abs(y_pred - y)) / len(y)

这里我们使用平均绝对误差(MAE)作为损失函数,计算预测值 y_pred 与真实值 y 之间的绝对误差的平均值。

梯度下降(SGD)更新参数

1
2
3
4
5
6
7
8
9
# 梯度下降
def SGD(params, lr):
# 不计算梯度
with torch.no_grad():
for param in params:
# 无法写成 param = param - param.grad() * lr 报错
param -= param.grad * lr
# 将使用过的参数梯度归零
param.grad.zero_()

SGD 函数实现了随机梯度下降算法。在更新参数时,我们使用 with torch.no_grad() 上下文管理器来避免计算梯度,因为在更新参数时不需要计算梯度。对于每个参数,我们使用 param -= param.grad * lr 来更新参数值,其中 lr 是学习率。更新完参数后,使用 param.grad.zero_() 将参数的梯度归零,以便下一次计算梯度。

训练模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
lr = 0.03
# 随机生成 w, b参数的值
# 确定累计梯度
w = torch.normal(0, 0.01, w_true.shape, requires_grad=True)
b = torch.tensor(0.01, requires_grad=True)
# print(w, b)

# 训练轮数
epochs = 50

for epoch in range(epochs):
data_loss = 0
for x_batch, y_batch in getData(X, Y, batch_size):
y_pred = model(x_batch, w, b)
loss = maeloss(y_pred, y_batch)
# 梯度回传
loss.backward()
# 更新模型
SGD([w, b], lr)
data_loss += loss
print("epoch: %03d, loss: %.6f"%(epoch, data_loss))

print("真实的参数为:", w_true, b_true)
print("训练之后的参数:", w, b)

在训练模型部分,我们首先定义了学习率 lr,然后随机初始化权重 w 和偏置 b,并设置 requires_grad=True 以便计算梯度。接着,我们定义了训练轮数 epochs,在每个训练轮次中,我们遍历每个小批次的数据,计算预测值 y_pred 和损失 loss,使用 loss.backward() 进行梯度回传,然后调用 SGD 函数更新模型参数。最后,打印每个轮次的损失值以及真实参数和训练后的参数。

可视化

1
2
3
4
5
# 可视化
idx = 1
plt.plot(X[:, idx].detach().numpy(), X[:, idx].detach().numpy() * w[idx].detach().numpy() + b.detach().numpy())
plt.scatter(X[:, idx], Y, 1)
plt.show()

在可视化部分,我们选择第 idx 个特征,绘制出训练后的模型预测的直线和原始数据的散点图,方便我们直观地观察模型的拟合效果。

总结

  • 小批次采样时要打乱数据,保证是没有顺序依赖,打乱数据random.shuffle(list)
  • with torch.no_grad():作用域下在计算图中不会累计梯度
  • 计算完梯度使用param.grad.zero_()清空梯度否则后续计算梯度会累积梯度(梯度相加)
  • 要计算梯度的参数在定义时,使用requires_grad = True来存储梯度
  • 前向传播计算损失函数,后向传播更新参数
  • 学习率lr较小时训练的模型loss下降缓慢,较大时loss不稳定
  • loss.backward()梯度回传,此时param.grad为$\frac{\partial loss}{\partial param}$

完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
### 引入库函数  
import torch
import matplotlib.pyplot as plt # 作图
import random # 产生随机数

### 生成数据集 输入真实 w 和 b以及数据量
def generateDataSet(w, b, data_num):
x = torch.normal(0, 1, (data_num, len(w)))
y = torch.matmul(x, w) + b # matmul函数实现矩阵相乘

# 添加噪声
noise = torch.normal(0, 0.01, y.shape)
y += noise

return x, y

w_true = torch.tensor([8.1, 2, 2, 4])
b_true = torch.tensor(1.1)
DataSetNum = 500

X, Y = generateDataSet(w_true, b_true, DataSetNum)

plt.scatter(X[:, 3], Y, 1)
plt.show()

# 取数函数, Data: 自变量, labal: 标签, batchsize: 批量大小
def getData(data, label, batch_size):
length = len(label)
indices = list(range(length))

# 打乱数据, 取数据时要打乱数据
random.shuffle(indices)

for each in range(0, length, batch_size):
get_indices = indices[each: each + batch_size]
get_data = data[get_indices]
get_labal = label[get_indices]
# print(f"batch form {get_indices[0]} to {get_indices[-1]}")
# return 直接终止函数, yield 有存档点的返回数据
yield get_data, get_labal

# 测试获取数据是否分批
batch_size = 16
# for batch_x, batch_y in getData(X, Y, batch_size):
# print(batch_x, batch_y)

# 定义模型
def model(x, w, b):
y_pred = torch.matmul(x, w) + b
return y_pred

# 定义损失函数
def maeloss(y_pred, y):
return sum(abs(y_pred - y)) / len(y)

# 梯度下降
def SGD(params, lr):
# 不计算梯度
with torch.no_grad():
for param in params:
# 无法写成 param = param - param.grad() * lr 报错
param -= param.grad * lr
# 将使用过的参数梯度归零
param.grad.zero_()

lr = 0.03
# 随机生成 w, b参数的值
# 确定累计梯度
w = torch.normal(0, 0.01, w_true.shape, requires_grad=True)
b = torch.tensor(0.01, requires_grad=True)
# print(w, b)

# 训练轮数
epochs = 50

for epoch in range(epochs):
data_loss = 0
for x_batch, y_batch in getData(X, Y, batch_size):
y_pred = model(x_batch, w, b)
loss = maeloss(y_pred, y_batch)
# 梯度回传
loss.backward()
# 更新模型
SGD([w, b], lr)
data_loss += loss
print("epoch: %03d, loss: %.6f"%(epoch, data_loss))

print("真实的参数为:", w_true, b_true)
print("训练之后的参数:", w, b)

# 可视化
idx = 1
plt.plot(X[:, idx].detach().numpy(), X[:, idx].detach().numpy() * w[idx].detach().numpy() + b.detach().numpy())
plt.scatter(X[:, idx], Y, 1)
plt.show()