MegEngine Python 层模块串讲(上)

发布时间 2023-07-21 18:03:52作者: MegEngine

前面的文章中,我们简单介绍了在 MegEngine imperative 中的各模块以及它们的作用。对于新用户而言可能不太了解各个模块的使用方法,对于模块的结构和原理也是一头雾水。Python 作为现在深度学习领域的主流编程语言,其相关的模块自然也是深度学习框架的重中之重。

模块串讲将对 MegEngine 的 Python 层相关模块分别进行更加深入的介绍,会涉及到一些原理的解释和代码解读。Python 层模块串讲共分为上、中、下三个部分,本文介绍 Python 层的 data 模块。读者将通过本文了解到要构建数据 pipeline 所需要的对象,以及如何高效地构建 pipeline

构建数据处理 pipeline —— data 模块

神经网络需要数据才可以训练,数据源文件可能是各种格式,读取数据需要定义采样规则、数据变换规则、数据合并策略等,这些和数据相关的模块都封装在 data 下。

在 MegEngine 中训练模型读取数据的 pipeline 一般是:

  1. 创建一个 Dataset 对象;
  2. 按照训练场景的需求可能需要对数据做一些变换或合并的处理,这里可能需要创建 SamplerTransformCollator 等对象来完成相应操作;
  3. 创建一个 DataLoader 对象;
  4. 将数据分批加载到 DataLoader 里,迭代 DataLoader 对象进行训练。

下面我们看一下这几个对象的实现。

Dataset

在 MegEngine 中,数据集是一个可迭代的对象,所有的 Dataset 对象都继承自 class Dataset,都需要实现自己的 __getitem__() 方法和 __len__() 方法,这两个方法分别是用来获取给定索引的对应的数据样本和返回数据集的大小。

根据对数据集访问方式的区别,MegEngine 中的数据集类型主要分为两种:ArrayDataset 和 StreamDataset,前者支持随机访问数据样本,而后者只可以顺序访问。二者的主要区别见下表:

截屏2023-07-21 16.49.16.png

Dataset

Dataset 支持对数据集的随机访问,访问类型是 Map-style 的,也就是可以从索引映射到数据样本,使用时需要实现 __getitem__() 方法和 __len__() 方法。

下面是一个使用 Dataset 生成一个由 0 到 5 的数组成的数据集的例子:

from megengine.data.dataset import Dataset

class CustomMapDataset(Dataset):
    def __init__(self, data):
        self.data = data
​
    def __getitem__(self, idx):
        return self.data[idx]
​
    def __len__(self):
        return len(self.data)

使用起来如下:

>>> data = list(range(0, 5))
>>> map_dataset = CustomMapDataset(data)
>>> print(len(map_dataset))
5
>>> print(map_dataset[2])
2

可以发现 Dataset 最大的特点就是可以根据给定索引随机访问数据集中对应下标的数据。

ArrayDataset

对于 Numpy ndarray 类型的数据,MegEngine 中对 Dataset 进一步封装实现了 ArrayDataset,使用 ArrayDataset 无需实现 __getitem__() 方法和 __len__() 方法。

下面的例子随机生成了一个具有 100 个样本、每张样本为 32 × 32 像素的 RGB 图片的数据集:

import numpy as np
from megengine.data.dataset import ArrayDataset
​
data = np.random.random((100, 3, 32, 32))
target = np.random.random((100, 1))
dataset = ArrayDataset(data, target)
>>> print(len(dataset))
100
>>> print(type(dataset[0]), len(dataset[0]))
<class 'tuple'> 2
>>> print(dataset[0][0].shape)
(3, 32, 32)

由于需要支持随机访问,因此对于支持顺序访问的 Dataset 需要将索引等信息加载进内存,如果数据集规模较大导致内存无法存放从而发生 OOM(Out Of Memory),我们需要考虑使用流式数据 StreamDataset

StreamDataset

当数据集规模较大时,使用流失数据迭代访问数据对象是比较主流的做法。从类的定义可以看到:由于无法根据索引获取数据,因此 StreamDataset 无需实现 __getitem__() 方法和 __len__() 方法,但是需要实现一个 __iter__() 方法定义流式获取数据的规则:

class StreamDataset(Dataset):
    r"""An abstract class for stream data.
    __iter__ method is aditionally needed.
    """
​
    @abstractmethod
    def __init__(self):
        pass
​
    @abstractmethod
    def __iter__(self):
        pass
​
    def __getitem__(self, idx):
        raise AssertionError("can not get item from StreamDataset by index")
​
    def __len__(self):
        raise AssertionError("StreamDataset does not have length")

StreamDataset 适用的场景主要是:

  • 随机读取成本过高,或者数据规模太大,无法支持;
  • 必须根据流数据才能判断当前批是否已经完整。

可以使用流数据返回从数据库、远程服务器甚至实时生成的日志中读取的数据流。

下面的例子展示了如何生成一个由 0 到 5 这五个数组成的数据集:

from megengine.data.dataset import StreamDataset
​
class CustomIterableDataset(StreamDataset):
    def __init__(self, data):
        self.data = data
​
    def __iter__(self):
        return iter(self.data)
>>> data = list(range(0, 5))
>>> iter_dataset = CustomIterableDataset(data)
>>> it = iter(iter_dataset)
>>> print(type(it))
list_iterator
>>> print(next(it))
0
>>> print(next(it))
1

Sampler

有了 DataSet 之后,DataLoader 可以从数据集加载数据到内存,但是对每批数据有时候需要规定规模的大小,还有定义抽样规则等需求,使用 Sampler 可以对每批数据的抽样规则进行自定义。

准确来说,抽样器的职责是决定数据的获取顺序,方便为 DataLoader 提供一个可供迭代的多批数据的索引:

dataloader = DataLoader(dataset, sampler=RandomSampler)

在 MegEngine 中,Sampler 是所有抽样器的抽象基类,在大部分情况下用户无需对抽样器进行自定义实现, 因为在 MegEngine 中已经实现了常见的各种抽样器,比如上面示例代码中的 RandomSampler 抽样器。

下面介绍 MegEngine 中几种常见的 Sampler

SequentialSampler

SequentialSampler 也叫 MapSampler, 顾名思义就是对数据集进行顺序抽样的抽样器。

对一个含有 100 个数据样本的数据集,batch_size 为 10,可以得到 10 批顺序索引:

>>> from megengine.data import SequentialSampler
>>> sampler = SequentialSampler(image_dataset, batch_size=10)
>>> print(len(list(sampler)))
10
如果将 batch_size 修改为 30, 则会得到 4 批顺序索引,最后一批长度为 10:

>>> sampler = SequentialSampler(image_dataset, batch_size=30)
>>> for batch_id, indices in enumerate(sampler):
...     print(batch_id, len(indices))
0 30
1 30
2 30
3 10
我们可以通过设置 drop_last=True 丢掉最后一批不完整的索引:

>>> sampler = SequentialSampler(image_dataset, 30, drop_last=True)
>>> for batch_id, indices in enumerate(sampler):
....    print(batch_id, len(indices))
0 30
1 30

默认情况下 batch_size 为 1,表示逐个遍历数据集中的样本,drop_last 为 False

RandomSampler

RandomSampler 用来对数据集进行无放回随机抽样(也叫简单随机抽样)。

直接看例子:

>>> from megengine.data import RandomSampler
>>> sampler = RandomSampler(image_dataset, batch_size=10)
>>> for batch_id, indices in enumerate(sampler):
...     print(batch_id, indices)
0 [78, 20, 74, 6, 45, 65, 99, 67, 88, 57]
1 [81, 0, 94, 98, 71, 30, 66, 10, 85, 56]
2 [51, 87, 62, 42, 7, 75, 11, 12, 39, 95]
3 [73, 15, 77, 72, 89, 13, 55, 26, 49, 33]
4 [9, 8, 64, 3, 37, 2, 70, 29, 34, 47]
5 [22, 18, 93, 4, 40, 92, 79, 36, 84, 25]
6 [83, 90, 68, 58, 50, 48, 32, 54, 35, 1]
7 [14, 44, 17, 63, 60, 97, 96, 23, 52, 38]
8 [80, 59, 53, 19, 46, 43, 24, 61, 16, 5]
9 [86, 82, 31, 76, 28, 91, 27, 21, 69, 41]

ReplacementSampler

ReplacementSampler 是有放回随机抽样,也就是可能抽样到之前已经抽样过的数据。

使用方法和无放回随机抽样类似:

>>> from megengine.data import ReplacementSampler
>>> sampler = ReplacementSampler(image_dataset, batch_size=10)
>>> for batch_id, indices in enumerate(sampler):
...     print(batch_id, indices)
0 [58, 29, 42, 79, 91, 73, 86, 46, 85, 23]
1 [42, 33, 61, 8, 22, 10, 98, 56, 59, 96]
2 [38, 72, 26, 0, 40, 33, 30, 59, 1, 25]
3 [71, 95, 89, 88, 29, 97, 97, 46, 42, 0]
4 [42, 22, 28, 82, 49, 52, 88, 68, 46, 66]
5 [47, 62, 26, 17, 68, 31, 70, 69, 26, 4]
6 [43, 18, 17, 91, 99, 96, 91, 7, 24, 39]
7 [50, 55, 86, 65, 93, 38, 39, 4, 6, 60]
8 [92, 82, 61, 36, 67, 56, 24, 18, 70, 60]
9 [91, 63, 95, 99, 19, 47, 9, 9, 68, 37]

Infinite

通常数据集在给定 batch_size 的情况下,只能划分为有限个 batch。 这意味着抽样所能得到的数据批数是有限的,想要重复利用数据, 最常见的做法是循环多个周期 epochs 来反复遍历数据集:

for epoch in epochs:
    for batch_data in dataloader:

但在一些情况下,我们希望能够直接从数据集中无限进行抽样, 因此MegEngine提供了 Infinite 包装类用来进行无限抽样:

>>> from megengine.data import Infinite
>>> sampler = Infinite(SequentialSampler(image_dataset, batch_size=10))
>>> sample_queue = iter(sampler)
>>> for step in range(20):
...     indice = next(sample_queue)
...     print(step, indice)
0 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
1 [10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
2 [20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
3 [30, 31, 32, 33, 34, 35, 36, 37, 38, 39]
4 [40, 41, 42, 43, 44, 45, 46, 47, 48, 49]
5 [50, 51, 52, 53, 54, 55, 56, 57, 58, 59]
6 [60, 61, 62, 63, 64, 65, 66, 67, 68, 69]
7 [70, 71, 72, 73, 74, 75, 76, 77, 78, 79]
8 [80, 81, 82, 83, 84, 85, 86, 87, 88, 89]
9 [90, 91, 92, 93, 94, 95, 96, 97, 98, 99]
10 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
11 [10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
12 [20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
13 [30, 31, 32, 33, 34, 35, 36, 37, 38, 39]
14 [40, 41, 42, 43, 44, 45, 46, 47, 48, 49]
15 [50, 51, 52, 53, 54, 55, 56, 57, 58, 59]
16 [60, 61, 62, 63, 64, 65, 66, 67, 68, 69]
17 [70, 71, 72, 73, 74, 75, 76, 77, 78, 79]
18 [80, 81, 82, 83, 84, 85, 86, 87, 88, 89]
19 [90, 91, 92, 93, 94, 95, 96, 97, 98, 99]

以上就是常见的 Sampler 的使用方法,有时候对于数据集中的数据还需要做一些变换以满足业务需要,这就是我们接下来要说的 transform

Transform

在深度学习中对数据进行变换(Transformation)以满足业务需求和增强模型性能是很常见的操作。

在 megengine.data.transform 中提供的各种数据变换都是基于 Transform 抽象类实现的,其中:

  • apply 抽象方法可用于单个的数据样本, 需要在子类中实现;
  • 各种变换操作可以通过 Compose 进行组合,这样使用起来更加方便。

我们能够很方便地在 DataLoader 加载数据时进行相应地变换操作。例如:

dataloader = DataLoader(dataset, transform=Compose([Resize(32), ToMode('CHW')]))

上面就是将两个 transform 操作 Resize() 和 ToMode() 组合起来对数据进行变换。

下面举个例子如何实现自己的 Transform

>>> from megengine.data.transform import Transform
>>> class AddOneTransform(Transform):
...     def apply(self, input):
...         return input + 1
>>> AddOneTransform().apply(data)
array([[1, 2, 3],
       [4, 5, 6],
       [7, 8, 9]])

上面这个 Transform 实现了自己的 apply() 方法,对数据集中的所有样本做了一个 +1 操作。

可以使用 Compose 对数据变换进行组合:

>>> from megengine.data.transform import Compose
>>> composed_transform = Compose([AddOneTransform(), AddOneTransform()])
>>> composed_transform.apply(data)
array([[ 2,  3,  4],
       [ 5,  6,  7],
       [ 8,  9, 10]])

最终,我们的各种Transform实现应当被应用于DataLoader

dataloader = DataLoader(dataset, transform=composed_transform)

实际使用时对数据做的操作往往比上面的例子要复杂许多,MegEngine 在  VisionTransform  中已经实现了很多转换方法供用户使用。用户也可以根据需要实现自己的数据变换方法。

当我们从 DataLoader 中获取批数据时,如果定义了 Transform, 则会在每次加载完样本后立即对其进行变换。

数据变换操作也是有计算开销的,且该流程通常在 CPU 设备上进行,以及有些操作会调用类似 OpenCV 的库。 如果我们对每个样本进行多次加载(比如训练多个周期),那么变换操作也会被执行多次,这可能会带来额外的开销。 因此在有些时候,我们会选择将预处理操作在更早的流程中进行,即直接对原始数据先进行一次预处理操作, 这样在 DataLoader 中获取的输入便已经是经过预处理的数据了,这样可以尽可能地减少 Transform 操作。

用户应当考虑到,原始数据相关的 I/O 和处理也有可能成为模型训练整体流程中的瓶颈。

Collator

在使用 DataLoader 获取批数据的整个流程中, Collator 负责合并样本,最终得到批数据。

Collator 仅适用于 Map-style 的数据集,因为 Iterable-style 数据集的批数据必然是逐个合并的。

经过 DataSet 和 Transform 的处理后, Collator 通常会接收到一个列表:

  • 如果你的 Dataset 子类的 __getitem__ 方法返回的是单个元素,则 Collator 得到一个普通列表;
  • 如果你的 Dataset 子类的 __getitem__ 方法返回的是一个元组,则 Collator 得到一个元组列表。

MegEngine 中使用  Collator  作为默认实现,通过调用 apply 方法来将列表数据合并成批数据:

from megengine.data import Collator
collator = Collator()

默认的 Collator 支持 NumPy ndarrayNumbersUnicode stringsbytesdicts 或 lists 数据类型。 要求输入必须包含至少一种上述数据类型,否则用户需要使用自己定义的 Collator

Collator 的作用是合并数据,比如每个数据样本是 shape 为 (C, H, W) 的图片,如果我们在 Sampler 中指定了 batch_size 为 N。那么 Collator 就会将获得的样本列表合并成一个 shape 为 (N, C, H, W) 的批样本结构。

我们可以模拟得到这样一个 image_list 数据,并借助 Collator 得到 batch_image

>>> N, C, H, W = 5, 3, 32, 32
>>> image_list = []
>>> for i in range(N):
...     image_list.append(np.random.random((C, H, W)))
>>> print(len(image_list), image_list[0].shape)
5 (3, 32, 32)
>>> batch_image = collator.apply(image_list)
>>> batch_image.shape
(5, 3, 32, 32)

DataLoader

前面介绍的 DatasetSamplerTransformCollator 等对象都是为了更灵活地配置 DataLoader 对象的。

当单进程运行 DataLoader 时(设置 num_workers=0),每当我们向 DataLoader 索要一批数据时,DataLoader 将从 Sampler 获得下一批数据的索引, 根据 Dataset 提供的 __getitem__() 方法将对应的数据逐个加载到内存, 加载进来的数据可以通过指定的 Transform 做一些处理,再通过 Collator 将单独的数据组织成批数据。

DataLoader 也支持多进程加载以提升数据加载处理速度(提高 num_workers 数量)。 一般 worker 数量越多,数据加载处理的速度会越快。不过如果 worker 数过多, 并大大超出了系统中 cpu 的数量,这些子进程可能会存在竞争 cpu 资源的情况,反而导致效率的降低。

一般来说,我们建议根据系统中 cpu 的数量设置 worker 的值。 比如在一台 64 cpu8 gpu 的机器上,预期中每个 gpu 会对应 8 个 cpu, 那么我们在使用时对应的把 worker 数设置在 8 左右就是个不错的选择。

下面以一个加载图像分类数据的流程来举例说明如何创建一个加载数据的 pipeline

1、假设图像数据按照一定的规则放置于同一目录下(通常数据集主页会对目录组织和文件命名规则进行介绍)。 要创建对应的数据加载器,首先需要一个继承自 Dataset 的类。 我们可以创建一个自定义的数据集:

import cv2
import numpy as np
import megengine
from megengine.data.dataset import Dataset
​
class CustomImageDataset(Dataset):
    def __init__(self, image_folder):
        # get all mapping indice
        self.image_folder = image_folder
        self.image_list = os.listdir(image_folder)
​
    # get the sample
    def __getitem__(self, idx):
        # get the index
        image_file = self.image_list[idx]
​
        # get the data
        # in this case we load image data and convert to ndarray
        image = cv2.imread(self.image_folder + image_file, cv2.IMREAD_COLOR)
        image = np.array(image)
​
        # get the label
        # in this case the label was noted in the name of the image file
        # ie: 1_image_28457.png where 1 is the label
        # and the number at the end is just the id or something
        target = int(image_file.split("_")[0])
​
        return image, target
​
    def __len__(self):
        return len(self.images)

要获取示例图像,可以创建一个数据集对象,并将示例索引传递给__getitem__()方法, 然后将返回图像数组和对应的标签,例如:

dataset = CustomImageDataset("/path/to/image/folder")
data, sample = dataset.__getitem__(0) # dataset[0]

2、现在我们已经预先创建了能够返回一个样本及其标签的类CustomImageDataset, 但仅依赖Dataset本身还无法实现自动分批、乱序、并行等功能; 我们必须接着创建DataLoader, 它通过其它的参数配置项围绕这个类“包装”, 可以按照我们的要求从数据集类中返回整批样本。

from megengine.data.transform import ToMode
from megengine.data import DataLoader, RandomSampler
​
dataset = YourImageDataset("/path/to/image/folder")
​
# you can implement the function to randomly split your dataset
train_set, val_set, test_set = random_split(dataset)
​
# B is your batch-size, ie. 128
train_dataloader = DataLoader(train_set,
      sampler=RandomSampler(train_set, batch_size=B),
      transform=ToMode('CHW'),
)

3、现在可以加载数据并进行训练了:

for epoch in range(epochs):
​
    for images, targets in train_dataloder:
        # now 'images' is a batch containing B samples
        # and 'targets' is a batch containing B targets
        # (of the images in 'images' with the same index
​
        # remember to convert data to tensor
        images = megengine.Tensor(images)
        targets = megengine.Tensor(targets)
​
        # train function
        # ...

更多 MegEngine 信息获取,您可以:查看文档和 GitHub 项目,或加入 MegEngine 用户交流 QQ 群:1029741705。欢迎参与 MegEngine 社区贡献,成为 Awesome MegEngineer,荣誉证书、定制礼品享不停。