像建房子一样打造变形金刚,追梦女孩要刚强(二)

发布时间 2023-08-01 20:10:33作者: 鸽鸽的书房

今天的任务很艰巨,需要把下面这张图的模型架构复现一遍,要有耐心哦。我参考了哈佛NLP小组对transformer的分拆讲解The Annotated Transformer,但思路不同于原文。原文是从整体到局部,而我是从局部到整体。
我们先把Day1的嵌入层复制过来(使用的是harvard的版本):

from torch import Tensor
import torch
import math
import torch.nn as nn
class PositionalEncoding(nn.Module):
    "Implement the PE function."
    def __init__(self, d_model, dropout, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)
        
        # Compute the positional encodings once in log space.
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) *
                             -(math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)
        self.register_buffer('pe', pe)
        
    def forward(self, x):
        x = x + Variable(self.pe[:, :x.size(1)], 
                         requires_grad=False)
        return self.dropout(x)
class Embeddings(nn.Module):
    def __init__(self, d_model, vocab):
        super(Embeddings, self).__init__()
        self.lut = nn.Embedding(vocab, d_model)
        self.d_model = d_model

    def forward(self, x):
        return self.lut(x) * math.sqrt(self.d_model)

编码器

我们先来观察左边的编码器,用于将输入序列\((x1,...,xn)\)映射到其语义表示\(z=(z1,...,zn)\).

编码器由6个相同的层堆叠起来,我们先实现其中一层 (layer)。我们发现每一层有两个子层 (sub-layer),第一个子层是多头自注意力机制,第二个子层是简单的全连接前馈网络。

还可以发现,这些层一个有意思的特点是输入和输出向量的维度不变。

子层连接

我们发现两个子层 (sub-layer) 都要应用残差连接 (residual connection) 和层归一化 (layer normalization):

(1) 残差连接:将输入 x 添加到子层的输出中,可以应用于具有相同大小的任何子层;

(2) 层归一化:将输入标准化为零均值和单位方差。

具体实施步骤是:子层的输出先通过 LayerNorm 层,然后通过 Dropout 层,最后再加到 x 中。

\(x+SubLayer(LayerNorm(x))\)

class SublayerConnection(nn.Module):
    """
    A residual connection followed by a layer norm.
    Note for code simplicity the norm is first as opposed to last.
    """

    def __init__(self, size, dropout):
        super(SublayerConnection, self).__init__()
        self.norm = LayerNorm(size)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, sublayer):
        "Apply residual connection to any sublayer with the same size."
        return x + self.dropout(sublayer(self.norm(x)))

这里手动实现一个层归一化,它是Hinton的组提出来的,可以将每个样本的每个特征维度进行标准化,使其均值为0,方差为1。与批量归一化(Batch Normalization)不同,Layer normalization是在每个样本上进行归一化,而不是在每个批次上进行归一化。在Layer normalization中,我们计算这些均值和方差对该维度上的所有样本进行标准化。标准化后的数据再通过一个可学习的缩放因子和偏置项进行缩放和平移,得到最终的输出。Layer normalization可以帮助解决神经网络中的梯度消失和梯度爆炸等问题,并且可以提高模型的泛化能力和训练速度。

image.png

class LayerNorm(nn.Module):
    # `eps` 表示一个非常小的常数,用于防止除数为0的情况
    def __init__(self, features, eps=1e-6):
        super(LayerNorm, self).__init__()
        # 定义两个可学习的参数,分别是缩放因子和偏置项
        # 这两个参数的形状都是 `(features,)`,即每个特征维度都有一个缩放因子和一个偏置项
        self.a_2 = nn.Parameter(torch.ones(features))
        self.b_2 = nn.Parameter(torch.zeros(features))
        self.eps = eps

    def forward(self, x):
        mean = x.mean(-1, keepdim=True) # 对输入张量 `x` 沿着最后一个维度求平均值,并保持其维度不变
        std = x.std(-1, keepdim=True) # 对输入张量 `x` 沿着最后一个维度求标准差,并保持其维度不变
        # 对输入张量进行标准化,即将每个特征维度减去均值并除以标准差。
        # 最后,使用可学习的缩放因子和偏置项进行缩放和平移,得到最终的输出
        return self.a_2 * (x - mean) / (std + self.eps) + self.b_2 

编码器单层

编码器层的输入为x 和掩码张量mask ,掩码用于遮盖输入序列中的无效位置(可变长度序列中多余的位置),以便在模型训练或推理时忽略掉。

我们用clones拷贝两个 SublayerConnection 类的实例并储存为列表,第一个SublayerConnection 实例将残差连接和层归一化应用于自注意力子层的输出self_attn,第二个实例将残差连接和层归一化应用于前馈神经网络子层的输出feed_forward.

class EncoderLayer(nn.Module):

    def __init__(self, size, self_attn, feed_forward, dropout):
        super(EncoderLayer, self).__init__()
        self.self_attn = self_attn
        self.feed_forward = feed_forward
        self.sublayer = clones(SublayerConnection(size, dropout), 2)
        self.size = size

    def forward(self, x, mask):
        x = self.sublayer[0](x, lambda x: self.self_attn(x, x, x, mask))
        return self.sublayer[1](x, self.feed_forward)
import copy
def clones(module, N):
    "Produce N identical layers."
    return nn.ModuleList([copy.deepcopy(module) for _ in range(N)])

编码器

我们把N个单层堆叠起来就是完整的编码器啦!最后再归一化。

class Encoder(nn.Module):
    "Core encoder is a stack of N layers"

    def __init__(self, layer, N):
        super(Encoder, self).__init__()
        self.layers = clones(layer, N)
        self.norm = LayerNorm(layer.size)

    def forward(self, x, mask):
        "Pass the input (and mask) through each layer in turn."
        for layer in self.layers:
            x = layer(x, mask)
        return self.norm(x)

解码器

解码器的输入是编码器输出的语义表示\(z=(z1,...,zn)\)和向右位移的输出嵌入\(y=(y1,...,yn)\)。向右位移就是输出嵌入会向后偏移一个位置,确保位置i的预测只能取决于小于i的位置的已知输出。

解码器单层

再看解码器的每一层,比编码器要复杂:除了编码器每层的两个子层外,还插入一个第三子层,对编码器的输出 (m) 实施多头注意力机制 (src_attn).

class DecoderLayer(nn.Module):
    "Decoder is made of self-attn, src-attn, and feed forward (defined below)"
    def __init__(self, size, self_attn, src_attn, feed_forward, dropout):
        super(DecoderLayer, self).__init__()
        self.size = size
        self.self_attn = self_attn
        self.src_attn = src_attn
        self.feed_forward = feed_forward
        self.sublayer = clones(SublayerConnection(size, dropout), 3)
 
    def forward(self, x, memory, src_mask, tgt_mask):
        "Follow Figure 1 (right) for connections."
        m = memory
        x = self.sublayer[0](x, lambda x: self.self_attn(x, x, x, tgt_mask))
        x = self.sublayer[1](x, lambda x: self.src_attn(x, m, m, src_mask)) # m:memory
        return self.sublayer[2](x, self.feed_forward)

另一个变动是修改解码器的自注意力子层的掩码,来掩盖当前位置之后的位置,以防止当前位置关注后续位置。

import numpy as np
def subsequent_mask(size):
    "Mask out subsequent positions."
    attn_shape = (1, size, size)
    # 生成一个上三角矩阵,其中对角线以下的元素为1,对角线及以上的元素为0,形状与`attn_shape`相同。
    # `k=1`表示从对角线向上偏移1个单位,即掩盖当前位置之后的位置
    # 最后将这个矩阵转换为`uint8`类型(`uint8`是一种无符号整数类型,它占用8位(即1个字节)的存储空间,可表示的范围是0到255)
    subsequent_mask = np.triu(np.ones(attn_shape), k=1).astype('uint8') 
    # 将 NumPy数组转换为 PyTorch 张量,并将张量中的零元素设置为`True`,非零元素设置为`False`
    return torch.from_numpy(subsequent_mask) == 0
# 打印出来看看
size = 3
subsequent_mask(size)
tensor([[[ True, False, False],
         [ True,  True, False],
         [ True,  True,  True]]])

解码器

整体上,和编码器一样,也是N个层的堆叠。

class Decoder(nn.Module):
    "Generic N layer decoder with masking."
    def __init__(self, layer, N):
        super(Decoder, self).__init__()
        self.layers = clones(layer, N)
        self.norm = LayerNorm(layer.size)
        
    def forward(self, x, memory, src_mask, tgt_mask):
        for layer in self.layers:
            x = layer(x, memory, src_mask, tgt_mask)
        return self.norm(x)

编码器-解码器及完整的模型

标准的编码器-解码器架构包括编码器、解码器、源语言嵌入、目标语言嵌入和生成器。

class EncoderDecoder(nn.Module):
    """
    A standard Encoder-Decoder architecture. Base for this and many 
    other models.
    """
    def __init__(self, encoder, decoder, src_embed, tgt_embed, generator):
        super(EncoderDecoder, self).__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.src_embed = src_embed
        self.tgt_embed = tgt_embed
        self.generator = generator
        
    def forward(self, src, tgt, src_mask, tgt_mask):
        "Take in and process masked src and target sequences."
        return self.decode(self.encode(src, src_mask), src_mask,
                            tgt, tgt_mask)
    
    def encode(self, src, src_mask):
        return self.encoder(self.src_embed(src), src_mask)
    
    def decode(self, memory, src_mask, tgt, tgt_mask):
        return self.decoder(self.tgt_embed(tgt), memory, src_mask, tgt_mask)

生成器类 Generator,包含一个线性层和一个softmax 层,用于将解码器的输出转换为概率分布。

class Generator(nn.Module):
    "Define standard linear + softmax generation step."
    def __init__(self, d_model, vocab):
        super(Generator, self).__init__()
        self.proj = nn.Linear(d_model, vocab) 

    def forward(self, x):
        return F.log_softmax(self.proj(x), dim=-1) # `dim=-1` 表示对最后一个维度进行 softmax 操作

我们把编码器-解码器和最后的生成器连起来就是完整的模型啦!

nn.init.xavier_uniform(p)对张量p进行Xavier均匀初始化。根据张量的输入和输出维度计算一个标准差,然后从均匀分布中随机采样初始化张量,使得p的每个元素都满足均值为0,方差为大概是2/(输入维度+输出维度)的分布(不准确)。Glorot和fan_avg也是两种常用的权重初始化方法,他们的关系我还不清楚。

def make_model(src_vocab, tgt_vocab, N=6, 
               d_model=512, d_ff=2048, h=8, dropout=0.1): # 前馈神经网络的隐藏层维度 `d_ff` 多头注意力机制中头的数量 `h`
    c = copy.deepcopy
    attn = MultiHeadedAttention(h, d_model)
    ff = PositionwiseFeedForward(d_model, d_ff, dropout)
    position = PositionalEncoding(d_model, dropout)
    model = EncoderDecoder(
        Encoder(EncoderLayer(d_model, c(attn), c(ff), dropout), N),
        Decoder(DecoderLayer(d_model, c(attn), c(attn), 
                             c(ff), dropout), N),
        nn.Sequential(Embeddings(d_model, src_vocab), c(position)),
        nn.Sequential(Embeddings(d_model, tgt_vocab), c(position)),
        Generator(d_model, tgt_vocab))
    
    for p in model.parameters():
        if p.dim() > 1:
            nn.init.xavier_uniform_(p) # 对模型的参数进行初始化
    return model

至此我们就把这个模型搭建完啦!!!不过这里的两个细节MultiHeadedAttention和PositionwiseFeedForward还没有讲!

其他细节

基于位置的前馈网络

我们先来简单的这个:PositionwiseFeedForward.

这个全连接前馈网络是双层的,对每个位置分别且同等进行。它包括两次线性变换,中间用一个Relu激活。输入和输出的维度为dmodel=512,内层的维度为dff=2048。

\[\operatorname{FFN}(x)=\max \left(0, x W_1+b_1\right) W_2+b_2 \]

class PositionwiseFeedForward(nn.Module):
    "Implements FFN equation."
    def __init__(self, d_model, d_ff, dropout=0.1):
        super(PositionwiseFeedForward, self).__init__()
        self.w_1 = nn.Linear(d_model, d_ff)
        self.w_2 = nn.Linear(d_ff, d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        return self.w_2(self.dropout(F.relu(self.w_1(x))))

多头自注意力

重磅来啦!小小的attention是整个模型的精华!我们使用Scaled Dot Product Attention机制计算query和key之间的注意力分布,然后将该分布应用于value上,得到加权和。

“缩放点积注意力”在计算查询与所有键的点积之后,要除以√dk(通过使用键维度的平方根进行缩放,有助于防止点积变得过大,在训练过程中可能导致数值不稳定),并应用softmax函数以获得值的权重。

\[\operatorname{Attention}(Q, K, V)=\operatorname{softmax}\left(\frac{Q K^T}{\sqrt{d_k}}\right) V \]

其中 d_k是查询向量和键向量的维度,也是注意力分布的维度。 scores是注意力分数矩阵,表示每个位置对其他位置的注意力分布。p_attn是用softmax函数归一化的注意力概率分布矩阵,形状同scores。

import torch.nn.functional as F
def attention(query, key, value, mask=None, dropout=None): # query、key、value的形状都是(batch_size, num_heads, seq_len, d_k)
    "Compute 'Scaled Dot Product Attention'"
    d_k = query.size(-1)
    scores = torch.matmul(query, key.transpose(-2, -1)) \
             / math.sqrt(d_k)
    if mask is not None:  # mask掩码矩阵形状为(batch_size, 1, seq_len, seq_len)或(batch_size, num_heads, seq_len, seq_len)
        scores = scores.masked_fill(mask == 0, -1e9) # scores形状为(batch_size, num_heads, seq_len, seq_len)
    p_attn = F.softmax(scores, dim = -1)
    if dropout is not None:
        p_attn = dropout(p_attn)
    return torch.matmul(p_attn, value), p_attn # matmul计算加权和
query = torch.rand(2, 2, 20, 2)
key = torch.rand(2, 2, 20, 2)
value = torch.rand(2, 2, 20, 2)
atte = attention(query, key, value, mask=None, dropout=None)
atte[0].shape, atte[1].shape, 
(torch.Size([2, 2, 20, 2]), torch.Size([2, 2, 20, 20]))

多头注意力机制允许模型同时关注不同表示子空间中不同位置的信息。使用单个注意力头时,平均会抑制这种关注。\(\operatorname{MultiHead}(Q, K, V)=\operatorname{Concat}\left(\operatorname{head}_1, \ldots, \operatorname{head}_{\mathrm{h}}\right) W^O\),其中 \(^{\operatorname{head}}{ }_{\mathrm{i}}=\operatorname{Attention}\left(Q W_i^Q, K W_i^K, V W_i^V\right)\)。投影是参数矩阵 \(W_i^Q \in \mathbb{R}^{d_{\text {model } \times d k}}, W_i^K \in \mathbb{R}^{d_{\text {model } \times d k}}, W_i^V \in \mathbb{R}^{d \text { model } \times d v}\)\(W^O \in \mathbb{R}^{h d_v \times d_{\text {model }}}\)。在本文中,我们使用 \(h=8\) 个并行的注意力层或头。对于每个头,我们使用 \(d_k=d_v=d_{\text {model }} / h=64\)。由于每个头的降维,总计算成本与具有完整维度的单头注意力相似。

这里多头注意力的前向传播需要费些脑筋!因为我们要把多头拼接起来并行运算。

class MultiHeadedAttention(nn.Module):
    def __init__(self, h, d_model, dropout=0.1):
        super(MultiHeadedAttention, self).__init__()
        assert d_model % h == 0
        # We assume d_v always equals d_k
        self.d_k = d_model // h  # d_k是每个头的维度
        self.h = h
        self.linears = clones(nn.Linear(d_model, d_model), 4) # 由4个nn.Linear组成的列表,每个nn.Linear的输入维度和输出维度都为d_model
        self.attn = None
        self.dropout = nn.Dropout(p=dropout)
        
    def forward(self, query, key, value, mask=None):
        if mask is not None:
            # Same mask applied to all h heads.
            mask = mask.unsqueeze(1)
        nbatches = query.size(0)
        
        # 1) Do all the linear projections in batch from d_model => h x d_k 
        query, key, value = \
            [l(x).view(nbatches, -1, self.h, self.d_k).transpose(1, 2)
             for l, x in zip(self.linears, (query, key, value))] # 将query、key、value分别通过四个nn.Linear进行线性变换,得到新的向量
        
        # 2) Apply attention on all the projected vectors in batch. 
        x, self.attn = attention(query, key, value, mask=mask, 
                                 dropout=self.dropout)
        
        # 3) "Concat" using a view and apply a final linear. 
        x = x.transpose(1, 2).contiguous() \
             .view(nbatches, -1, self.h * self.d_k)
        return self.linears[-1](x)

完工!我们来打印一个模型实例看看!

# Small example model
tmp_model = make_model(2000, 2000, 6)
tmp_model
EncoderDecoder(
  (encoder): Encoder(
    (layers): ModuleList(
      (0-5): 6 x EncoderLayer(
        (self_attn): MultiHeadedAttention(
          (linears): ModuleList(
            (0-3): 4 x Linear(in_features=512, out_features=512, bias=True)
          )
          (dropout): Dropout(p=0.1, inplace=False)
        )
        (feed_forward): PositionwiseFeedForward(
          (w_1): Linear(in_features=512, out_features=2048, bias=True)
          (w_2): Linear(in_features=2048, out_features=512, bias=True)
          (dropout): Dropout(p=0.1, inplace=False)
        )
        (sublayer): ModuleList(
          (0-1): 2 x SublayerConnection(
            (norm): LayerNorm()
            (dropout): Dropout(p=0.1, inplace=False)
          )
        )
      )
    )
    (norm): LayerNorm()
  )
  (decoder): Decoder(
    (layers): ModuleList(
      (0-5): 6 x DecoderLayer(
        (self_attn): MultiHeadedAttention(
          (linears): ModuleList(
            (0-3): 4 x Linear(in_features=512, out_features=512, bias=True)
          )
          (dropout): Dropout(p=0.1, inplace=False)
        )
        (src_attn): MultiHeadedAttention(
          (linears): ModuleList(
            (0-3): 4 x Linear(in_features=512, out_features=512, bias=True)
          )
          (dropout): Dropout(p=0.1, inplace=False)
        )
        (feed_forward): PositionwiseFeedForward(
          (w_1): Linear(in_features=512, out_features=2048, bias=True)
          (w_2): Linear(in_features=2048, out_features=512, bias=True)
          (dropout): Dropout(p=0.1, inplace=False)
        )
        (sublayer): ModuleList(
          (0-2): 3 x SublayerConnection(
            (norm): LayerNorm()
            (dropout): Dropout(p=0.1, inplace=False)
          )
        )
      )
    )
    (norm): LayerNorm()
  )
  (src_embed): Sequential(
    (0): Embeddings(
      (lut): Embedding(2000, 512)
    )
    (1): PositionalEncoding(
      (dropout): Dropout(p=0.1, inplace=False)
    )
  )
  (tgt_embed): Sequential(
    (0): Embeddings(
      (lut): Embedding(2000, 512)
    )
    (1): PositionalEncoding(
      (dropout): Dropout(p=0.1, inplace=False)
    )
  )
  (generator): Generator(
    (proj): Linear(in_features=512, out_features=2000, bias=True)
  )
)

总结

第二天我们如期完成了模型搭建任务,虽然看起来很复杂,但只要耐心+细心,还是能一步步跟下来的!罗马非一日建成,我们一起沉下心努力钻研!

代码参考

https://nlp.seas.harvard.edu/2018/04/03/attention.html