Unreliable Guide To Locking 【ChatGPT】

发布时间 2023-12-09 15:08:16作者: 摩斯电码

Rusty's Remarkably Unreliable Guide to Kernel Locking

作者

Rusty Russell

简介

欢迎阅读 Rusty's Remarkably Unreliable Guide to Kernel Locking issues。本文档描述了 Linux Kernel 2.6 中的锁定系统。

随着 HyperThreading 和 Linux Kernel 中的抢占式调度的广泛应用,每个在内核上进行开发的人都需要了解多处理器并发和锁定的基本原理。

并发的问题

在一个正常的程序中,你可以像这样增加一个计数器:

very_important_count++;

这是人们期望发生的事情:

预期结果

实例 1 实例 2
读取 very_important_count (5)
增加 1 (6)
写入 very_important_count (6)
读取 very_important_count (5)
增加 1 (6)
写入 very_important_count (6)

这是可能发生的情况:

可能结果

实例 1 实例 2
读取 very_important_count (5)
读取 very_important_count (5)
增加 1 (6)
增加 1 (6)
写入 very_important_count (6)
写入 very_important_count (6)

竞争条件和临界区

这种重叠,其中结果取决于多个任务的相对时间,称为竞争条件。包含并发问题的代码片段称为临界区。特别是自从 Linux 开始在 SMP 机器上运行以来,它们成为内核设计和实现的主要问题之一。

抢占也可能产生相同的效果,即使只有一个 CPU:通过在临界区期间抢占一个任务,我们也会产生相同的竞争条件。在这种情况下,抢占的线程可能会自己运行临界区。

解决方案是识别这些同时访问发生的时刻,并使用锁定来确保只有一个实例可以同时进入临界区。Linux 内核中有许多友好的原语来帮助你做到这一点。然后还有一些不友好的原语,但我会假装它们不存在。

Linux Kernel 中的锁定

如果我能给你一个关于锁定的建议:保持简单。

不愿意引入新的锁。

两种主要类型的内核锁:自旋锁和互斥锁

有两种主要类型的内核锁。基本类型是自旋锁(include/asm/spinlock.h),它是一个非常简单的单持有者锁:如果无法获取自旋锁,你会一直尝试(自旋)直到成功。自旋锁非常小且快速,可以在任何地方使用。

第二种类型是互斥锁(include/linux/mutex.h):它类似于自旋锁,但你可能会在持有互斥锁时阻塞。如果无法锁定互斥锁,你的任务将挂起自己,并在释放互斥锁时唤醒。这意味着 CPU 在等待时可以做其他事情。有许多情况下你无法休眠(参见哪些函数可以安全地从中断中调用?),因此必须使用自旋锁。

这两种类型的锁都不是递归的:参见死锁:简单和高级。

在单处理器内核中的锁定

对于没有启用 CONFIG_SMP 和 CONFIG_PREEMPT 的内核,根本不存在自旋锁。这是一个很好的设计决定:当没有其他任务可以同时运行时,就没有理由使用锁。

如果内核没有启用 CONFIG_SMP,但启用了 CONFIG_PREEMPT,则自旋锁简单地禁用了抢占,这足以防止任何竞争。对于大多数情况,我们可以将抢占视为等同于 SMP,并不单独担心它。

即使没有 SMP 测试盒,你也应该始终测试启用了 CONFIG_SMP 和 CONFIG_PREEMPT 的锁定代码,因为它仍然可以捕获一些锁定错误。

互斥锁仍然存在,因为它们在用户上下文之间需要同步,如下面将要看到的。

仅在用户上下文中进行锁定

如果有一个数据结构只能从用户上下文访问,那么你可以使用简单的互斥锁(include/linux/mutex.h)来保护它。这是最简单的情况:你初始化互斥锁。然后你可以调用 mutex_lock_interruptible() 来获取互斥锁,调用 mutex_unlock() 来释放它。还有一个 mutex_lock(),应该避免使用,因为如果接收到信号,它将不会返回。

例如:net/netfilter/nf_sockopt.c 允许注册新的 setsockopt() 和 getsockopt() 调用,使用 nf_register_sockopt()。注册和注销仅在模块加载和卸载时进行(以及启动时间,在那里没有并发),并且注册列表仅在未知的 setsockopt() 或 getsockopt() 系统调用时进行查询。nf_sockopt_mutex 对于保护这一点是完美的,特别是因为 setsockopt 和 getsockopt 调用很可能会休眠。

在用户上下文和软中断之间进行锁定

如果软中断与用户上下文共享数据,你会遇到两个问题。首先,当前用户上下文可以被软中断中断,其次,临界区可能会被另一个 CPU 进入。这就是 spin_lock_bh()(include/linux/spinlock.h)的用法。它在该 CPU 上禁用软中断,然后获取锁。spin_unlock_bh() 则相反。('_bh' 后缀是对“Bottom Halves”的历史参考,这是软中断的旧名称。在一个完美的世界中,它应该被称为 spin_lock_softirq())。

请注意,你也可以在这里使用 spin_lock_irq() 或 spin_lock_irqsave(),它们也会停止硬件中断:参见硬中断上下文。

这对于单处理器同样有效:自旋锁消失了,这个宏简单地变成了 local_bh_disable()(include/linux/interrupt.h),它保护你免受软中断的影响。

在用户上下文和任务队列之间进行锁定

这与上面完全相同,因为任务队列实际上是从软中断中运行的。

在用户上下文和定时器之间进行锁定

这也与上面完全相同,因为定时器实际上是从软中断中运行的。从锁定的角度来看,任务队列和定时器是相同的。

在任务队列/定时器之间进行锁定

有时一个任务队列或定时器可能希望与另一个任务队列或定时器共享数据。

相同的任务队列/定时器

由于任务队列永远不会同时在两个 CPU 上运行,因此你不需要担心你的任务队列是可重入的(在 SMP 上同时运行两次),即使在 SMP 上也是如此。

不同的任务队列/定时器

如果另一个任务队列/定时器希望与你的任务队列或定时器共享数据,你们两个都需要使用 spin_lock() 和 spin_unlock() 调用。在这里 spin_lock_bh() 是不必要的,因为你已经在一个任务队列中,并且不会在同一个 CPU 上运行。

在软中断之间进行锁定

通常一个软中断可能希望与自身或任务队列/定时器共享数据。

相同的软中断

相同的软中断可以在其他 CPU 上运行:你可以使用一个每 CPU 数组(参见每 CPU 数据)以获得更好的性能。如果你要使用软中断,你可能会关心可扩展性性能足够,以证明额外的复杂性是合理的。

你需要为共享数据使用 spin_lock() 和 spin_unlock()。

不同的软中断

你需要为共享数据使用 spin_lock() 和 spin_unlock(),无论是定时器、任务队列、不同的软中断还是相同或另一个软中断:它们中的任何一个都可能在不同的 CPU 上运行。

硬中断上下文

硬件中断通常与任务队列或软中断通信。通常这涉及将工作放入队列,软中断将从中取出。

在硬中断和软中断/任务队列之间进行锁定

如果硬件中断处理程序与软中断共享数据,你有两个问题。首先,软中断处理可能会被硬件中断中断,其次,临界区可能会被另一个 CPU 的硬件中断进入。这就是 spin_lock_irq() 的用法。它被定义为在该 CPU 上禁用中断,然后获取锁。spin_unlock_irq() 则相反。

硬件中断处理程序不需要使用 spin_lock_irq(),因为软中断在硬件中断处理程序运行时无法运行:它可以使用 spin_lock(),这略快一些。唯一的例外是如果另一个硬件中断处理程序使用相同的锁:spin_lock_irq() 将阻止它中断我们。

这对于单处理器同样有效:自旋锁消失了,这个宏简单地变成了 local_irq_disable()(include/asm/smp.h),它保护你免受软中断/任务队列/底半部分的影响。

spin_lock_irqsave()(include/linux/spinlock.h)是一个变体,它保存中断是否打开或关闭在一个标志字中,这个标志字传递给 spin_unlock_irqrestore()。这意味着相同的代码可以在硬中断处理程序中使用(其中中断已经关闭),也可以在软中断中使用(其中需要禁用中断)。

请注意,软中断(因此任务队列和定时器)是从硬件中断返回时运行的,因此 spin_lock_irq() 也会停止这些。从这个意义上说,spin_lock_irqsave() 是最通用和最强大的锁定函数。

在两个硬中断处理程序之间进行锁定

很少需要在两个中断处理程序之间共享数据,但如果你确实需要,应该使用 spin_lock_irqsave():具体是否在中断处理程序本身内禁用所有中断取决于体系结构。

锁定的速查表

Pete Zaitcev 给出了以下总结:

  • 如果你在进程上下文(任何系统调用)中,并希望锁定其他进程,使用互斥锁。你可以获取互斥锁并休眠(copy_from_user() 或 kmalloc(x, GFP_KERNEL))。
  • 否则(== 数据可以在中断中访问),使用 spin_lock_irqsave() 和 spin_unlock_irqrestore()。
  • 避免持有自旋锁超过 5 行代码并跨越任何函数调用(除了像 readb() 这样的访问器)。

最低要求表

以下表格列出了不同上下文之间的最低锁定要求。在某些情况下,相同上下文一次只能在一个 CPU 上运行,因此该上下文不需要锁定(例如,特定线程一次只能在一个 CPU 上运行,但如果它需要与另一个线程共享数据,则需要锁定)。

请记住上面的建议:你总是可以使用 spin_lock_irqsave(),它是所有其他自旋锁原语的超集。

. IRQ 处理程序 A IRQ 处理程序 B 软中断 A 软中断 B 任务队列 A 任务队列 B 定时器 A 定时器 B 用户上下文 A 用户上下文 B
IRQ 处理程序 A
IRQ 处理程序 B SLIS
软中断 A SLI SLI SL
软中断 B SLI SLI SL SL
任务队列 A SLI SLI SL SL
任务队列 B SLI SLI SL SL SL
定时器 A SLI SLI SL SL SL SL
定时器 B SLI SLI SL SL SL SL SL
用户上下文 A SLI SLI SLBH SLBH SLBH SLBH SLBH SLBH
用户上下文 B SLI SLI SLBH SLBH SLBH SLBH SLBH SLBH MLI

表格: 锁定要求表

SLIS spin_lock_irqsave
SLI spin_lock_irq
SL spin_lock
SLBH spin_lock_bh
MLI mutex_lock_interruptible

表格:锁定要求表图例

锁的尝试获取函数

有一些函数尝试仅一次获取锁并立即返回一个值,告知是否成功获取锁。如果在其他线程持有锁时不需要访问受锁保护的数据,可以使用这些函数。如果之后需要访问受锁保护的数据,则稍后应获取锁。

spin_trylock() 不会自旋,而是在第一次尝试时获取自旋锁时返回非零值,如果未获取则返回 0。这个函数可以在所有上下文中使用,就像 spin_lock() 一样:必须禁用可能中断你并获取自旋锁的上下文。

mutex_trylock() 不会挂起你的任务,而是在第一次尝试时获取互斥锁时返回非零值,如果未获取则返回 0。尽管不会休眠,但这个函数在硬件或软件中断上下文中不能安全使用。

常见示例

让我们通过一个简单的示例来了解:一个数字到名称映射的缓存。该缓存会记录每个对象被使用的次数,当缓存满时,会删除使用次数最少的对象。

全在用户上下文中

对于我们的第一个示例,我们假设所有操作都在用户上下文中(即来自系统调用),所以我们可以休眠。这意味着我们可以使用互斥锁来保护缓存和其中的所有对象。以下是代码:

#include <linux/list.h>
#include <linux/slab.h>
#include <linux/string.h>
#include <linux/mutex.h>
#include <asm/errno.h>

struct object
{
        struct list_head list;
        int id;
        char name[32];
        int popularity;
};

/* 保护缓存、cache_num 和其中的对象 */
static DEFINE_MUTEX(cache_lock);
static LIST_HEAD(cache);
static unsigned int cache_num = 0;
#define MAX_CACHE_SIZE 10

/* 必须持有 cache_lock */
static struct object *__cache_find(int id)
{
        struct object *i;

        list_for_each_entry(i, &cache, list)
                if (i->id == id) {
                        i->popularity++;
                        return i;
                }
        return NULL;
}

/* 必须持有 cache_lock */
static void __cache_delete(struct object *obj)
{
        BUG_ON(!obj);
        list_del(&obj->list);
        kfree(obj);
        cache_num--;
}

/* 必须持有 cache_lock */
static void __cache_add(struct object *obj)
{
        list_add(&obj->list, &cache);
        if (++cache_num > MAX_CACHE_SIZE) {
                struct object *i, *outcast = NULL;
                list_for_each_entry(i, &cache, list) {
                        if (!outcast || i->popularity < outcast->popularity)
                                outcast = i;
                }
                __cache_delete(outcast);
        }
}

int cache_add(int id, const char *name)
{
        struct object *obj;

        if ((obj = kmalloc(sizeof(*obj), GFP_KERNEL)) == NULL)
                return -ENOMEM;

        strscpy(obj->name, name, sizeof(obj->name));
        obj->id = id;
        obj->popularity = 0;

        mutex_lock(&cache_lock);
        __cache_add(obj);
        mutex_unlock(&cache_lock);
        return 0;
}

void cache_delete(int id)
{
        mutex_lock(&cache_lock);
        __cache_delete(__cache_find(id));
        mutex_unlock(&cache_lock);
}

int cache_find(int id, char *name)
{
        struct object *obj;
        int ret = -ENOENT;

        mutex_lock(&cache_lock);
        obj = __cache_find(id);
        if (obj) {
                ret = 0;
                strcpy(name, obj->name);
        }
        mutex_unlock(&cache_lock);
        return ret;
}

请注意,我们始终确保在添加、删除或查找缓存时持有 cache_lock:缓存基础设施本身以及对象的内容都受锁保护。在这种情况下很容易,因为我们为用户复制数据,从不让他们直接访问对象。

这里有一个轻微的(也很常见的)优化:在 cache_add() 中,在获取锁之前设置对象的字段。这是安全的,因为在将其放入缓存之前,没有人可以访问它。

从中断上下文中访问

现在考虑 cache_find() 可能会从中断上下文中调用的情况:无论是硬件中断还是软中断。一个示例是一个定时器,它会从缓存中删除对象。

以下是更改的标准补丁格式:- 行表示被删除的行,+ 行表示被添加的行。

--- cache.c.usercontext 2003-12-09 13:58:54.000000000 +1100
+++ cache.c.interrupt   2003-12-09 14:07:49.000000000 +1100
@@ -12,7 +12,7 @@
         int popularity;
 };

-static DEFINE_MUTEX(cache_lock);
+static DEFINE_SPINLOCK(cache_lock);
 static LIST_HEAD(cache);
 static unsigned int cache_num = 0;
 #define MAX_CACHE_SIZE 10
@@ -55,6 +55,7 @@
 int cache_add(int id, const char *name)
 {
         struct object *obj;
+        unsigned long flags;

         if ((obj = kmalloc(sizeof(*obj), GFP_KERNEL)) == NULL)
                 return -ENOMEM;
@@ -63,30 +64,33 @@
         obj->id = id;
         obj->popularity = 0;

-        mutex_lock(&cache_lock);
+        spin_lock_irqsave(&cache_lock, flags);
         __cache_add(obj);
-        mutex_unlock(&cache_lock);
+        spin_unlock_irqrestore(&cache_lock, flags);
         return 0;
 }

 void cache_delete(int id)
 {
-        mutex_lock(&cache_lock);
+        unsigned long flags;
+
+        spin_lock_irqsave(&cache_lock, flags);
         __cache_delete(__cache_find(id));
-        mutex_unlock(&cache_lock);
+        spin_unlock_irqrestore(&cache_lock, flags);
 }

 int cache_find(int id, char *name)
 {
         struct object *obj;
         int ret = -ENOENT;
+        unsigned long flags;

-        mutex_lock(&cache_lock);
+        spin_lock_irqsave(&cache_lock, flags);
         obj = __cache_find(id);
         if (obj) {
                 ret = 0;
                 strcpy(name, obj->name);
         }
-        mutex_unlock(&cache_lock);
+        spin_unlock_irqrestore(&cache_lock, flags);
         return ret;
 }

请注意,spin_lock_irqsave() 会在需要时关闭中断,否则不执行任何操作(如果我们已经在中断处理程序中,则不执行任何操作),因此这些函数可以安全地从任何上下文中调用。

不幸的是,cache_add() 使用了 GFP_KERNEL 标志调用 kmalloc(),这只在用户上下文中合法。我假设 cache_add() 仍然只在用户上下文中调用,否则这应该成为 cache_add() 的一个参数。

将对象暴露给文件外部

如果我们的对象包含更多信息,仅复制信息进出可能不足够:代码的其他部分可能希望保留对这些对象的指针,而不是每次都查找 id。这会产生两个问题。

第一个问题是我们使用 cache_lock 来保护对象:我们需要将其设置为非静态,以便代码的其他部分可以使用它。这使得锁定变得更加棘手,因为它不再集中在一个地方。

第二个问题是生存期问题:如果另一个结构保留了对对象的指针,那么它可能期望该指针保持有效。不幸的是,只有在持有锁时才能保证这一点,否则可能会有人调用 cache_delete(),甚至更糟的是,添加另一个对象,重用相同的地址。

由于只有一个锁,你不能永远持有它:否则其他人将无法完成任何工作。

解决这个问题的方法是使用引用计数:每个持有对象指针的人在首次获取对象时增加引用计数,并在完成后减少引用计数。将其减少到零的人知道它未被使用,并且可以实际删除它。

以下是代码:

--- cache.c.interrupt   2003-12-09 14:25:43.000000000 +1100
+++ cache.c.refcnt  2003-12-09 14:33:05.000000000 +1100
@@ -7,6 +7,7 @@
 struct object
 {
         struct list_head list;
+        unsigned int refcnt;
         int id;
         char name[32];
         int popularity;
@@ -17,6 +18,35 @@
 static unsigned int cache_num = 0;
 #define MAX_CACHE_SIZE 10

+static void __object_put(struct object *obj)
+{
+        if (--obj->refcnt == 0)
+                kfree(obj);
+}
+
+static void __object_get(struct object *obj)
+{
+        obj->refcnt++;
+}
+
+void object_put(struct object *obj)
+{
+        unsigned long flags;
+
+        spin_lock_irqsave(&cache_lock, flags);
+        __object_put(obj);
+        spin_unlock_irqrestore(&cache_lock, flags);
+}
+
+void object_get(struct object *obj)
+{
+        unsigned long flags;
+
+        spin_lock_irqsave(&cache_lock, flags);
+        __object_get(obj);
+        spin_unlock_irqrestore(&cache_lock, flags);
+}
+
 /* 必须持有 cache_lock */
 static struct object *__cache_find(int id)
 {
@@ -35,6 +65,7 @@
 {
         BUG_ON(!obj);
         list_del(&obj->list);
+        __object_put(obj);
         cache_num--;
 }

@@ -63,6 +94,7 @@
         strscpy(obj->name, name, sizeof(obj->name));
         obj->id = id;
         obj->popularity = 0;
+        obj->refcnt = 1; /* 缓存持有一个引用 */

         spin_lock_irqsave(&cache_lock, flags);
         __cache_add(obj);
@@ -79,18 +111,15 @@
         spin_unlock_irqrestore(&cache_lock, flags);
 }

-int cache_find(int id, char *name)
+struct object *cache_find(int id)
 {
         struct object *obj;
-        int ret = -ENOENT;
         unsigned long flags;

         spin_lock_irqsave(&cache_lock, flags);
         obj = __cache_find(id);
-        if (obj) {
-                ret = 0;
-                strcpy(name, obj->name);
-        }
+        if (obj)
+                __object_get(obj);
         spin_unlock_irqrestore(&cache_lock, flags);
-        return ret;
+        return obj;
 }

我们在标准的“获取”和“释放”函数中封装了引用计数。现在我们可以从 cache_find() 中返回对象本身,这样用户就可以在持有对象时休眠(例如,将名称复制到用户空间的 copy_to_user())。

另一个要注意的地方是,我说每个指向对象的指针都应持有一个引用:因此,当首次插入缓存时,引用计数为 1。在某些版本中,框架不持有引用计数,但它们更复杂。

使用原子操作进行引用计数

在实践中,通常会使用atomic_t来表示引用计数。在include/asm/atomic.h中定义了许多原子操作:这些操作保证在系统中的所有 CPU 中都是原子的,因此不需要使用锁。在这种情况下,使用自旋锁比使用普通锁更简单,尽管对于任何非平凡的情况,使用自旋锁更清晰。使用atomic_inc()atomic_dec_and_test()来代替标准的递增和递减操作,不再需要使用锁来保护引用计数本身。

--- cache.c.refcnt  2003-12-09 15:00:35.000000000 +1100
+++ cache.c.refcnt-atomic   2003-12-11 15:49:42.000000000 +1100
@@ -7,7 +7,7 @@
 struct object
 {
         struct list_head list;
-        unsigned int refcnt;
+        atomic_t refcnt;
         int id;
         char name[32];
         int popularity;
@@ -18,33 +18,15 @@
 static unsigned int cache_num = 0;
 #define MAX_CACHE_SIZE 10

-static void __object_put(struct object *obj)
-{
-        if (--obj->refcnt == 0)
-                kfree(obj);
-}
-
-static void __object_get(struct object *obj)
-{
-        obj->refcnt++;
-}
-
 void object_put(struct object *obj)
 {
-        unsigned long flags;
-
-        spin_lock_irqsave(&cache_lock, flags);
-        __object_put(obj);
-        spin_unlock_irqrestore(&cache_lock, flags);
+        if (atomic_dec_and_test(&obj->refcnt))
+                kfree(obj);
 }

 void object_get(struct object *obj)
 {
-        unsigned long flags;
-
-        spin_lock_irqsave(&cache_lock, flags);
-        __object_get(obj);
-        spin_unlock_irqrestore(&cache_lock, flags);
+        atomic_inc(&obj->refcnt);
 }

保护对象本身

在这些示例中,我们假设对象(除了引用计数之外)一旦创建就不会改变。如果我们希望允许名称更改,有三种可能性:

  • 您可以使cache_lock非静态,并告诉用户在更改任何对象的名称之前抓取该锁。
  • 您可以提供cache_obj_rename()函数,该函数获取此锁并为调用者更改名称,并告诉所有人使用该函数。
  • 您可以使cache_lock仅保护缓存本身,并使用另一个锁来保护名称。

理论上,您可以使锁尽可能细粒度,即每个字段、每个对象一个锁。在实践中,最常见的变体是:

  • 一个锁用于保护基础设施(例如此示例中的缓存列表)和所有对象。这是我们迄今为止所做的。
  • 一个锁用于保护基础设施(包括对象内部的列表指针),以及对象内部的另一个锁用于保护该对象的其余部分。
  • 多个锁用于保护基础设施(例如每个哈希链一个锁),可能还有一个单独的每个对象的锁。

以下是“每个对象一个锁”的实现:

--- cache.c.refcnt-atomic   2003-12-11 15:50:54.000000000 +1100
+++ cache.c.perobjectlock   2003-12-11 17:15:03.000000000 +1100
@@ -6,11 +6,17 @@

 struct object
 {
+        /* 这两个由cache_lock保护。 */
         struct list_head list;
+        int popularity;
+
         atomic_t refcnt;
+
+        /* 一旦创建就不会改变。 */
         int id;
+
+        spinlock_t lock; /* 保护名称 */
         char name[32];
-        int popularity;
 };

 static DEFINE_SPINLOCK(cache_lock);
@@ -77,6 +84,7 @@
         obj->id = id;
         obj->popularity = 0;
         atomic_set(&obj->refcnt, 1); /* 缓存持有一个引用 */
+        spin_lock_init(&obj->lock);

         spin_lock_irqsave(&cache_lock, flags);
         __cache_add(obj);

我决定将popularity计数由cache_lock保护,而不是由每个对象的锁保护:这是因为它(就像对象内部的struct list_head一样)在逻辑上是基础设施的一部分。这样,当在__cache_add()中寻找最不受欢迎的对象时,我不需要获取每个对象的锁。

我还决定id成员是不可更改的,因此我不需要在__cache_find()中获取每个对象的锁来检查id:对象锁仅由想要读取或写入名称字段的调用者使用。

还要注意,我添加了一条注释,描述了哪些锁保护了哪些数据。这非常重要,因为它描述了代码的运行时行为,而仅仅阅读代码可能很难获得这些信息。正如Alan Cox所说,“锁定数据,而不是代码”。

常见问题

死锁:简单和高级

存在一种编码错误,即代码尝试两次获取自旋锁:它将永远自旋,等待锁被释放(在 Linux 中,自旋锁、读写锁和互斥锁都不支持递归)。这种问题很容易诊断:不是那种需要熬夜和毛绒兔子交谈的问题。

对于稍微复杂一些的情况,想象一下有一个软中断和用户上下文共享的区域。如果使用spin_lock()调用来保护它,可能会出现用户上下文在持有锁时被软中断中断,然后软中断会永远自旋,尝试获取相同的锁。

这两种情况都称为死锁,正如上文所示,即使在单个 CPU 上也可能发生(尽管在 UP 编译上不会发生,因为在配置为CONFIG_SMP=n的内核编译中,自旋锁会消失。在第二个示例中仍会发生数据损坏)。

这种完全的死锁很容易诊断:在 SMP 系统上,看门狗定时器或使用DEBUG_SPINLOCK设置(include/linux/spinlock.h)将立即显示出问题。

更复杂的问题是所谓的“致命拥抱”,涉及两个或更多个锁。假设有一个哈希表:表中的每个条目都是一个自旋锁和一个散列对象链。在软中断处理程序中,有时您希望将对象从一个哈希链移到另一个哈希链:您会获取旧哈希链的自旋锁和新哈希链的自旋锁,并从旧链中删除对象,然后将其插入新链。

这里有两个问题。首先,如果您的代码曾试图将对象移动到相同的链,它将因尝试两次锁定而发生死锁。其次,如果另一个 CPU 上的相同软中断尝试以相反的方向移动另一个对象,可能会发生以下情况:

CPU 1 CPU 2
获取锁 A -> OK 获取锁 B -> OK
获取锁 B -> 自旋 获取锁 A -> 自旋

表:后果

两个 CPU 将永远自旋,等待另一个放弃其锁。这看起来、闻起来、感觉起来都像崩溃。

预防死锁

教科书会告诉您,如果总是以相同的顺序锁定,就永远不会出现这种死锁。实践会告诉您,这种方法不可扩展:当我创建一个新的锁时,我不了解足够的内核知识来确定它将适合 5000 个锁层次结构的哪个位置。

最好的锁是封装的:它们永远不会暴露在头文件中,并且永远不会在调用非平凡函数的情况下持有。您可以阅读此代码并看到它永远不会死锁,因为它永远不会在持有该锁的同时尝试获取另一个锁。使用您的代码的人甚至不需要知道您正在使用锁。

这里的一个经典问题是当您提供回调或挂钩时:如果在持有锁的情况下调用它们,您会面临简单的死锁或致命拥抱的风险(谁知道回调会做什么?)。

过度防止死锁

死锁问题很麻烦,但不如数据损坏那么糟糕。代码获取读锁,搜索列表,未找到所需内容,释放读锁,获取写锁并插入对象存在竞争条件。

竞争定时器:内核的消遣

定时器可能会产生它们自己特殊的竞争问题。考虑一个对象集合(列表、哈希等),其中每个对象都有一个定时器,该定时器到期时将销毁它。

如果您想销毁整个集合(例如在模块移除时),您可能会这样做:

/* 这段代码糟糕糟糕糟糕:如果再糟糕一点,它就会使用匈牙利命名法 */
spin_lock_bh(&list_lock);

while (list) {
        struct foo *next = list->next;
        timer_delete(&list->timer);
        kfree(list);
        list = next;
}

spin_unlock_bh(&list_lock);

在 SMP 系统上,迟早会崩溃,因为定时器可能在spin_lock_bh()之前就已经触发,它只会在我们spin_unlock_bh()之后获取锁,然后尝试释放已经被释放的元素(已经被释放)。

这可以通过检查timer_delete()的结果来避免:如果返回 1,则定时器已被删除。如果返回 0,则表示(在这种情况下)它当前正在运行,因此我们可以这样做:

retry:
        spin_lock_bh(&list_lock);

        while (list) {
                struct foo *next = list->next;
                if (!timer_delete(&list->timer)) {
                        /* 让定时器有机会删除这个 */
                        spin_unlock_bh(&list_lock);
                        goto retry;
                }
                kfree(list);
                list = next;
        }

        spin_unlock_bh(&list_lock);

另一个常见问题是删除重新启动自己的定时器(通过在其定时器函数末尾调用add_timer())。由于这是一个容易发生竞争的常见情况,您应该使用timer_delete_sync()include/linux/timer.h)来处理这种情况。

在释放定时器之前,应调用timer_shutdown()timer_shutdown_sync(),这将阻止其重新启动。核心代码将忽略任何后续尝试重新启动定时器。

锁定速度

在考虑代码锁定速度时,有三个主要问题需要考虑。首先是并发性:在持有锁的同时会有多少事物在等待。其次是实际获取和释放无争用锁所需的时间。第三是使用更少或更智能的锁。我假设锁定会经常使用:否则,你就不会关心效率。

并发性取决于锁通常持有的时间:你应该只在需要的时候持有锁,而不是更长时间。在缓存示例中,我们总是在不持有锁的情况下创建对象,然后只有在准备将其插入列表时才获取锁。

获取时间取决于锁操作对流水线的损害程度(流水线停顿),以及这个 CPU 是否是最后一个获取锁的(即该 CPU 的锁是否在缓存中):在拥有更多 CPU 的机器上,这种可能性会迅速下降。考虑一个 700MHz 的英特尔 Pentium III:一条指令大约需要 0.7 纳秒,原子递增大约需要 58 纳秒,一个在该 CPU 上缓存热的锁需要 160 纳秒,从另一个 CPU 传输缓存行需要额外的 170 到 360 纳秒(这些数据来自 Paul McKenney 的 Linux Journal RCU 文章)。

这两个目标是相互冲突的:持有锁的时间很短可能会通过将锁分成部分(比如在我们最终的每个对象锁示例中)来实现,但这会增加锁获取的次数,结果通常比只有一个锁要慢。这也是提倡锁定简单性的另一个原因。

第三个问题在下面进行了解释:有一些方法可以减少需要执行的锁定量。

读/写锁变体

自旋锁和互斥锁都有读/写变体:rwlock_t 和 struct rw_semaphore。这些将用户分为两类:读者和写者。如果你只是读取数据,你可以获取读锁,但如果要写入数据,则需要写锁。许多人可以持有读锁,但写者必须是唯一的持有者。

如果你的代码可以很好地按照读者/写者的方式划分(就像我们的缓存代码一样),并且读者长时间持有锁,那么使用这些锁可以有所帮助。不过,它们略慢于普通锁,因此在实践中,rwlock_t 通常不值得使用。

避免锁定:读-拷贝-更新

有一种特殊的读/写锁定方法称为读-拷贝-更新。使用 RCU,读者可以完全避免获取锁:因为我们期望缓存被读取的次数比更新的次数多(否则缓存就是浪费时间),所以它是这种优化的候选。

我们如何摆脱读锁?摆脱读锁意味着写者可能在读者下面更改列表。这实际上非常简单:如果写者非常小心地添加元素,我们可以在写者添加元素时读取链接列表。例如,在单链表 list 中添加新元素:

new->next = list->next;
wmb();
list->next = new;

wmb() 是写内存屏障。它确保第一个操作(设置新元素的下一个指针)完成并被所有 CPU 看到,然后才执行第二个操作(将新元素放入列表)。这很重要,因为现代编译器和现代 CPU 都可以重新排序指令,除非另有说明:我们希望读者要么根本不看到新元素,要么看到下一个指针正确指向列表的其余部分。

幸运的是,有一个函数可以为标准的 struct list_head 列表执行此操作:list_add_rcu()(include/linux/list.h)。

从列表中删除元素甚至更简单:我们用其后继者的指针替换旧元素的指针,读者要么看到它,要么跳过它。

list->next = old->next;

有 list_del_rcu()(include/linux/list.h)来执行此操作(正常版本会使旧对象无效,这不是我们想要的)。

读者也必须小心:一些 CPU 可能会提前查看下一个指针以开始读取下一个元素的内容,但不意识到在其下面更改下一个指针时,预取的内容是错误的。再次,有 list_for_each_entry_rcu()(include/linux/list.h)来帮助你。当然,写者可以使用 list_for_each_entry(),因为不可能有两个同时的写者。

我们最后的困境是:我们何时才能实际销毁已删除的元素?记住,读者可能正在遍历列表中的这个元素:如果我们释放了这个元素并且下一个指针发生了变化,读者将跳转到垃圾中并崩溃。我们需要等到知道所有在删除元素时正在遍历列表的读者都完成了。我们使用 call_rcu() 来注册一个回调,一旦所有现有的读者都完成,它就会实际销毁对象。或者,可以使用 synchronize_rcu() 来阻塞,直到所有现有的读者都完成。

但是,读-拷贝-更新如何知道读者何时完成?方法是:首先,读者总是在 rcu_read_lock()/rcu_read_unlock() 对中遍历列表:这些简单地禁用抢占,因此读者在读取列表时不会进入睡眠状态。

然后,RCU 等待直到每个其他 CPU 至少睡眠一次:因为读者不能睡眠,所以我们知道在删除期间遍历列表的任何读者都已经完成,然后触发回调。真正的读-拷贝-更新代码比这更加优化,但这是基本思想。

--- cache.c.perobjectlock   2003-12-11 17:15:03.000000000 +1100
+++ cache.c.rcupdate    2003-12-11 17:55:14.000000000 +1100
@@ -1,15 +1,18 @@
 #include <linux/list.h>
 #include <linux/slab.h>
 #include <linux/string.h>
+#include <linux/rcupdate.h>
 #include <linux/mutex.h>
 #include <asm/errno.h>

 struct object
 {
-        /* These two protected by cache_lock. */
+        /* This is protected by RCU */
         struct list_head list;
         int popularity;

+        struct rcu_head rcu;
+
         atomic_t refcnt;

         /* Doesn't change once created. */
@@ -40,7 +43,7 @@
 {
         struct object *i;

-        list_for_each_entry(i, &cache, list) {
+        list_for_each_entry_rcu(i, &cache, list) {
                 if (i->id == id) {
                         i->popularity++;
                         return i;
@@ -49,19 +52,25 @@
         return NULL;
 }

+/* Final discard done once we know no readers are looking. */
+static void cache_delete_rcu(void *arg)
+{
+        object_put(arg);
+}
+
 /* Must be holding cache_lock */
 static void __cache_delete(struct object *obj)
 {
         BUG_ON(!obj);
-        list_del(&obj->list);
-        object_put(obj);
+        list_del_rcu(&obj->list);
         cache_num--;
+        call_rcu(&obj->rcu, cache_delete_rcu);
 }

 /* Must be holding cache_lock */
 static void __cache_add(struct object *obj)
 {
-        list_add(&obj->list, &cache);
+        list_add_rcu(&obj->list, &cache);
         if (++cache_num > MAX_CACHE_SIZE) {
                 struct object *i, *outcast = NULL;
                 list_for_each_entry(i, &cache, list) {
@@ -104,12 +114,11 @@
 struct object *cache_find(int id)
 {
         struct object *obj;
-        unsigned long flags;

-        spin_lock_irqsave(&cache_lock, flags);
+        rcu_read_lock();
         obj = __cache_find(id);
         if (obj)
                 object_get(obj);
-        spin_unlock_irqrestore(&cache_lock, flags);
+        rcu_read_unlock();
         return obj;
 }

注意,读者将在 __cache_find() 中更改 popularity 成员,现在它不持有锁。一个解决方案是将其设置为 atomic_t,但对于这种用法,我们并不真的关心竞争:一个近似的结果已经足够好了,所以我没有更改它。

结果是 cache_find() 不需要与任何其他函数同步,因此在 SMP 上几乎与在 UP 上一样快。

这里还有进一步的优化可能:还记得我们最初的缓存代码吗,其中没有引用计数,调用者只需在使用对象时持有锁吗?这仍然是可能的:如果你持有锁,没有人可以删除对象,因此你不需要获取和释放引用计数。

现在,因为 RCU 中的“读锁”只是禁用了抢占,所以在调用 cache_find() 和 object_put() 之间始终禁用抢占的调用者实际上不需要获取和释放引用计数:我们可以通过使其非静态来公开 __cache_find(),这样的调用者可以直接调用它。

这里的好处是引用计数没有被写入:对象没有被以任何方式更改,这在 SMP 机器上由于缓存而更快。

每 CPU 数据

避免锁定的另一种技术是为每个 CPU 复制信息。例如,如果你想保持一个常见条件的计数,你可以使用自旋锁和单个计数器。非常简单。

如果这太慢了(通常不会,但如果你有一个非常大的机器进行测试,并且可以证明它是慢的),你可以为每个 CPU 使用一个计数器,然后它们中的任何一个都不需要独占锁。参见 DEFINE_PER_CPU()、get_cpu_var() 和 put_cpu_var()(include/linux/percpu.h)。

对于简单的每 CPU 计数器,特别有用的是 local_t 类型,以及 cpu_local_inc() 和相关函数,它们在某些架构上比简单的代码更有效(include/asm/local.h)。

请注意,没有简单、可靠的方法可以在不引入更多锁的情况下获取这种计数器的确切值。对于某些用途,这不是问题。

大多数由 IRQ 处理程序使用的数据

如果数据总是从相同的 IRQ 处理程序中访问,你根本不需要锁:内核已经保证 IRQ 处理程序不会同时在多个 CPU 上运行。

Manfred Spraul 指出,即使数据在用户上下文或软中断/任务let 中非常偶尔地访问,你仍然可以这样做。IRQ 处理程序不使用锁,所有其他访问都是这样完成的:

mutex_lock(&lock);
disable_irq(irq);
...
enable_irq(irq);
mutex_unlock(&lock);

disable_irq() 防止 IRQ 处理程序运行(如果它当前在其他 CPU 上运行,则等待其完成)。自旋锁防止同时发生任何其他访问。当然,这比只调用 spin_lock_irq() 要慢,因此只有在这种类型的访问极其罕见时才有意义。

哪些函数可以安全地从中断中调用?

内核中的许多函数会直接或间接地休眠(即调用 schedule()):你永远不能在持有自旋锁时或禁用抢占时调用它们。这也意味着你需要在用户上下文中:从中断中调用它们是非法的。

一些会休眠的函数

最常见的列在下面,但通常你必须阅读代码才能找出其他调用是否安全。如果调用它的其他人可以休眠,那么你可能也需要能够休眠。特别是,注册和注销函数通常期望从用户上下文中调用,并且可能会休眠。

  • 访问用户空间:

    • copy_from_user()
    • copy_to_user()
    • get_user()
    • put_user()
  • kmalloc(GFP_KERNEL)

  • mutex_lock_interruptible() 和 mutex_lock()
    有一个不会休眠的 mutex_trylock()。不过,它也不能在中断上下文中使用,因为它的实现对此不安全。mutex_unlock() 也永远不会休眠。它也不能在中断上下文中使用,因为必须由获取它的任务释放。

一些不会休眠的函数

一些函数可以从任何上下文中调用,或者持有几乎任何锁。

  • printk()
  • kfree()
  • add_timer() 和 timer_delete()

Mutex API reference

https://www.kernel.org/doc/html/v6.6/kernel-hacking/locking.html#mutex-api-reference

以下是您提供的内容的中文翻译:

进一步阅读

  • Documentation/locking/spinlocks.rst:Linus Torvalds在内核源代码中的自旋锁教程。

  • Unix Systems for Modern Architectures:对称多处理和内核程序员的缓存:Curt Schimmel对内核级别锁定的非常好的介绍(虽然不是为Linux编写的,但几乎所有内容都适用)。这本书很贵,但绝对值得每一分钱,以了解SMP锁定。[ISBN:0201633388]

致谢

感谢Telsa Gwynne提供的DocBooking,整理和添加样式。

感谢Martin Pool,Philipp Rumpf,Stephen Rothwell,Paul Mackerras,Ruedi Aschwanden,Alan Cox,Manfred Spraul,Tim Waugh,Pete Zaitcev,James Morris,Robert Love,Paul McKenney,John Ashby进行校对、纠正、批评。

感谢秘密组织对本文档没有任何影响。

术语表

  • preemption
    在2.5版本之前,或者当CONFIG_PREEMPT未设置时,内核中的用户上下文进程不会相互抢占(即,你拥有该CPU,直到你放弃它,除了中断)。随着2.5.4中CONFIG_PREEMPT的添加,这种情况发生了变化:在用户上下文中,优先级更高的任务可以“插入”:自旋锁被更改为禁用抢占,即使在UP上也是如此。

  • bh
    Bottom Half:出于历史原因,现在带有“_bh”的函数通常指任何软件中断,例如spin_lock_bh()会阻止当前CPU上的任何软件中断。底部中断已被弃用,并最终将被任务let取代。任何时候只会运行一个底部中断。

  • Hardware Interrupt / Hardware IRQ
    硬件中断请求。in_hardirq()在硬件中断处理程序中返回true。

  • Interrupt Context
    非用户上下文:处理硬件中断请求或软件中断请求。通过in_interrupt()宏返回true来表示。

  • SMP
    对称多处理器:为多CPU机器编译的内核。(CONFIG_SMP=y)。

  • Software Interrupt / softirq
    软件中断处理程序。in_hardirq()返回false;in_softirq()返回true。任务let和softirq都属于“软件中断”的范畴。

    严格来说,softirq是多达32个枚举的软件中断之一,可以同时在多个CPU上运行。有时也用来指代任务let(即所有软件中断)。

  • tasklet
    动态可注册的软件中断,保证一次只在一个CPU上运行。

  • timer
    动态可注册的软件中断,在给定时间(或接近给定时间)运行。运行时,它就像一个任务let(实际上,它们是从TIMER_SOFTIRQ调用的)。

  • UP
    单处理器:非SMP。(CONFIG_SMP=n)。

  • User Context
    代表特定进程(即系统调用或陷阱)或内核线程执行的内核。您可以使用current宏来确定当前进程。不要与用户空间混淆。可以被软件或硬件中断打断。

  • Userspace
    在内核外执行自己代码的进程。