sync.Cond的使用和实现原理

发布时间 2024-01-11 00:22:45作者: 李若盛开

一、概念

Go标准库提供了Cond原语,sync.Cond(条件变量)是一个用于在多个goroutine之间进行同步和通信的重要工具,可以让 Goroutine 在满足特定条件时被阻塞和唤醒。

条件变量的作用并不保证在同一时刻仅有一个协程(线程)访问某个共享的数据资源,而是在对应的共享数据的状态发生变化时,通知阻塞在某个条件上的协程(线程)。条件变量不是锁,在并发中不能达到同步的目的,因此条件变量总是与锁一块使用。

sync.Cond 基于互斥锁/读写锁,那它和互斥锁有什么区别呢?
互斥锁 sync.Mutex 通常用来保护共享的临界资源,条件变量 sync.Cond 用来协调想要访问共享资源的 Goroutine。当共享资源的状态发生变化时,sync.Cond 可以用来通知被阻塞的 Goroutine。

二、底层数据结构

type Cond struct {
    noCopy noCopy
 
    // L is held while observing or changing the condition
    L Locker
 
    notify  notifyList
    checker copyChecker
}
 
type notifyList struct {
    wait   uint32
    notify uint32
    lock   uintptr // key field of the mutex
    head   unsafe.Pointer
    tail   unsafe.Pointer
}

主要有4个字段:

nocopy :golang 源码中检测禁止拷贝的技术。如果程序中有 WaitGroup 的赋值行为,使用 go vet 检查程序时,就会发现有报错,但需要注意的是,noCopy 不会影响程序正常的编译和运行

checker:用于禁止运行期间发生拷贝,双重检查(Double check)

L:可以传入一个读写锁或互斥锁,当修改条件或者调用Wait方法时需要加锁

notify:通知链表,调用Wait()方法的Goroutine会放到这个链表中,从这里获取需被唤醒的Goroutine列表

三、原理

Sync.Cond存在一个通知队列,保存了所有处于等待状态的协程。通知队列定义如下:

type notifyList struct {
   wait   uint32
   notify uint32
   lock   uintptr // key field of the mutex
   head   unsafe.Pointer
   tail   unsafe.Pointer
}

当调用Wait方法时,此时Wait方法会释放所持有的锁,然后将自己放到notifyList等待队列中等待。此时会将当前协程加入到等待队列的尾部,然后进入阻塞状态。

当调用Signal 时,此时会唤醒等待队列中的第一个协程,其他继续等待。如果此时没有处于等待状态的协程,调用Signal不会有其他作用,直接返回。当调用BoradCast方法时,则会唤醒notfiyList中所有处于等待状态的协程。sync.Cond的代码实现比较简单,协程的唤醒和阻塞已经由运行时包实现了,sync.Cond的实现直接调用了运行时包提供的API。

四、使用方法

当使用sync.Cond时,通常需要以下几个步骤:

1)定义一个互斥锁,用于保护共享数据;
2)创建一个sync.Cond对象,关联这个互斥锁;
3)在需要等待条件变量的地方,获取这个互斥锁,并使用Wait方法等待条件变量被通知;
4)在需要通知等待的协程时,使用Signal或Broadcast方法通知等待的协程。
5)最后,释放这个互斥锁。

在Cond里主要有3个方法:

sync.NewCond(l Locker): 新建一个 sync.Cond 变量,注意该函数需要一个 Locker 作为必填参数,这是因为在 cond.Wait() 中底层会涉及到 Locker 的锁操作

Cond.Wait(): 阻塞等待被唤醒,调用Wait函数前需要先加锁;并且由于Wait函数被唤醒时存在虚假唤醒等情况,导致唤醒后发现,条件依旧不成立,因此需要使用 for 语句来循环地进行等待,直到条件成立为止

Cond.Signal(): 只唤醒一个最先 Wait 的 goroutine,可以不用加锁

Cond.Broadcast(): 唤醒所有Wait的goroutine,可以不用加锁

示例:

package main
 
import (
 "fmt"
 "sync"
 "time"
)
 
func main() {
 var mu sync.Mutex
 cond := sync.NewCond(&mu)
 done := make(chan bool)
 
 // 启动一个goroutine等待条件
 go func() {
  mu.Lock()
  defer mu.Unlock()
  fmt.Println("Waiting for condition...")
  cond.Wait() // 等待条件变量被唤醒
  // Wait 内部会先调用 c.L.Unlock(),来先释放锁,如果调用方不先加锁的话,会报错
  fmt.Println("Condition received!")
  done <- true
 }()
 
 // 模拟一些耗时的工作
 time.Sleep(2 * time.Second)
 
 // 在主goroutine中发送Signal信号,唤醒等待的goroutine
 fmt.Println("Sending signal...")
 cond.Signal() // 唤醒一个等待的goroutine
 
 // 等待goroutine完成
 <-done
 fmt.Println("Done")
}
View Code

输出:

Waiting for condition...
Sending signal...
Condition received!
Done

创建了一个互斥锁 mu 和一个 sync.Cond 变量 cond,然后启动一个goroutine等待条件变量被唤醒,并在主goroutine中模拟一些耗时的工作后,通过 cond.Signal() 方法发送信号,唤醒等待的goroutine。一旦条件变量被唤醒,等待的goroutine会继续执行。当运行这个示例时,会看到输出中的等待和唤醒消息,以及最后的"Done"表示成功完成。

请注意,在使用Signal方法时,通常需要在互斥锁的保护下调用,以确保对条件变量的访问是线程安全的。

五、使用注意事项

(1)sync.Cond 不能被复制

主要原因是 sync.Cond 内部是维护着一个 Goroutine 通知队列 notifyList。如果这个队列被复制的话,那么在并发场景下导致不同 Goroutine 之间操作的 notifyList.wait、notifyList.notify 并不是同一个,这会导致出现有些 Goroutine 会一直阻塞。

(2)唤醒顺序

从等待队列中按照顺序唤醒,先进入等待队列,先被唤醒。

(3)调用Wait方法前未加锁

如果在调用Wait方法前未加锁,此时会直接panic。(调用 Wait() 函数前,需要先获得条件变量的成员锁,原因是需要互斥地变更条件变量的等待队列。在 Wait() 返回前,会重新上锁。重新上锁的原因是主调在 Wait 后会进行解锁操作,避免重复解锁引发 panic)

package main

import (
    "fmt"
    "sync"
    "time"
)

var (
   count int
   cond  *sync.Cond
   lk    sync.Mutex
)

func main() {
    cond = sync.NewCond(&lk)
    wg := sync.WaitGroup{}
    wg.Add(2)
    go func() {
       defer wg.Done()
       for {
          time.Sleep(time.Second)
          count++
          cond.Broadcast()
       }
    }()
    
    go func() {
       defer wg.Done()
       for {
          time.Sleep(time.Millisecond * 500)          
          //cond.L.Lock() 
          for count%10 != 0 {
               cond.Wait()
          }
          t.Logf("count = %d", count)
          //cond.L.Unlock()  
       }
    }()
    wg.Wait()
}
View Code

上面代码中,协程一每隔1s,将count字段的值自增1,然后唤醒所有处于等待状态的协程。协程二执行的条件为count的值为10的倍数,此时满足执行条件,唤醒后将会继续往下执行。

但是这里在调用sync.Wait方法前,没有先获取锁,下面是其执行结果,会抛出 fatal error: sync: unlock of unlocked mutex 错误,结果如下:

count = 0
fatal error: sync: unlock of unlocked mutex

因此,在调用Wait方法前,需要先获取到与sync.Cond关联的锁,否则会直接抛出异常。

强制调用Wait方法前需要先获取该锁。这里的原因在于调用Wait方法如果不加锁,有可能会出现竞态条件。

这里假设多个协程都处于等待状态,然后一个协程调用了Broadcast唤醒了其中一个或多个协程,此时这些协程都会被唤醒。

正常的用法应该是,在调用Wait方法前便加锁,只会有一个协程判断是否满足condition条件,然后执行后续操作。这样子就不会出现即使不满足条件,也会执行后续操作的情况出现。

(4)Wait方法接收到通知后,未重新检查条件变量

调用sync.Wait方法,协程进入阻塞状态后被唤醒,没有重新检查条件变量,此时有可能仍然处于不满足条件变量的场景下。然后直接执行后续操作,有可能会导致程序出错。

package main

import (
    "fmt"
    "sync"
    "time"
)

var (
   count int
   cond  *sync.Cond
   lk    sync.Mutex
)

func main() {
    cond = sync.NewCond(&lk)
    wg := sync.WaitGroup{}
    wg.Add(3)
    go func() {
       defer wg.Done()
       for {
          time.Sleep(time.Second)
          cond.L.Lock()
          // 将flag 设置为true
          flag = true
          // 唤醒所有处于等待状态的协程
          cond.Broadcast()
          cond.L.Unlock()
       }
    }()
    
    for i := 0; i < 2; i++ {
       go func(i int) {
          defer wg.Done()
          for {
             time.Sleep(time.Millisecond * 500)
             cond.L.Lock()
             // 不满足条件,此时进入等待状态
             if !flag {
                cond.Wait()
             }
             // 被唤醒后,此时可能仍然不满足条件
             fmt.Printf("协程 %d flag = %t", i, flag)
             flag = false
             cond.L.Unlock()
          }
       }(i)
    }
    wg.Wait()
}
View Code

启动了一个协程,定时将flag设置为true,相当于每隔一段时间,便满足执行条件,然后唤醒所有处于等待状态的协程。

然后又启动了两个协程,在满足条件的前提下,开始执行后续操作,但是这里协程被唤醒后,没有重新检查条件变量,具体看第39行。这里会出现的场景是,第一个协程被唤醒后,此时执行后续操作,然后将flag重新设置为false,此时已经不满足条件了。之后第二个协程唤醒后,获取到锁,没有重新检查此时是否满足执行条件,直接向下执行,这个就和我们预期不符,可能会导致程序出错,代码执行效果如下:

协程 1 flag = true
协程 0 flag = false
协程 1 flag = true
协程 0 flag = false

可以看到,此时协程0执行时,flag的值均为false,说明此时其实并不符合执行条件,可能会导致程序出错。因此正确用法应该像下面这样子,被唤醒后,需要重新检查条件变量,满足条件之后才能继续向下执行。

c.L.Lock()
// 唤醒后,重新检查条件变量是否满足条件
for !condition() {
    c.Wait()
}
// 满足条件情况下,执行的逻辑
c.L.Unlock()

(5)sync.Cond 和 channel 的区别?

channel 定位于通信,用于一发一收的场景,sync.Cond 定位于同步,用于一发多收的场景。虽然 channel 可以通过 close 操作来达到一发多收的效果,但是 closed 的 channel 已无法继续使用,而 sync.Cond 依旧可以继续使用。这可能就是“全能”与“专精”的区别。

1)作用:互斥锁 sync.Mutex 通常用来保护临界区和共享资源,条件变量 sync.Cond 用来协调想要访问共享资源的 goroutine。

2)使用场景:sync.Cond 经常用在多个 goroutine 等待,一个 goroutine 通知(事件发生)的场景。如果是一个通知,一个等待,使用互斥锁或 channel 就能搞定了。

六、总结

  Go 语言中的 sync.Cond 并发原语,它是用于实现 goroutine 之间的同步的重要工具。首先学习了 sync.Cond 的基本使用方法,包括创建和使用条件变量、使用Wait和Signal/Broadcast方法等。sync.Cond 的实现原理,主要是对等待队列的使用,从而sync.Cond有更好的理解,能够更好得使用它。同时要使用sync.Cond的注意事项,如调用Wait方法前需要加锁、Wait方法接收到通知后未重新检查条件变量等。