TVM Tensor Expression

Published 2023-07-12 11:22:58 · Author: Jareth

Working with Tensor Expressions

We now turn our attention to how Tensor Expressions (TE) are used to define tensor computations and apply loop optimizations. TE describes tensor computations in a pure functional language (each expression has no side effects). Viewed in the context of TVM as a whole, Relay describes a computation as a collection of operators, and each of those operators can be represented as a TE, where each TE takes tensors as input and produces tensors as output.

TVM uses a domain-specific tensor expression language to construct efficient kernels. Two examples demonstrate the basic workflow of the tensor expression language: the first introduces TE and scheduling with vector addition, and the second progressively optimizes matrix multiplication with TE. This matrix multiplication will serve as the basis for comparison in later tutorials covering more TVM features.

Example 1: Writing and Scheduling Vector Addition in TE (CPU)

This example implements vector addition with a TE and schedules it for a target CPU. First, initialize the TVM environment:

import tvm
import tvm.testing
from tvm import te
import numpy as np

Knowing details about your CPU lets you get better performance. If you are using LLVM, you can find the CPU model with the command llc --version, and you can inspect /proc/cpuinfo for the extensions your processor supports. For example, you can use llvm -mcpu=skylake-avx512 to target CPUs with AVX-512 instructions.
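
A minimal sketch of such a target string (illustrative only; the generic target below is what the rest of this example actually uses, and the -mcpu value must match your hardware):

import tvm

# Illustrative: pick an -mcpu value that matches your CPU (see ``llc --version``)
tgt_avx512 = tvm.target.Target(target="llvm -mcpu=skylake-avx512", host="llvm")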

tgt = tvm.target.Target(target="llvm", host="llvm")

Describing the Vector Computation

TVM adopts tensor semantics: intermediate results are represented as multi-dimensional arrays, and the user describes the computation rules for the tensors. First we define a symbolic variable n to represent the shape, then define two placeholder tensors A and B with the given shape (n,). The result tensor C is defined with the compute operation. compute describes a computation rule: it specifies the shape of the output tensor and a lambda function applied to every element position of the tensor. Note that n ensures A, B, and C share the same shape. No actual computation happens at this stage; we are only declaring how the computation should be done.

n = te.var("n")
A = te.placeholder((n,), name="A")
B = te.placeholder((n,), name="B")
C = te.compute(A.shape, lambda i: A[i] + B[i], name="C")

Create a Default Schedule for the Computation

The code above describes the computation rule, but C can be computed in many different ways, and those ways perform differently on different devices. For a multi-dimensional tensor, you can choose which axis to iterate over first, or split the computation across multiple threads. TVM asks the user to provide a schedule describing how the computation should be performed. Schedule operations in TE can change loop order, split work across different threads, and group blocks of data, among other things. An important concept behind schedules is that they only describe how the computation is performed, so different schedules for the same TE produce the same result.

TVM allows you to create a naive schedule that computes C in row-major order, as in the pseudo-C loop below:

for (int i = 0; i < n; ++i) {
  C[i] = A[i] + B[i];
}
s = te.create_schedule(C.op)

Compile and Evaluate the Default Schedule

With the TE and a schedule, we can produce runnable code for our target language and architecture, in this case LLVM and a CPU. We provide TVM with the schedule, a list of the tensor expressions that are in the schedule, the target and host, and the name of the function being produced. The result is a type-erased function that can be called directly from Python.

In the line below we use tvm.build to create the function. The build function takes the schedule, the desired signature of the function (including the inputs and outputs), and the target language we want to compile to.

fadd = tvm.build(s, [A, B, C], tgt, name="myadd")

Let's run the function and compare the output to the same computation in numpy. The compiled TVM function exposes a concise C API that can be invoked from any language. We start by creating a device, then initialize the tensors on that device and run the custom addition operation. To verify that the computation is correct, we compare the output tensor against the numpy result.

dev = tvm.device(tgt.kind.name, 0)

n = 1024
a = tvm.nd.array(np.random.uniform(size=n).astype(A.dtype), dev)
b = tvm.nd.array(np.random.uniform(size=n).astype(B.dtype), dev)
c = tvm.nd.array(np.zeros(n, dtype=C.dtype), dev)
fadd(a, b, c)
tvm.testing.assert_allclose(c.numpy(), a.numpy() + b.numpy())

To compare how fast this version is against numpy, create a helper function to profile the performance of the TVM-generated code:

import timeit

np_repeat = 100
np_running_time = timeit.timeit(
    setup="import numpy\n"
    "n = 32768\n"
    'dtype = "float32"\n'
    "a = numpy.random.rand(n, 1).astype(dtype)\n"
    "b = numpy.random.rand(n, 1).astype(dtype)\n",
    stmt="answer = a + b",
    number=np_repeat,
)
print("Numpy running time: %f" % (np_running_time / np_repeat))


def evaluate_addition(func, target, optimization, log):
    dev = tvm.device(target.kind.name, 0)
    n = 32768
    a = tvm.nd.array(np.random.uniform(size=n).astype(A.dtype), dev)
    b = tvm.nd.array(np.random.uniform(size=n).astype(B.dtype), dev)
    c = tvm.nd.array(np.zeros(n, dtype=C.dtype), dev)

    evaluator = func.time_evaluator(func.entry_name, dev, number=10)
    mean_time = evaluator(a, b, c).mean
    print("%s: %f" % (optimization, mean_time))

    log.append((optimization, mean_time))


log = [("numpy", np_running_time / np_repeat)]
evaluate_addition(fadd, tgt, "naive", log=log)

Output

Numpy running time: 0.000008
naive: 0.000008

Updating the Schedule to Use Parallelism

Now that the fundamentals of TE have been covered, let's look more closely at what schedules do and how they can be used to optimize tensor expressions for different architectures. A schedule is a series of steps that transform an expression in a number of different ways.

When a schedule is applied to a TE, the inputs and outputs remain the same, but the implementation of the expression at compile time can change. With the default schedule, the tensor addition runs serially, but it is easy to parallelize it across the available processor threads.

s[C].parallel(C.op.axis[0])

The tvm.lower command generates the IR of the TE with the corresponding schedule. By lowering the expression we can inspect the effect of the schedule. We use the flag simple_mode=True to return a readable C-style statement:

print(tvm.lower(s, [A, B, C], simple_mode=True))
# from tvm.script import ir as I
# from tvm.script import tir as T

@I.ir_module
class Module:
    @T.prim_func
    def main(A: T.handle, B: T.handle, C: T.handle):
        T.func_attr({"from_legacy_te_schedule": T.bool(True), "global_symbol": "main", "tir.noalias": T.bool(True)})
        n = T.int32()
        A_1 = T.match_buffer(A, (n,), strides=("stride",), buffer_type="auto")
        B_1 = T.match_buffer(B, (n,), strides=("stride",), buffer_type="auto")
        C_1 = T.match_buffer(C, (n,), strides=("stride",), buffer_type="auto")
        for i in T.parallel(n):
            C_2 = T.Buffer((C_1.strides[0] * n,), data=C_1.data, buffer_type="auto")
            A_2 = T.Buffer((A_1.strides[0] * n,), data=A_1.data, buffer_type="auto")
            B_2 = T.Buffer((B_1.strides[0] * n,), data=B_1.data, buffer_type="auto")
            C_2[i * C_1.strides[0]] = A_2[i * A_1.strides[0]] + B_2[i * B_1.strides[0]]

TVM can now run these blocks on separate threads. Let's compile and run this new schedule with the parallel operation applied:

fadd_parallel = tvm.build(s, [A, B, C], tgt, name="myadd_parallel")
fadd_parallel(a, b, c)

tvm.testing.assert_allclose(c.numpy(), a.numpy() + b.numpy())

evaluate_addition(fadd_parallel, tgt, "parallel", log=log)
parallel: 0.000009

Updating the Schedule to Use Vectorization

Modern CPUs can perform SIMD operations on data, and we can apply another schedule to take advantage of this. First, split the schedule into inner and outer loops using the split scheduling primitive. The inner loop can use vectorization to take advantage of SIMD instructions via the vectorize primitive, and the outer loop can be parallelized with the parallel primitive. Choose the split factor to match your CPU.

# Recreate the schedule, since we modified it with the parallel operation in
# the previous example
n = te.var("n")
A = te.placeholder((n,), name="A")
B = te.placeholder((n,), name="B")
C = te.compute(A.shape, lambda i: A[i] + B[i], name="C")

s = te.create_schedule(C.op)

# This factor should be chosen to match the number of threads appropriate for
# your CPU. This will vary depending on architecture, but a good rule is
# setting this factor to equal the number of available CPU cores.
factor = 4

outer, inner = s[C].split(C.op.axis[0], factor=factor)
s[C].parallel(outer)
s[C].vectorize(inner)

fadd_vector = tvm.build(s, [A, B, C], tgt, name="myadd_parallel")

evaluate_addition(fadd_vector, tgt, "vector", log=log)

print(tvm.lower(s, [A, B, C], simple_mode=True))
vector: 0.000038
# from tvm.script import ir as I
# from tvm.script import tir as T

@I.ir_module
class Module:
    @T.prim_func
    def main(A: T.handle, B: T.handle, C: T.handle):
        T.func_attr({"from_legacy_te_schedule": T.bool(True), "global_symbol": "main", "tir.noalias": T.bool(True)})
        n = T.int32()
        A_1 = T.match_buffer(A, (n,), strides=("stride",), buffer_type="auto")
        B_1 = T.match_buffer(B, (n,), strides=("stride",), buffer_type="auto")
        C_1 = T.match_buffer(C, (n,), strides=("stride",), buffer_type="auto")
        for i_outer in T.parallel((n + 3) // 4):
            for i_inner_s in range(4):
                if T.likely(i_outer * 4 + i_inner_s < n):
                    C_2 = T.Buffer((C_1.strides[0] * n,), data=C_1.data, buffer_type="auto")
                    A_2 = T.Buffer((A_1.strides[0] * n,), data=A_1.data, buffer_type="auto")
                    B_2 = T.Buffer((B_1.strides[0] * n,), data=B_1.data, buffer_type="auto")
                    cse_var_1: T.int32 = i_outer * 4 + i_inner_s
                    C_2[cse_var_1 * C_1.strides[0]] = A_2[cse_var_1 * A_1.strides[0]] + B_2[cse_var_1 * B_1.strides[0]]

Comparing the Different Schedules

We can now compare the different schedules:

baseline = log[0][1]
print("%s\t%s\t%s" % ("Operator".rjust(20), "Timing".rjust(20), "Performance".rjust(20)))
for result in log:
    print(
        "%s\t%s\t%s"
        % (result[0].rjust(20), str(result[1]).rjust(20), str(result[1] / baseline).rjust(20))
    )

Output

Operator                  Timing             Performance
   numpy    7.522140003857203e-06                    1.0
   naive    6.446899999999999e-06     0.8570566350392518
parallel              8.8498e-06      1.1765003038313564
  vector    3.8041300000000005e-05     5.057244345424731

Targeting Vector Addition for GPUs

In this example we compile the vector addition for a GPU target:

# If you want to run this code, change ``run_cuda = True``
# Note that by default this example is not run in the docs CI.

run_cuda = False
if run_cuda:
    # Change this target to the correct backend for your GPU. For example: cuda (NVIDIA GPUs),
    # rocm (Radeon GPUs), opencl (OpenCL).
    tgt_gpu = tvm.target.Target(target="cuda", host="llvm")

    # Recreate the schedule
    n = te.var("n")
    A = te.placeholder((n,), name="A")
    B = te.placeholder((n,), name="B")
    C = te.compute(A.shape, lambda i: A[i] + B[i], name="C")
    print(type(C))

    s = te.create_schedule(C.op)

    bx, tx = s[C].split(C.op.axis[0], factor=64)

    ################################################################################
    # Finally we must bind the iteration axis bx and tx to threads in the GPU
    # compute grid. The naive schedule is not valid for GPUs, and these are
    # specific constructs that allow us to generate code that runs on a GPU.

    s[C].bind(bx, te.thread_axis("blockIdx.x"))
    s[C].bind(tx, te.thread_axis("threadIdx.x"))

    ######################################################################
    # Compilation
    # -----------
    # After we have finished specifying the schedule, we can compile it
    # into a TVM function. By default TVM compiles into a type-erased
    # function that can be directly called from the python side.
    #
    # In the following line, we use tvm.build to create a function.
    # The build function takes the schedule, the desired signature of the
    # function (including the inputs and outputs) as well as target language
    # we want to compile to.
    #
    # The result of compilation fadd is a GPU device function (if GPU is
    # involved) as well as a host wrapper that calls into the GPU
    # function. fadd is the generated host wrapper function, it contains
    # a reference to the generated device function internally.

    fadd = tvm.build(s, [A, B, C], target=tgt_gpu, name="myadd")

    ################################################################################
    # The compiled TVM function exposes a concise C API that can be invoked from
    # any language.
    #
    # We provide a minimal array API in python to aid quick testing and prototyping.
    # The array API is based on the `DLPack <https://github.com/dmlc/dlpack>`_ standard.
    #
    # - We first create a GPU device.
    # - Then tvm.nd.array copies the data to the GPU.
    # - ``fadd`` runs the actual computation
    # - ``numpy()`` copies the GPU array back to the CPU (so we can verify correctness).
    #
    # Note that copying the data to and from the memory on the GPU is a required step.

    dev = tvm.device(tgt_gpu.kind.name, 0)

    n = 1024
    a = tvm.nd.array(np.random.uniform(size=n).astype(A.dtype), dev)
    b = tvm.nd.array(np.random.uniform(size=n).astype(B.dtype), dev)
    c = tvm.nd.array(np.zeros(n, dtype=C.dtype), dev)
    fadd(a, b, c)
    tvm.testing.assert_allclose(c.numpy(), a.numpy() + b.numpy())

    ################################################################################
    # Inspect the Generated GPU Code
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # You can inspect the generated code in TVM. The result of tvm.build is a TVM
    # Module. fadd is the host module that contains the host wrapper, it also
    # contains a device module for the CUDA (GPU) function.
    #
    # The following code fetches the device module and prints the content code.

    if (
        tgt_gpu.kind.name == "cuda"
        or tgt_gpu.kind.name == "rocm"
        or tgt_gpu.kind.name.startswith("opencl")
    ):
        dev_module = fadd.imported_modules[0]
        print("-----GPU code-----")
        print(dev_module.get_source())
    else:
        print(fadd.get_source())

Saving and Loading Compiled Modules

Besides compiling at runtime, we can save the compiled modules to a file and load them back when needed.
The following code performs these steps:

  • It saves the compiled host module to an object file.
  • It saves the device module to a ptx file.
  • cc.create_shared invokes a compiler (gcc) to create a shared library.

from tvm.contrib import cc
from tvm.contrib import utils

temp = utils.tempdir()
fadd.save(temp.relpath("myadd.o"))
if tgt.kind.name == "cuda":
    fadd.imported_modules[0].save(temp.relpath("myadd.ptx"))
if tgt.kind.name == "rocm":
    fadd.imported_modules[0].save(temp.relpath("myadd.hsaco"))
if tgt.kind.name.startswith("opencl"):
    fadd.imported_modules[0].save(temp.relpath("myadd.cl"))
cc.create_shared(temp.relpath("myadd.so"), [temp.relpath("myadd.o")])
print(temp.listdir())

Output

['myadd.so', 'myadd.o']

The CPU (host) module is saved as a shared library (.so). The device code can be saved in several formats; in this example it is stored as ptx.

Loading a Compiled Module

We can load the compiled modules from the file system and run the code. The following code loads the host and device modules separately and links them together. We can then verify that the newly loaded function works:

fadd1 = tvm.runtime.load_module(temp.relpath("myadd.so"))
if tgt.kind.name == "cuda":
    fadd1_dev = tvm.runtime.load_module(temp.relpath("myadd.ptx"))
    fadd1.import_module(fadd1_dev)

if tgt.kind.name == "rocm":
    fadd1_dev = tvm.runtime.load_module(temp.relpath("myadd.hsaco"))
    fadd1.import_module(fadd1_dev)

if tgt.kind.name.startswith("opencl"):
    fadd1_dev = tvm.runtime.load_module(temp.relpath("myadd.cl"))
    fadd1.import_module(fadd1_dev)

fadd1(a, b, c)
tvm.testing.assert_allclose(c.numpy(), a.numpy() + b.numpy())

Packing Everything into One Library

In the example above we stored the device and host code separately. TVM also supports exporting everything as one shared library. Currently Metal, OpenCL, and CUDA modules are supported.

fadd.export_library(temp.relpath("myadd_pack.so"))
fadd2 = tvm.runtime.load_module(temp.relpath("myadd_pack.so"))
fadd2(a, b, c)
tvm.testing.assert_allclose(c.numpy(), a.numpy() + b.numpy())

The compiled TVM module does not depend on the TVM compiler; it only depends on the runtime library. The TVM runtime wraps the device drivers and provides thread-safe calls into the compiled functions.

Generating OpenCL Code

TVM provides code generation for multiple backends. Besides LLVM code that runs on a CPU backend, we can also generate OpenCL code.

The following code block generates OpenCL code, creates arrays on an OpenCL device, and verifies the correctness of the code:

if tgt.kind.name.startswith("opencl"):
    fadd_cl = tvm.build(s, [A, B, C], tgt, name="myadd")
    print("------opencl code------")
    print(fadd_cl.imported_modules[0].get_source())
    dev = tvm.cl(0)
    n = 1024
    a = tvm.nd.array(np.random.uniform(size=n).astype(A.dtype), dev)
    b = tvm.nd.array(np.random.uniform(size=n).astype(B.dtype), dev)
    c = tvm.nd.array(np.zeros(n, dtype=C.dtype), dev)
    fadd_cl(a, b, c)
    tvm.testing.assert_allclose(c.numpy(), a.numpy() + b.numpy())

TVM includes a number of scheduling primitives (see https://tvm.apache.org/docs/how_to/work_with_schedules/schedule_primitives.html#schedule-primitives for details); a short sketch applying a few of them follows the list below.

  • split: splits a specified axis into two axes by a defined factor
  • tile: splits a computation across two axes by defined factors
  • fuse: fuses two consecutive axes of one computation
  • reorder: reorders the axes of a computation into a defined order
  • bind: binds a computation to a specific thread, useful in GPU programming
  • compute_at: by default TVM computes tensors at the outermost level (the root); compute_at moves the computation of one stage to an axis of another stage's computation
  • compute_inline: marks a stage as inline, so its computation is expanded in place where the tensor is required
  • compute_root: moves a stage's computation back to the outermost level (the root)
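
A minimal sketch applying a few of these primitives to the vector-add expression from Example 1 (the factor of 32 is illustrative, not tuned for any particular CPU):

import tvm
from tvm import te

n = te.var("n")
A = te.placeholder((n,), name="A")
B = te.placeholder((n,), name="B")
C = te.compute(A.shape, lambda i: A[i] + B[i], name="C")

s = te.create_schedule(C.op)
outer, inner = s[C].split(C.op.axis[0], factor=32)  # split one axis into two
fused = s[C].fuse(outer, inner)                      # fuse the two axes back into one
# Lower the schedule to inspect the effect of the primitives
print(tvm.lower(s, [A, B, C], simple_mode=True))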

Example 2: Manually Optimizing Matrix Multiplication with TE

Now let's consider a second, more advanced example: accelerating matrix multiplication by 18x with 18 lines of TVM Python code. Matrix multiplication is a compute-intensive operation, and there are two important ways to optimize it for CPU performance:

  • Increase the cache hit rate. Both complex numerical computations and hot-spot memory access can be accelerated by a high cache hit rate. This requires transforming the original memory access pattern to fit the cache.
  • SIMD (single instruction, multiple data), also known as the vector processing unit. In each cycle, SIMD processes a small batch of data rather than a single value. This requires transforming the data access pattern in the loop body into a uniform pattern so that the LLVM backend can lower it to SIMD instructions.

Preparation and Performance Baseline

We start by collecting performance data for the numpy implementation of matrix multiplication:

import tvm
import tvm.testing
from tvm import te
import numpy

# The size of the matrix
# (M, K) x (K, N)
# You are free to try out different shapes, sometimes TVM optimization outperforms numpy with MKL.
M = 1024
K = 1024
N = 1024

# The default tensor data type in tvm
dtype = "float32"

# You will want to adjust the target to match any CPU vector extensions you
# might have. For example, if you're using Intel AVX2 (Advanced Vector
# Extensions) ISA for SIMD, you can get the best performance by changing the
# following line to ``llvm -mcpu=core-avx2``, or specific type of CPU you use.
# Recall that you're using llvm, you can get this information from the command
# ``llc --version`` to get the CPU type, and you can check ``/proc/cpuinfo``
# for additional extensions that your processor might support.

target = tvm.target.Target(target="llvm", host="llvm")
dev = tvm.device(target.kind.name, 0)

# Random generated tensor for testing
a = tvm.nd.array(numpy.random.rand(M, K).astype(dtype), dev)
b = tvm.nd.array(numpy.random.rand(K, N).astype(dtype), dev)

# Repeatedly perform a matrix multiplication to get a performance baseline
# for the default numpy implementation
np_repeat = 100
import timeit
np_running_time = timeit.timeit(
    setup="import numpy\n"
    "M = " + str(M) + "\n"
    "K = " + str(K) + "\n"
    "N = " + str(N) + "\n"
    'dtype = "float32"\n'
    "a = numpy.random.rand(M, K).astype(dtype)\n"
    "b = numpy.random.rand(K, N).astype(dtype)\n",
    stmt="answer = numpy.dot(a, b)",
    number=np_repeat,
)
print("Numpy running time: %f" % (np_running_time / np_repeat))

answer = numpy.dot(a.numpy(), b.numpy())

Output

Numpy running time: 0.019306

Now we write a basic matrix multiplication using TVM TE and verify that it produces the same result as numpy. We also write a helper function that we will reuse to test the optimizations of later schedules:

# TVM Matrix Multiplication using TE
k = te.reduce_axis((0, K), "k")
A = te.placeholder((M, K), name="A")
B = te.placeholder((K, N), name="B")
C = te.compute((M, N), lambda x, y: te.sum(A[x, k] * B[k, y], axis=k), name="C")

# Default schedule
s = te.create_schedule(C.op)
func = tvm.build(s, [A, B, C], target=target, name="mmult")

c = tvm.nd.array(numpy.zeros((M, N), dtype=dtype), dev)
func(a, b, c)
tvm.testing.assert_allclose(c.numpy(), answer, rtol=1e-5)


def evaluate_operation(s, vars, target, name, optimization, log):
    func = tvm.build(s, [A, B, C], target=target, name="mmult")
    assert func

    c = tvm.nd.array(numpy.zeros((M, N), dtype=dtype), dev)
    func(a, b, c)
    tvm.testing.assert_allclose(c.numpy(), answer, rtol=1e-5)

    evaluator = func.time_evaluator(func.entry_name, dev, number=10)
    mean_time = evaluator(a, b, c).mean
    print("%s: %f" % (optimization, mean_time))
    log.append((optimization, mean_time))


log = []

evaluate_operation(s, [A, B, C], target=target, name="mmult", optimization="none", log=log)

Output

none: 3.269222

It is worth taking a moment to look at the intermediate representation of the operator; note that it uses three nested loops.

print(tvm.lower(s, [A, B, C], simple_mode=True))

Output

# from tvm.script import ir as I
# from tvm.script import tir as T

@I.ir_module
class Module:
    @T.prim_func
    def main(A: T.Buffer((1024, 1024), "float32"), B: T.Buffer((1024, 1024), "float32"), C: T.Buffer((1024, 1024), "float32")):
        T.func_attr({"from_legacy_te_schedule": T.bool(True), "global_symbol": "main", "tir.noalias": T.bool(True)})
        for x, y in T.grid(1024, 1024):
            C_1 = T.Buffer((1048576,), data=C.data)
            C_1[x * 1024 + y] = T.float32(0)
            for k in range(1024):
                cse_var_2: T.int32 = x * 1024
                cse_var_1: T.int32 = cse_var_2 + y
                A_1 = T.Buffer((1048576,), data=A.data)
                B_1 = T.Buffer((1048576,), data=B.data)
                C_1[cse_var_1] = C_1[cse_var_1] + A_1[cse_var_2 + k] * B_1[k * 1024 + y]
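
For reference, the lowered IR above corresponds to the classic triple loop. A plain-Python sketch of the same computation (for illustration only; it would be far too slow to run at this size):

def naive_matmul(a_np, b_np, c_np):
    # c_np[x, y] = sum over k of a_np[x, k] * b_np[k, y]
    M, K = a_np.shape
    _, N = b_np.shape
    for x in range(M):
        for y in range(N):
            c_np[x, y] = 0.0
            for k in range(K):
                c_np[x, y] += a_np[x, k] * b_np[k, y]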

Optimization 1: Blocking

An important way to increase the cache hit rate is blocking, which structures memory access so that the data inside a block has good locality. Here we use 32 as the blocking factor, so a block fills 32 * 32 * sizeof(float), or 4 KB, of cache, which relates well to a 32 KB L1 data cache.
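
As a quick check of that arithmetic (assuming 4-byte float32 elements):

bn = 32
block_bytes = bn * bn * 4   # 32 * 32 * sizeof(float32) = 4096 bytes
print(block_bytes / 1024)   # 4.0 KB, well within a 32 KB L1 data cache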

First we create a default schedule for the C operation, then apply the tile scheduling primitive to it with the specified blocking factor. The primitive returns the resulting axes ordered from outermost to innermost, as the vector [x_outer, y_outer, x_inner, y_inner]. We then fetch the reduction axis of the operation and perform a split with a factor of 4. This factor does not directly affect the blocking we just set up, but it will be useful later when we apply vectorization.

Now that the operation is blocked, we can reorder the computation to put the reduction operation into the outermost loops of the computation, helping to guarantee that the blocked data remains in cache. This completes the schedule.

bn = 32

# Blocking by loop tiling
xo, yo, xi, yi = s[C].tile(C.op.axis[0], C.op.axis[1], bn, bn)
(k,) = s[C].op.reduce_axis
ko, ki = s[C].split(k, factor=4)

# Hoist reduction domain outside the blocking loop
s[C].reorder(xo, yo, ko, ki, xi, yi)

evaluate_operation(s, [A, B, C], target=target, name="mmult", optimization="blocking", log=log)

Output

blocking: 0.319070

By reordering the computation to take advantage of caching, you should see a significant improvement in performance. Now print the intermediate representation and compare it with the original one:

print(tvm.lower(s, [A, B, C], simple_mode=True))

Output

# from tvm.script import ir as I
# from tvm.script import tir as T

@I.ir_module
class Module:
    @T.prim_func
    def main(A: T.Buffer((1024, 1024), "float32"), B: T.Buffer((1024, 1024), "float32"), C: T.Buffer((1024, 1024), "float32")):
        T.func_attr({"from_legacy_te_schedule": T.bool(True), "global_symbol": "main", "tir.noalias": T.bool(True)})
        for x_outer, y_outer in T.grid(32, 32):
            C_1 = T.Buffer((1048576,), data=C.data)
            for x_inner_init, y_inner_init in T.grid(32, 32):
                C_1[x_outer * 32768 + x_inner_init * 1024 + y_outer * 32 + y_inner_init] = T.float32(0)
            for k_outer, k_inner, x_inner, y_inner in T.grid(256, 4, 32, 32):
                cse_var_3: T.int32 = y_outer * 32
                cse_var_2: T.int32 = x_outer * 32768 + x_inner * 1024
                cse_var_1: T.int32 = cse_var_2 + cse_var_3 + y_inner
                A_1 = T.Buffer((1048576,), data=A.data)
                B_1 = T.Buffer((1048576,), data=B.data)
                C_1[cse_var_1] = C_1[cse_var_1] + A_1[cse_var_2 + k_outer * 4 + k_inner] * B_1[k_outer * 4096 + k_inner * 1024 + cse_var_3 + y_inner]

Optimization 2: Vectorization

Another important optimization is vectorization. When the memory access pattern is uniform, the compiler can detect this pattern and pass the contiguous memory to a SIMD vector processor. In TVM we can use the vectorize interface to hint to the compiler that it should exploit this hardware feature.

In this example we vectorize the inner row of data, since it is already cache-friendly from the previous blocking optimization:

# Apply the vectorization optimization
s[C].vectorize(yi)

evaluate_operation(s, [A, B, C], target=target, name="mmult", optimization="vectorization", log=log)

# The generalized IR after vectorization
print(tvm.lower(s, [A, B, C], simple_mode=True))

Output

vectorization: 0.296516
# from tvm.script import ir as I
# from tvm.script import tir as T

@I.ir_module
class Module:
    @T.prim_func
    def main(A: T.Buffer((1024, 1024), "float32"), B: T.Buffer((1024, 1024), "float32"), C: T.Buffer((1024, 1024), "float32")):
        T.func_attr({"from_legacy_te_schedule": T.bool(True), "global_symbol": "main", "tir.noalias": T.bool(True)})
        for x_outer, y_outer in T.grid(32, 32):
            C_1 = T.Buffer((1048576,), data=C.data)
            for x_inner_init in range(32):
                C_1[x_outer * 32768 + x_inner_init * 1024 + y_outer * 32:x_outer * 32768 + x_inner_init * 1024 + y_outer * 32 + 32] = T.Broadcast(T.float32(0), 32)
            for k_outer, k_inner, x_inner in T.grid(256, 4, 32):
                cse_var_3: T.int32 = y_outer * 32
                cse_var_2: T.int32 = x_outer * 32768 + x_inner * 1024
                cse_var_1: T.int32 = cse_var_2 + cse_var_3
                A_1 = T.Buffer((1048576,), data=A.data)
                B_1 = T.Buffer((1048576,), data=B.data)
                C_1[cse_var_1:cse_var_1 + 32] = C_1[cse_var_1:cse_var_1 + 32] + T.Broadcast(A_1[cse_var_2 + k_outer * 4 + k_inner], 32) * B_1[k_outer * 4096 + k_inner * 1024 + cse_var_3:k_outer * 4096 + k_inner * 1024 + cse_var_3 + 32]

Optimization 3: Loop Permutation

Looking at the IR above, the inner-loop row data is vectorized and B has been transformed into PackedB, so the traversal of PackedB is now sequential. If we look at the access pattern of A, however, the current schedule accesses A column by column, which is not cache friendly. If we change the nested loop order of ki and the inner axis xi, the access pattern of the A matrix becomes more cache friendly:

s = te.create_schedule(C.op)
xo, yo, xi, yi = s[C].tile(C.op.axis[0], C.op.axis[1], bn, bn)
(k,) = s[C].op.reduce_axis
ko, ki = s[C].split(k, factor=4)

# re-ordering
s[C].reorder(xo, yo, ko, xi, ki, yi)
s[C].vectorize(yi)

evaluate_operation(
    s, [A, B, C], target=target, name="mmult", optimization="loop permutation", log=log
)

# Again, print the new generalized IR
print(tvm.lower(s, [A, B, C], simple_mode=True))

Output

loop permutation: 0.116673
# from tvm.script import ir as I
# from tvm.script import tir as T

@I.ir_module
class Module:
    @T.prim_func
    def main(A: T.Buffer((1024, 1024), "float32"), B: T.Buffer((1024, 1024), "float32"), C: T.Buffer((1024, 1024), "float32")):
        T.func_attr({"from_legacy_te_schedule": T.bool(True), "global_symbol": "main", "tir.noalias": T.bool(True)})
        for x_outer, y_outer in T.grid(32, 32):
            C_1 = T.Buffer((1048576,), data=C.data)
            for x_inner_init in range(32):
                C_1[x_outer * 32768 + x_inner_init * 1024 + y_outer * 32:x_outer * 32768 + x_inner_init * 1024 + y_outer * 32 + 32] = T.Broadcast(T.float32(0), 32)
            for k_outer, x_inner, k_inner in T.grid(256, 32, 4):
                cse_var_3: T.int32 = y_outer * 32
                cse_var_2: T.int32 = x_outer * 32768 + x_inner * 1024
                cse_var_1: T.int32 = cse_var_2 + cse_var_3
                A_1 = T.Buffer((1048576,), data=A.data)
                B_1 = T.Buffer((1048576,), data=B.data)
                C_1[cse_var_1:cse_var_1 + 32] = C_1[cse_var_1:cse_var_1 + 32] + T.Broadcast(A_1[cse_var_2 + k_outer * 4 + k_inner], 32) * B_1[k_outer * 4096 + k_inner * 1024 + cse_var_3:k_outer * 4096 + k_inner * 1024 + cse_var_3 + 32]

Optimization 4: Array Packing

Another important optimization trick is array packing, which reorders the storage of an array so that it is contiguous along a chosen dimension.
(Figure: array packing of B — the [16][16] array is repacked into a [16/4][16][4] layout so that accesses along the last dimension are contiguous.)
As the figure above illustrates, after blocking the computation the access pattern of B is regular but discontinuous. Some transformation is needed to obtain a continuous access pattern: by reordering a [16][16] array into a [16/4][16][4] array, the values of B fetched from the packed array are contiguous.
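
To make the layout change concrete, here is a small NumPy-only sketch of the same repacking on a 16x16 array (the shapes and the factor of 4 are purely illustrative):

import numpy as np

bn_demo = 4
B_demo = np.arange(16 * 16, dtype="float32").reshape(16, 16)

# packedB_demo[x, y, z] = B_demo[y, x * bn_demo + z]: reading along the last
# axis for a fixed (x, y) now touches contiguous memory.
packedB_demo = np.empty((16 // bn_demo, 16, bn_demo), dtype="float32")
for x in range(16 // bn_demo):
    for y in range(16):
        for z in range(bn_demo):
            packedB_demo[x, y, z] = B_demo[y, x * bn_demo + z]

# The same repacking expressed with transpose/reshape
assert np.array_equal(
    packedB_demo,
    B_demo.T.reshape(16 // bn_demo, bn_demo, 16).transpose(0, 2, 1),
)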

To accomplish this, we start again from the default schedule, this time taking the packing of B into account. TE is a powerful language for writing operators, but it requires some knowledge of the underlying algorithm, data structures, and the target hardware. Later tutorials discuss letting TVM take over some of this burden. For now, let's write the new optimized schedule:

# We have to re-write the algorithm slightly.
packedB = te.compute((N / bn, K, bn), lambda x, y, z: B[y, x * bn + z], name="packedB")
C = te.compute(
    (M, N),
    lambda x, y: te.sum(A[x, k] * packedB[y // bn, k, tvm.tir.indexmod(y, bn)], axis=k),
    name="C",
)

s = te.create_schedule(C.op)

xo, yo, xi, yi = s[C].tile(C.op.axis[0], C.op.axis[1], bn, bn)
(k,) = s[C].op.reduce_axis
ko, ki = s[C].split(k, factor=4)

s[C].reorder(xo, yo, ko, xi, ki, yi)
s[C].vectorize(yi)

x, y, z = s[packedB].op.axis
s[packedB].vectorize(z)
s[packedB].parallel(x)

evaluate_operation(s, [A, B, C], target=target, name="mmult", optimization="array packing", log=log)

# Here is the generated IR after array packing.
print(tvm.lower(s, [A, B, C], simple_mode=True))

Output

array packing: 0.107548
# from tvm.script import ir as I
# from tvm.script import tir as T

@I.ir_module
class Module:
    @T.prim_func
    def main(A: T.Buffer((1024, 1024), "float32"), B: T.Buffer((1024, 1024), "float32"), C: T.Buffer((1024, 1024), "float32")):
        T.func_attr({"from_legacy_te_schedule": T.bool(True), "global_symbol": "main", "tir.noalias": T.bool(True)})
        packedB = T.allocate([32768], "float32x32", "global")
        packedB_1 = T.Buffer((32768,), "float32x32", data=packedB)
        for x in T.parallel(32):
            for y in range(1024):
                B_1 = T.Buffer((1048576,), data=B.data)
                packedB_1[x * 1024 + y] = B_1[y * 1024 + x * 32:y * 1024 + x * 32 + 32]
        for x_outer, y_outer in T.grid(32, 32):
            C_1 = T.Buffer((1048576,), data=C.data)
            for x_inner_init in range(32):
                C_1[x_outer * 32768 + x_inner_init * 1024 + y_outer * 32:x_outer * 32768 + x_inner_init * 1024 + y_outer * 32 + 32] = T.Broadcast(T.float32(0), 32)
            for k_outer, x_inner, k_inner in T.grid(256, 32, 4):
                cse_var_3: T.int32 = x_outer * 32768 + x_inner * 1024
                cse_var_2: T.int32 = k_outer * 4
                cse_var_1: T.int32 = cse_var_3 + y_outer * 32
                A_1 = T.Buffer((1048576,), data=A.data)
                C_1[cse_var_1:cse_var_1 + 32] = C_1[cse_var_1:cse_var_1 + 32] + T.Broadcast(A_1[cse_var_3 + cse_var_2 + k_inner], 32) * packedB_1[y_outer * 1024 + cse_var_2 + k_inner]

Optimization 5: Optimizing Block Writing Through Caching

Up to this point, all of our optimizations have focused on efficiently accessing and computing the data from the A and B matrices. After the blocking optimization, the operator writes the result into C block by block, and this access pattern is not sequential. We can address this with a sequential cache array, using a combination of cache_write, compute_at, and unroll to hold the block results and write them to C once all of the block results are ready:

s = te.create_schedule(C.op)

# Allocate write cache
CC = s.cache_write(C, "global")

xo, yo, xi, yi = s[C].tile(C.op.axis[0], C.op.axis[1], bn, bn)

# Write cache is computed at yo
s[CC].compute_at(s[C], yo)

# New inner axes
xc, yc = s[CC].op.axis

(k,) = s[CC].op.reduce_axis
ko, ki = s[CC].split(k, factor=4)
s[CC].reorder(ko, xc, ki, yc)
s[CC].unroll(ki)
s[CC].vectorize(yc)

x, y, z = s[packedB].op.axis
s[packedB].vectorize(z)
s[packedB].parallel(x)

evaluate_operation(s, [A, B, C], target=target, name="mmult", optimization="block caching", log=log)

# Here is the generated IR after write cache blocking.
print(tvm.lower(s, [A, B, C], simple_mode=True))

Output

block caching: 0.111746
# from tvm.script import ir as I
# from tvm.script import tir as T

@I.ir_module
class Module:
    @T.prim_func
    def main(A: T.Buffer((1024, 1024), "float32"), B: T.Buffer((1024, 1024), "float32"), C: T.Buffer((1024, 1024), "float32")):
        T.func_attr({"from_legacy_te_schedule": T.bool(True), "global_symbol": "main", "tir.noalias": T.bool(True)})
        packedB = T.allocate([32768], "float32x32", "global")
        C_global = T.allocate([1024], "float32", "global")
        packedB_1 = T.Buffer((32768,), "float32x32", data=packedB)
        for x in T.parallel(32):
            for y in range(1024):
                B_1 = T.Buffer((1048576,), data=B.data)
                packedB_1[x * 1024 + y] = B_1[y * 1024 + x * 32:y * 1024 + x * 32 + 32]
        for x_outer, y_outer in T.grid(32, 32):
            C_global_1 = T.Buffer((1024,), data=C_global)
            for x_c_init in range(32):
                C_global_1[x_c_init * 32:x_c_init * 32 + 32] = T.Broadcast(T.float32(0), 32)
            for k_outer, x_c in T.grid(256, 32):
                cse_var_4: T.int32 = k_outer * 4
                cse_var_3: T.int32 = x_c * 32
                cse_var_2: T.int32 = y_outer * 1024 + cse_var_4
                cse_var_1: T.int32 = x_outer * 32768 + x_c * 1024 + cse_var_4
                A_1 = T.Buffer((1048576,), data=A.data)
                C_global_1[cse_var_3:cse_var_3 + 32] = C_global_1[cse_var_3:cse_var_3 + 32] + T.Broadcast(A_1[cse_var_1], 32) * packedB_1[cse_var_2]
                C_global_1[cse_var_3:cse_var_3 + 32] = C_global_1[cse_var_3:cse_var_3 + 32] + T.Broadcast(A_1[cse_var_1 + 1], 32) * packedB_1[cse_var_2 + 1]
                C_global_1[cse_var_3:cse_var_3 + 32] = C_global_1[cse_var_3:cse_var_3 + 32] + T.Broadcast(A_1[cse_var_1 + 2], 32) * packedB_1[cse_var_2 + 2]
                C_global_1[cse_var_3:cse_var_3 + 32] = C_global_1[cse_var_3:cse_var_3 + 32] + T.Broadcast(A_1[cse_var_1 + 3], 32) * packedB_1[cse_var_2 + 3]
            for x_inner, y_inner in T.grid(32, 32):
                C_1 = T.Buffer((1048576,), data=C.data)
                C_1[x_outer * 32768 + x_inner * 1024 + y_outer * 32 + y_inner] = C_global_1[x_inner * 32 + y_inner]

Optimization 6: Parallelization

So far, the computation has been designed to use a single core. Nearly all modern processors have multiple cores, and the computation can benefit from running in parallel:

# parallel
s[C].parallel(xo)

x, y, z = s[packedB].op.axis
s[packedB].vectorize(z)
s[packedB].parallel(x)

evaluate_operation(
    s, [A, B, C], target=target, name="mmult", optimization="parallelization", log=log
)

# Here is the generated IR after parallelization.
print(tvm.lower(s, [A, B, C], simple_mode=True))

Output

parallelization: 0.133342
# from tvm.script import ir as I
# from tvm.script import tir as T

@I.ir_module
class Module:
    @T.prim_func
    def main(A: T.Buffer((1024, 1024), "float32"), B: T.Buffer((1024, 1024), "float32"), C: T.Buffer((1024, 1024), "float32")):
        T.func_attr({"from_legacy_te_schedule": T.bool(True), "global_symbol": "main", "tir.noalias": T.bool(True)})
        packedB = T.allocate([32768], "float32x32", "global")
        packedB_1 = T.Buffer((32768,), "float32x32", data=packedB)
        for x in T.parallel(32):
            for y in range(1024):
                B_1 = T.Buffer((1048576,), data=B.data)
                packedB_1[x * 1024 + y] = B_1[y * 1024 + x * 32:y * 1024 + x * 32 + 32]
        for x_outer in T.parallel(32):
            C_global = T.allocate([1024], "float32", "global")
            for y_outer in range(32):
                C_global_1 = T.Buffer((1024,), data=C_global)
                for x_c_init in range(32):
                    C_global_1[x_c_init * 32:x_c_init * 32 + 32] = T.Broadcast(T.float32(0), 32)
                for k_outer, x_c in T.grid(256, 32):
                    cse_var_4: T.int32 = k_outer * 4
                    cse_var_3: T.int32 = x_c * 32
                    cse_var_2: T.int32 = y_outer * 1024 + cse_var_4
                    cse_var_1: T.int32 = x_outer * 32768 + x_c * 1024 + cse_var_4
                    A_1 = T.Buffer((1048576,), data=A.data)
                    C_global_1[cse_var_3:cse_var_3 + 32] = C_global_1[cse_var_3:cse_var_3 + 32] + T.Broadcast(A_1[cse_var_1], 32) * packedB_1[cse_var_2]
                    C_global_1[cse_var_3:cse_var_3 + 32] = C_global_1[cse_var_3:cse_var_3 + 32] + T.Broadcast(A_1[cse_var_1 + 1], 32) * packedB_1[cse_var_2 + 1]
                    C_global_1[cse_var_3:cse_var_3 + 32] = C_global_1[cse_var_3:cse_var_3 + 32] + T.Broadcast(A_1[cse_var_1 + 2], 32) * packedB_1[cse_var_2 + 2]
                    C_global_1[cse_var_3:cse_var_3 + 32] = C_global_1[cse_var_3:cse_var_3 + 32] + T.Broadcast(A_1[cse_var_1 + 3], 32) * packedB_1[cse_var_2 + 3]
                for x_inner, y_inner in T.grid(32, 32):
                    C_1 = T.Buffer((1048576,), data=C.data)
                    C_1[x_outer * 32768 + x_inner * 1024 + y_outer * 32 + y_inner] = C_global_1[x_inner * 32 + y_inner]

Summary of the Matrix Multiplication Example

After applying the simple optimizations above, with only 18 lines of code, the generated code approaches the performance of numpy built with the Math Kernel Library (MKL). Let's print the results for comparison:

baseline = log[0][1]
print("%s\t%s\t%s" % ("Operator".rjust(20), "Timing".rjust(20), "Performance".rjust(20)))
for result in log:
    print(
        "%s\t%s\t%s"
        % (result[0].rjust(20), str(result[1]).rjust(20), str(result[1] / baseline).rjust(20))
    )

Output

Operator                  Timing             Performance
            none      3.4304152927000002                     1.0
        blocking            0.2998606907     0.08741235830487061
   vectorization            0.2868650104     0.08362398891191253
loop permutation            0.1205437718      0.0351397021977251
   array packing     0.10754811140000001    0.031351338605813926
   block caching     0.11174648409999999    0.032575205788581627
 parallelization     0.13334208120000002     0.03887053602045062

Summary

As mentioned earlier, applying TE and scheduling primitives effectively requires knowledge of both the target architecture and the algorithm. However, TE is designed to act as the basis for more complex algorithms that can search the optimization space automatically. With the background from this section, we can begin to explore how TVM automates the schedule optimization process.

This section walked through the TVM Tensor Expression (TE) workflow using the vector addition and matrix multiplication examples; a compact end-to-end recap sketch follows the list below.

  • Define the computation with a series of operations
  • Describe how the computation should be executed with schedule primitives
  • Compile to the target function we want
  • Optionally, save the function so it can be loaded later
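
Putting those steps together, a compact recap of Example 1 in one minimal sketch (the split factor of 4 and the temporary file name are illustrative):

import numpy as np
import tvm
import tvm.testing
from tvm import te
from tvm.contrib import utils

# 1. Define the computation with a series of operations
n = te.var("n")
A = te.placeholder((n,), name="A")
B = te.placeholder((n,), name="B")
C = te.compute(A.shape, lambda i: A[i] + B[i], name="C")

# 2. Describe how it should be executed with schedule primitives
s = te.create_schedule(C.op)
outer, inner = s[C].split(C.op.axis[0], factor=4)
s[C].parallel(outer)
s[C].vectorize(inner)

# 3. Compile to the target function
tgt = tvm.target.Target(target="llvm", host="llvm")
fadd = tvm.build(s, [A, B, C], tgt, name="myadd")

# 4. Optionally, save the function and load it back later
temp = utils.tempdir()
fadd.export_library(temp.relpath("myadd.so"))
fadd_loaded = tvm.runtime.load_module(temp.relpath("myadd.so"))

# Verify the reloaded function against numpy
dev = tvm.device(tgt.kind.name, 0)
a = tvm.nd.array(np.random.uniform(size=1024).astype(A.dtype), dev)
b = tvm.nd.array(np.random.uniform(size=1024).astype(B.dtype), dev)
c = tvm.nd.array(np.zeros(1024, dtype=C.dtype), dev)
fadd_loaded(a, b, c)
tvm.testing.assert_allclose(c.numpy(), a.numpy() + b.numpy())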

Future tutorials expand on the matrix multiplication example, showing how to build a generic template for matrix multiplication and other operators with tunable parameters that can be automatically optimized for a specific platform.