deepspeed流水线并行

发布时间 2023-08-28 14:02:26作者: qzl

docs/_tutorials/pipeline.md

https://gitee.com/qzl66/DeepSpeed/blob/master/docs/_tutorials/pipeline.md

 

1、重构管道模型  Expressing Pipeline Models

 流水线并行要求模型被表示为一系列层。在前向传播中,每一层输入为上一层的输出。其实管道并行模型是不需要指定forward()的!管道并行模型的正向传递隐式采用以下形式:
def forward(self, inputs):
    x = inputs
    for layer in self.layers:
        x = layer(x)
    return x
PyTorch的torch.nn.Sequential是一个用于表达流水线并行模型的方便容器,可以通过DeepSpeed并行化,无需修改:
net = nn.Sequential(
    nn.Linear(in_features, hidden_dim),
    nn.ReLU(inplace=True),
    nn.Linear(hidden_dim, out_features)
)
from deepspeed.pipe import PipelineModule
net = PipelineModule(layers=net, num_stages=2)
PipelineModule使用其layers参数作为组成模型的层序列。在初始化之后,net被分成两个流水线阶段,其层被移动到相应的GPU。如果存在两个以上的GPU,DeepSpeed还将使用混合数据并行。
注意:GPU总数必须能被流水线阶段数整除。

Note: For large model training, see memory-efficient model construction. {: .notice--info}

 

让我们来看看torchvision的AlexNet的一个简化实现:

 

class AlexNet(nn.Module):
    def __init__(self, num_classes=1000):
        super(AlexNet, self).__init__()
        self.features = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=11, stride=4, padding=2),
            ...
            nn.MaxPool2d(kernel_size=3, stride=2),
        )
        self.avgpool = nn.AdaptiveAvgPool2d((6, 6))
        self.classifier = nn.Sequential(
            nn.Dropout(),
            ...
            nn.Linear(4096, num_classes),
        )

    def forward(self, x):
        x = self.features(x)
        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.classifier(x)
        return x
AlexNet主要由几个连续的子模块组成。我们可以将它的子模块展平成一系列层,从而将它变成一个PipelineModule:
 
class AlexNetPipe(AlexNet):
    def to_layers(self):
        layers = [
            *self.features,
            self.avgpool,
            lambda x: torch.flatten(x, 1),
            *self.classifier
        ]
        return layers

from deepspeed.pipe import PipelineModule
net = AlexNetPipe()
net = PipelineModule(layers=net.to_layers(), num_stages=2)
注意:上面各层中间的lambda不是torch.nn.Module类型。任何实现__call__()的对象都可以是PipelineModule中的一层:这允许在管道中进行方便的数据转换。
 

2、输入和输出:

按照torch.nn.Sequential,每层的输入和输出必须是单个torch。张量或一组张量。在实践中,一些模型可能需要修改它们的前向传递,将参数打包和解包为forward()。考虑transformer模块堆栈的简化实现:
class TransformerBlock(nn.Module)
    ...
    def forward(self, hidden, mask):
        output = self.compute(hidden, mask)
        return output
    ...

stack = [ TransformerBlock() for _ in range(num_layers) ]
需要对TransformerBlock进行两处修改:
1、必须将参数收集到一个元组中。
2、mask也必须从forward()返回才能传递到下一层。
这些修改可以通过一个简短的子类来完成:
class TransformerBlockPipe(TransformerBlock)
    def forward(self, inputs):
        hidden, mask = inputs
        output = super().forward(hidden, mask)
        return (output, mask)
stack = [ TransformerBlockPipe() for _ in range(num_layers) ]
 

训练循环Training Loops

 流水线并行将前向传播和后向传播交叉进行,因此训练循环不能被分成前向()、后向()和步进()的独立阶段。相反,DeepSpeed的管道引擎提供了一个train_batch()方法,该方法使管道引擎前进,直到下一批训练数据被使用并且模型权重被更新。
train_iter = iter(train_loader)
loss = engine.train_batch(data_iter=train_iter)
上面的train_batch()示例相当于下面的传统数据并行DeepSpeed:
train_iter = iter(train_loader)
for micro_batch in engine.gradient_accumulation_steps():
    batch = next(data_iter)
    loss = engine(batch)
    engine.backward(loss)
    engine.step()
 

数据处理