docs/_tutorials/pipeline.md
https://gitee.com/qzl66/DeepSpeed/blob/master/docs/_tutorials/pipeline.md
1、重构管道模型 Expressing Pipeline Models
流水线并行要求模型被表示为一系列层。在前向传播中,每一层输入为上一层的输出。其实管道并行模型是不需要指定forward()的!管道并行模型的正向传递隐式采用以下形式:
def forward(self, inputs):
x = inputs
for layer in self.layers:
x = layer(x)
return x
PyTorch的torch.nn.Sequential是一个用于表达流水线并行模型的方便容器,可以通过DeepSpeed并行化,无需修改:
net = nn.Sequential(
nn.Linear(in_features, hidden_dim),
nn.ReLU(inplace=True),
nn.Linear(hidden_dim, out_features)
)
from deepspeed.pipe import PipelineModule
net = PipelineModule(layers=net, num_stages=2)
PipelineModule使用其layers参数作为组成模型的层序列。在初始化之后,net被分成两个流水线阶段,其层被移动到相应的GPU。如果存在两个以上的GPU,DeepSpeed还将使用混合数据并行。
注意:GPU总数必须能被流水线阶段数整除。
Note: For large model training, see memory-efficient model construction. {: .notice--info}
让我们来看看torchvision的AlexNet的一个简化实现:
class AlexNet(nn.Module):
def __init__(self, num_classes=1000):
super(AlexNet, self).__init__()
self.features = nn.Sequential(
nn.Conv2d(3, 64, kernel_size=11, stride=4, padding=2),
...
nn.MaxPool2d(kernel_size=3, stride=2),
)
self.avgpool = nn.AdaptiveAvgPool2d((6, 6))
self.classifier = nn.Sequential(
nn.Dropout(),
...
nn.Linear(4096, num_classes),
)
def forward(self, x):
x = self.features(x)
x = self.avgpool(x)
x = torch.flatten(x, 1)
x = self.classifier(x)
return x
AlexNet主要由几个连续的子模块组成。我们可以将它的子模块展平成一系列层,从而将它变成一个PipelineModule:
class AlexNetPipe(AlexNet):
def to_layers(self):
layers = [
*self.features,
self.avgpool,
lambda x: torch.flatten(x, 1),
*self.classifier
]
return layers
from deepspeed.pipe import PipelineModule
net = AlexNetPipe()
net = PipelineModule(layers=net.to_layers(), num_stages=2)
注意:上面各层中间的lambda不是torch.nn.Module类型。任何实现__call__()的对象都可以是PipelineModule中的一层:这允许在管道中进行方便的数据转换。
2、输入和输出:
按照torch.nn.Sequential,每层的输入和输出必须是单个torch。张量或一组张量。在实践中,一些模型可能需要修改它们的前向传递,将参数打包和解包为forward()。考虑transformer模块堆栈的简化实现:
class TransformerBlock(nn.Module)
...
def forward(self, hidden, mask):
output = self.compute(hidden, mask)
return output
...
stack = [ TransformerBlock() for _ in range(num_layers) ]
需要对TransformerBlock进行两处修改:
1、必须将参数收集到一个元组中。
2、mask也必须从forward()返回才能传递到下一层。
这些修改可以通过一个简短的子类来完成:
class TransformerBlockPipe(TransformerBlock)
def forward(self, inputs):
hidden, mask = inputs
output = super().forward(hidden, mask)
return (output, mask)
stack = [ TransformerBlockPipe() for _ in range(num_layers) ]
训练循环Training Loops
流水线并行将前向传播和后向传播交叉进行,因此训练循环不能被分成前向()、后向()和步进()的独立阶段。相反,DeepSpeed的管道引擎提供了一个train_batch()方法,该方法使管道引擎前进,直到下一批训练数据被使用并且模型权重被更新。
train_iter = iter(train_loader)
loss = engine.train_batch(data_iter=train_iter)
上面的train_batch()示例相当于下面的传统数据并行DeepSpeed:
train_iter = iter(train_loader)
for micro_batch in engine.gradient_accumulation_steps():
batch = next(data_iter)
loss = engine(batch)
engine.backward(loss)
engine.step()
数据处理