3.6 Softmax回归的从零开始实现

发布时间 2023-05-25 10:39:57作者: Ann-

我们首先导入相关的包,并读入训练和测试所用的数据集图片的DataLoader:

 这里面d2l.load_data_fashion_mnist(batch_size)读入训练和测试所用的图像数据集的DataLoader。

 1. 初始化模型参数

Softmax回归模型参数包括W、b。假设输入特征数量为num_inputs,输出的数量(类别的数量)为num_outputs,那么,W应该是num_inputs * num_outputs的。b则是num_outputs的。本数据集中的每个样本都是28×28的图像。本节将展平每个图像,把它们看作长度为784的向量。我们的数据集有10个类别,因此网络的输出维度为10。于是,权重将构成一个784 * 10的矩阵,偏置构成一个1 * 10的行向量。注意偏置是行向量。

 再次强调权重矩阵W的行数是输入特征数,列数是输出的类别数。

 

2. 定义softmax操作

我们首先定义一个函数softmax(),给出矩阵X,返回它的softmax矩阵softmax(X)。求softmax有三个步骤:

1. 对矩阵的每一项求e的幂;

2. 对每一行求和(小批量中每个样本是一行),得到每个样本的规范化常数;

3. 将每一行除以其规范化常数。

 上面的代码中,torch.exp(X)用于求eX,X_prob.sum(1)相当于X_prob.sum(dim=1)。

注意,softmax采用了幂指函数,矩阵X中有较大的数或较小的数时,容易导致计算溢出。

 

3. 定义模型

我们使用上面的softmax函数来实现softmax模型。注意,在将图像数据传递到模型之前,要将图片展开成一个784维的向量。我们需要的是 (批量大小 * 输入维数)的矩阵:

 上面的net()定义了 Y_hat = softmax( XW + b )。这里面有一个W.shape[0],它是一个int类型的值,就是W的行数。对于一个矩阵Q,Q.shape[0]就是它的行数,是一个int值,Q.shape[1]就是它的列数,同样是int值:

 

4. 定义损失函数

以下是一个方法,对于一个批量的样本,访问每一个预测的概率向量中,对真实分类的预测概率:

 这里面y是每个样本的真实标签,用int型数据表示,y = torch.tensor([0,2])表示批量中的第一个样本的真实类别为0,第二个样本的真实类别为2. y_hat是两个样本的预测概率向量。通过y_hat[[0,1],y],我们就得到了一个向量tensor([0.1000,0.5000]),两个概率分别是模型对两个样本的属于0和2的类别的概率的预测。

下面我们定义交叉熵损失函数:

 在上面的代码中,y_hat可以是多个样本组成的多个概率预测向量构成的矩阵,此时y是由多个整数值构成的向量,每个整数值代表了这个样本属于的类别。y_hat[range(len(y_hat)),y]即是模型预测出的各个样本属于它们真实的类别的概率,是一个长度为样本数量的向量;-torch.log()即是求它的交叉熵损失了,这是由交叉熵损失公式变形得来的,y_hat[range(len(y_hat)),y]这个向量的每一个概率越大,损失就越小。

 

5. 分类精度

分类精度是正确预测数量与总预测数量之比。下面的函数accuracy()传入y_hat以及y,返回正确预测的总数

这里面,我们认为y_hat是一个批量的预测矩阵,y是一个由整型值组成的向量。if的条件是:“y_hat的列数大于一并且它是维数多于一维的张量".因为如果不满足这个条件的话,无需下面的语句y_hat = y_hat.argmax(axis=1)也可以得到正确的结果。由于等式运算符“==”对数据类型很敏感, 因此我们将y_hat的数据类型转换为与y的数据类型一致。

一个tensor矩阵Q它的Q.shape是由它的行数和列数构成的torch.Size对象,len(Q.shape)就是它的torch.Size的长度:

 我们使用前面定义的y和y_hat来测试一下这个函数:

 

下面的函数用于计算模型在指定数据迭代器上的精度

注意,精度是在整个数据集上计算的,在这里,我们在这个函数里要用data_iter将整个测试集迭代一遍,对每个batch的正确分类的数量以及总的样本数进行累加,返回它们的比值作为正确率。

 其中,Accumulator的实现如下:

 我们使用测试集test_iter来评估我们定义的softmax回归模型的分类精度:

可以看到,精度接近0.1. 这是因为我们的参数是随机初始化的,模型相当于在十个类别里随机猜了一个。 

 

 6. 训练 

 我们分别定义训练一个epoch的函数train_epoch_ch3和完整的训练所有epoch的函数train_ch3:

#传入net、训练数据迭代器,loss函数,updater,训练一个迭代周期
def train_epoch_ch3(net, train_iter, loss, updater):  #@save
    """训练模型一个迭代周期(定义见第3章)"""
    # 如果是内置实例的话,将模型设置为训练模式
    if isinstance(net, torch.nn.Module):
        net.train()
    # 记录训练损失总和、训练准确度总和、样本数
    metric = Accumulator(3)
    for X, y in train_iter:
        # 计算梯度并更新参数
        y_hat = net(X)
        l = loss(y_hat, y)
        if isinstance(updater, torch.optim.Optimizer):
            # 使用PyTorch内置的优化器和损失函数
            updater.zero_grad()
            l.mean().backward()
            updater.step()
        else:
            # 使用定制的优化器和损失函数
            l.sum().backward()
            #注意updater的参数是batch_size
            updater(X.shape[0])
        metric.add(float(l.sum()), accuracy(y_hat, y), y.numel())
    # 返回训练损失和训练精度
    return metric[0] / metric[2], metric[1] / metric[2]
#在迭代器访问到的数据集上训练一个训练一个模型net,该训练函数将会运行多个迭代周期(由num_epochs指定)
def train_ch3(net, train_iter, test_iter, loss, num_epochs, updater):  #@save
    """训练模型(定义见第3章)"""
    animator = Animator(xlabel='epoch', xlim=[1, num_epochs], ylim=[0.3, 0.9],
                        legend=['train loss', 'train acc', 'test acc'])
    for epoch in range(num_epochs):
        train_metrics = train_epoch_ch3(net, train_iter, loss, updater)
        test_acc = evaluate_accuracy(net, test_iter)
        animator.add(epoch + 1, train_metrics + (test_acc,))
    train_loss, train_acc = train_metrics
    assert train_loss < 0.5, train_loss
    assert train_acc <= 1 and train_acc > 0.7, train_acc
    assert test_acc <= 1 and test_acc > 0.7, test_acc

定义updater:

#我们使用3.2节中定义的小批量随机梯度下降算法来优化模型的损失函数
lr = 0.1

def updater(batch_size):
    return d2l.sgd([W, b], lr, batch_size)

 

 

自己实现的简洁版代码:

import torch
from IPython import display
from d2l import torch as d2l

batch_size = 256
train_iter, test_iter = d2l.load_data_fashion_mnist(batch_size)
input_nums = 784
output_nums = 10

#定义softmax操作
def softmax(X):
    exp_X = torch.exp(X)
    partition = exp_X.sum(dim=1,keepdims=True)
    return exp_X / partition

# #测试softmax
# Q = torch.normal(1,1,size=(3,4))
# softmax_Q = softmax(Q)
# print(softmax_Q.sum(dim=1))

#定义模型
def net(X):
    return softmax(torch.matmul(X.reshape(-1,W.shape[0]),W) + b)

#参数初始化
W = torch.normal(0,0.01,size=(input_nums,output_nums),requires_grad=True)
b = torch.zeros(10,requires_grad=True)

#定义loss
def cross_entropy(y_hat,y):
    return -torch.log(y_hat[range(len(y_hat)),y])

#定义一个函数返回正确分类的个数
def accuracy(y_hat,y):
    if len(y_hat.shape)>1 and y_hat.shape[1]>1 :
        y_hat_max = y_hat.argmax(axis=1)
    #注意只有张量才有.type
    cmp = y_hat_max.type(y.dtype) == y
    #因为后面要除以总样本数,计算正确率,所以返回float型的
    return float(cmp.sum())

#定义在整个测试集上计算准确率的函数
def evaluate_accuracy(net,data_iter):
    if isinstance(net,torch.nn.Module):
        net.eval()
    accumulator = Accumulator(2)
    with torch.no_grad():
        for X,y in data_iter:
            y_hat = net(X)
            accumulator.add(accuracy(y_hat,y),y.numel())
        return accumulator[0] / accumulator[1]


#定义训练一个epoch的函数
def train_epoch_ch3(net,train_iter,updater):
    if isinstance(net,torch.nn.Module):
        net.train()
    metric = Accumulator(2)
    for X,y in train_iter:
        y_hat = net(X)
        l = cross_entropy(y_hat,y)
        if isinstance(updater,torch.optim.Optimizer):
            updater.zero_grad()
            l.mean().backward()
            updater.step()
        else:
            l.sum().backward()
            updater(X.shape[0])
        metric.add(l.sum(),y.numel())
    return metric[0] / metric[1]    #返回train_loss

#定义updater
#注意updater的功能应该包含:参数梯度清零、对一个batch_size的训练更新参数。
lr = 0.1
def updater(batch_size):
    d2l.sgd([W,b],lr,batch_size)

class Accumulator:  #@save
    """在n个变量上累加"""
    def __init__(self, n):
        self.data = [0.0] * n

    def add(self, *args):
        self.data = [a + float(b) for a, b in zip(self.data, args)]

    def reset(self):
        self.data = [0.0] * len(self.data)

    def __getitem__(self, idx):
        return self.data[idx]
    
def train_ch3(net,num_epochs,train_iter):
    for epoch in range(num_epochs):
        train_loss = train_epoch_ch3(net,train_iter,updater)
        train_accuracy = evaluate_accuracy(net,test_iter)
        print(f'epoch {epoch+1}, loss {train_loss:f}, accuracy {train_accuracy:f}\n')
    
num_epochs = 10
train_ch3(net,num_epochs,train_iter)

#一个预测函数
def predict_ch3(net, test_iter):  #@save
    """预测标签(定义见第3章)"""
    for X, y in test_iter:
        break
    trues = y
    preds_prob = net(X.reshape(-1,W.shape[0]))
    max_prob = preds_prob.argmax(axis=1)
    print(trues==max_prob)
    print(f'true: {trues},\n\n pred:{max_prob}\n')

predict_ch3(net, test_iter)

    

输出: