优雅地在Numpy实现Conv2D

发布时间 2023-08-31 20:01:35作者: DevilXXL

何为优雅:

  • 编写优雅——算法符合直觉,容易理解
  • 执行优雅——执行高效

本文将围绕这俩点逐步对Numpy下卷积算子的实现拨茧抽丝。在文章的开头先声明一下这篇文章数据格式规范。

  • 输入 [N, Cin, H, W]
  • 卷积核 [Cin, Kx, Ky, Cout]
  • 输出 [N, Cout, Hout, Wout]

声明

  • 本文中 Weight 和 Kernel 两词不做区分
  • 本文是在小学期过程中完成,时间比较匆忙,代码写得比较糙。重点看算法大体的结构,若要细看请前往 本文代码仓库

前向

使用循环是最直接的思路,但毫无疑问,这在Python下速度非常慢。不过不妨抛砖引玉,从最基础的开始看看

for idxn in range(self.input.shape[0]):
    for idxc in range(self.channel_out):
        for idxh in range(height_out):
            for idxw in range(width_out):
                self.output[idxn, idxc, idxh, idxw] = (
                    np.sum(
                        self.input_pad[
                            idxn,
                            :,
                            idxh * self.stride : idxh * self.stride
                            + self.kernel_size,
                            idxw * self.stride : idxw * self.stride
                            + self.kernel_size,
                        ]
                        * self.weight[..., idxc],
                        axis=(0, 1, 2),  # For Cin, Kernel_X, Kernel_Y
                    )
                    + self.bias[idxc]
                )

一共 4 层循环,对 Batch 和 输出维度 的遍历不用细究,我们关键看 Height 和 Width 的这俩层循环,也就是滑动窗口的实现

每次滑动窗口进行卷积运算,对当前窗口内的输入和权重按元素相乘(Hadamard Product),并相加。乘和加,自然联想到矩阵乘法。使用矩阵乘法完成 Conv2D ,便是 img2col 思路。

img2col 网上的介绍很多,这里不对细节展开。不过值得好好推敲为什么要这么做。

mat_w = int(self.kernel_size * self.kernel_size * self.channel_in)
mat_h = int(height_out * width_out)

self.col = np.empty((input.shape[0], mat_h, mat_w))
cur = 0
for x in range(height_out):
    for y in range(width_out):
        bias_x = x * self.stride
        bias_y = y * self.stride
        self.col[:, cur, :] = self.input_pad[:, :, bias_x: bias_x + self.kernel_size, bias_y: bias_y + self.kernel_size].reshape(input.shape[0], -1)
        cur = cur + 1

Q.为什么要把每个窗口内的元素展开?

这关乎到矩阵乘法,矩阵乘法是完整地遍历某一个维度,也就是说,如果k的范围是0~N-1,就是从 0 到 N-1 完完整整地遍历 N 次

\[\sum_k A_{ik}B_{kj} = C_{ij} \]

反观卷积,并没有完整遍历 Height 或 Width 维度

\[\sum_{k_h}\sum_{k_w} X_{h+k_h,w+k_w}W_{k_h,k_w}=Y_{h'w'} \]

所以将每个窗口的元素展开成一个维度,以完整地遍历

不止矩阵乘法,进一步推广到 Tensor Dot 也是完整遍历某个维度,这种方法思路可以借鉴在其他运算之中

Q.img2col 的循环可以去掉吗?

一般 Numpy 涉及维度变换,都是使用 reshape transpose 等操作,而我们的 img2col 仍用了两层循环完成。那么可以使用这些方法完成变换吗?

答案是不一定。

卷积是核按照 stride 进行滑动,这表明一个元素有可能被核“框中”多次。

也就是说 img2col 的映射并不是单一映射,在原先img 的索引体系下,一个元素只有一种索引比如 (H,W), 而在 col 的索引体系下,可能有多个索引指向同一个元素。这是卷积最特殊的地方。

反过来说,如果滑动窗口没有重叠的元素,就可以使用reshape 等方法完成 img2col 的维度变换,比如 kernel_size = 2, stride = 2 的 Maxpool 层。

不过,在 img2col 的俩层循环内,没有加减乘除的算术运算,只是简单地赋值。这样循环的开销也不会影响太大。多次运算->多次数据格式的转换+一次运算,这种思路也非常值得借鉴。

当然,也可以利用 Numpy 中为滑动窗口定制的方法完成该变换,如下。

self.stride_shape = (
            self.input_pad.shape[0],
            self.input_pad.shape[1],
            self.height_out,
            self.width_out,
            self.kernel_size,
            self.kernel_size,
        )
self.stride_strides = (
    *self.input_pad.strides[:-2],  # for N and C
    self.input_pad.strides[-2] * self.stride,  # for H out
    self.input_pad.strides[-1] * self.stride,  # for W out
    *self.input_pad.strides[-2:],  # for kernel size
)
input_strides = np.lib.stride_tricks.as_strided(
    self.input_pad, shape=self.stride_shape, strides=self.stride_strides
)

(注:一般 img2col 里,将 Kernel 的两个维度压缩成一个维度,而这里 as_strided 的例子中保留了二维维度。区别不大,不过更容易理解)

该代码使用了 np.lib.stride_tricks.as_strided 方法,将原有的两层循环用一行代码代替。代码中 shapestride 给学习 Numpy (乃至 Pytorch 等) 表示数据逻辑结构存储结构举了一个好例子。

也可以使用基于 as_stided 封装的 np.lib.stride_tricks.sliding_window_view 方法,原理是一样的,接口更加友好。

至于数据格式转换完成后,则按照矩阵乘法的逻辑来处理

self.output = (
    np.einsum("nchwxy,cxyo->nohw", input_strides, self.weight, optimize=True)
    + self.bias[np.newaxis, :, np.newaxis, np.newaxis]
)  # [N,Cout,H,W] + [1,Cout,1,1]

这里使用了爱因斯坦标记法 einsum。该方法在算法验证的时候非常好用,一是能够在更高的抽象层从数据维度的变换表示不同的算法,比如矩阵乘法、点乘、转置等等,二是在实现时用字母表示某一维度,相比 numpy 中 axis=(1,2,3) 这种写法可读性更高。

各种算法效率如下:

  • Layer Shape (3, 3, 16, 1, 1)
  • Input Shape (1, 3, 32, 32)
  • Forward 100 Times
Algorithm Consumption(s)
Loop 6.122
img2col 0.1444
as_strided + einsum 0.068
as_strided + einsum + optimize 0.0118
as_strided + tensordot 0.0113

einsum 开启 optimize 参数后,运行效率和直接调用 tensordot 在一个数量级。个人比较喜欢先用直观的 einsum 完成算法验证,再翻译成 tensordot、matmul、sum 等其他函数优化效率。

反向

同样的,先从最基础的开始:

for idxn in range(top_diff.shape[0]):  # N
    for idxc in range(top_diff.shape[1]):  # Cout
        for idxh in range(top_diff.shape[2]):  # H
            for idxw in range(top_diff.shape[3]):  # W
                self.d_weight[:, :, :, idxc] += (
                    top_diff[idxn, idxc, idxh, idxw]
                    * self.input_pad[
                        idxn,
                        :,
                        idxh * self.stride : idxh * self.stride
                        + self.kernel_size,
                        idxw * self.stride : idxw * self.stride
                        + self.kernel_size,
                    ]
                )  # [N, Co, H, W] [N, Ci, H, W] -> [Ci, Kh, Kw, Co]
                # top_diff 是传回来的梯度
                self.d_bias[idxc] += top_diff[idxn, idxc, idxh, idxw]
                bottom_diff[
                    idxn,
                    :,
                    idxh * self.stride : idxh * self.stride + self.kernel_size,
                    idxw * self.stride : idxw * self.stride + self.kernel_size,
                ] += (
                    top_diff[idxn, idxc, idxh, idxw]
                    * self.weight[:, :, :, idxc]
                )
bottom_diff = bottom_diff[
    :, :, self.padding : -self.padding, self.padding : -self.padding
]

(注:top_diff 是传回来的梯度,格式和输出相同 [N, Cout, Hout, Wout] )

反向需要求解对 \(X\), \(Weight\), \(Bias\) 三者的梯度。

反向求解和正向求解的难点一样在于如何实现 stride。

计算对 \(Weight\) 的梯度,需要使用 Y 和 X。我们可以利用前向传播时 stride 之后的 X 成果直接计算。

self.d_weight = np.einsum("nchwxy,nohw->cxyo", input_strides, top_diff,optimize=True)

\(Bias\) 格式比较简单,不展开讨论。

self.d_bias = np.einsum("nohw->o", top_diff,optimize=True)

img2col ,那么自然也有 col2img,但正如之前所说, img 内的元素在 col 内可以多次索引,一个 img 的元素对应多个 col 下的元素,反应在算法里,就是 +=

要知道这个过程在俩层循环内实现,赋值号变成 += 算术运算,会带来额外的开销。

bottom_diff = np.zeros(self.input_pad.shape)
for idxh in range(top_diff.shape[2]):  # H
    for idxw in range(top_diff.shape[3]):  # W
        bottom_diff[
            :,
            :,
            idxh * self.stride : idxh * self.stride + self.kernel_size,
            idxw * self.stride : idxw * self.stride + self.kernel_size,
        # ] += np.einsum("no,cxyo->ncxy",top_diff[:, :, idxh, idxw],self.weight, optimize=True)
        ] += np.tensordot(top_diff[:, :, idxh, idxw],self.weight, axes=[(1,),(3,)])

那么是类似的 as_strided 的方法完成呢?

思路1.对 X 的梯度进行 as_strided

注意到 np.lib.stride_tricks.as_strided 方法返回的是原数据的浅复制(或者说,View),对 strided 后的数据进行修改原数据也会变动,既然 strided 之后就是矩阵乘法,而矩阵乘法的梯度相比更加简单,我只要将梯度传回 strided 之后的变量,那么不就会映射到原变量吗?

很遗憾,这个方法是行不通的。虽然 as_strided 返回的是浅复制,但若在一个命令里对一个元素多次修改,不能正确地执行。也就是说,strided 之后的变量作为左值是不完善的。

思路2.对 Weight 的梯度进行 as_strided

运动是相对的,Kernel 相对输入图像滑动,那么也可以看作图像相对 Kernel 滑动。既然 X 的梯度 strided 之后不能作为左值,那我把 Kernel strided 作为右值不就好了吗?

pad_height = self.input_pad.shape[2]
pad_width = self.input_pad.shape[3]
pad_h = pad_height - self.kernel_size
pad_w = pad_width - self.kernel_size
pad_filt = np.pad(
    self.weight, ((0, 0),  (pad_h, pad_h), (pad_w, pad_w), (0, 0)), "constant"
)
sub_windows = np.lib.stride_tricks.as_strided(
    pad_filt,
    shape=(
        self.channel_in,
        self.height_out,
        self.width_out,
        pad_height,
        pad_width,
        self.channel_out,
    ),
    strides=(
        pad_filt.strides[0], # C
        pad_filt.strides[1]* self.stride, # A
        pad_filt.strides[2]* self.stride, # B
        pad_filt.strides[1], # H
        pad_filt.strides[2], # W
        pad_filt.strides[3], # O
    ),
)

bottom_diff = np.einsum("cabhwo,noab->nchw",sub_windows, top_diff[:,:,::-1,::-1], optimize=True)

准确性验证通过,但存在一个严重的问题。为了把让图像在 Kernel 上滑动,需要先对 Kernel 用 0 进行 padding。对于某一个通道对 \((C_{in},C_{out})\) 下,补全的数据量级在 \(\mathcal{O}(K*H)\) ,考虑所有通道组合,则是 \(\mathcal{O}(C_{in}*C_{out}*K*H)\) 。在卷积网络内部常常有着非常大的输入和输出通道数量(比如 Vgg 中的 512x512),这会带来严重的内存占用导致程序报错。

思路3.对返回的梯度进行 as_strided

观察上个思路执行运算的这一行代码

bottom_diff = np.einsum("cabhwo,noab->nchw",sub_windows, top_diff[:,:,::-1,::-1], optimize=True)

输入是 Kernel 和 后层返回的梯度 \(Y_{grad}\),得到输入 \(X\) 的梯度 \(X_{grad}\)。对 Kernel strided 之后补全了 \(X\) 的两个维度,那么对\(Y_{grad}\) strided 是否也可以达到目的呢?并且 \(Y_{grad}\) 只有输出通道,在内存开销上会比思路2小一个量级。

尚未测试。

各种算法效率如下:

  • Layer Shape (3, 512, 512, 1, 1)
  • Input Shape (1, 512, 32, 32)
  • Backward 1 Times
Algorithm Consumption(s)
4-Loop(N,C,H,W) 71.7204
2-Loop(H,W) + einsum + optimize 4.4878
2-Loop(H,W) + tensordot 1.9864

经验收获

  • Numpy
    • as_strided 不建议当作左值
    • Einstein 标记法符合思维方式,先用 einsum 验证算法,再翻译成 sumtensordot 优化效率
  • 算法优化
    • Python 代码速度优化,减少 loop 循环
    • 如果 loop 循环不好再减少了,将运算操作分离出 loop
    • 在 Numpy 里实现神经网络算子的意义并不是很大,一是在各种框架里都有封装好的算子供使用,二是要自定义算子、优化算子,也应该选择其他语言作为后端。这还涉及到编译等过程,软硬件配合的优化才是最厉害的。