一、概念
Go标准库提供了Cond原语,sync.Cond(条件变量)是一个用于在多个goroutine之间进行同步和通信的重要工具,可以让 Goroutine 在满足特定条件时被阻塞和唤醒。
条件变量的作用并不保证在同一时刻仅有一个协程(线程)访问某个共享的数据资源,而是在对应的共享数据的状态发生变化时,通知阻塞在某个条件上的协程(线程)。条件变量不是锁,在并发中不能达到同步的目的,因此条件变量总是与锁一块使用。
sync.Cond 基于互斥锁/读写锁,那它和互斥锁有什么区别呢?
互斥锁 sync.Mutex 通常用来保护共享的临界资源,条件变量 sync.Cond 用来协调想要访问共享资源的 Goroutine。当共享资源的状态发生变化时,sync.Cond 可以用来通知被阻塞的 Goroutine。
二、底层数据结构
type Cond struct {
noCopy noCopy
// L is held while observing or changing the condition
L Locker
notify notifyList
checker copyChecker
}
type notifyList struct {
wait uint32
notify uint32
lock uintptr // key field of the mutex
head unsafe.Pointer
tail unsafe.Pointer
}
主要有4个字段:
nocopy :golang 源码中检测禁止拷贝的技术。如果程序中有 WaitGroup 的赋值行为,使用 go vet 检查程序时,就会发现有报错,但需要注意的是,noCopy 不会影响程序正常的编译和运行
checker:用于禁止运行期间发生拷贝,双重检查(Double check)
L:可以传入一个读写锁或互斥锁,当修改条件或者调用Wait方法时需要加锁
notify:通知链表,调用Wait()方法的Goroutine会放到这个链表中,从这里获取需被唤醒的Goroutine列表
三、原理
在Sync.Cond
存在一个通知队列,保存了所有处于等待状态的协程。通知队列定义如下:
type notifyList struct {
wait uint32
notify uint32
lock uintptr // key field of the mutex
head unsafe.Pointer
tail unsafe.Pointer
}
当调用Wait方法时,此时Wait方法会释放所持有的锁,然后将自己放到notifyList等待队列中等待。此时会将当前协程加入到等待队列的尾部,然后进入阻塞状态。
当调用Signal 时,此时会唤醒等待队列中的第一个协程,其他继续等待。如果此时没有处于等待状态的协程,调用Signal不会有其他作用,直接返回。当调用BoradCast方法时,则会唤醒notfiyList中所有处于等待状态的协程。sync.Cond的代码实现比较简单,协程的唤醒和阻塞已经由运行时包实现了,sync.Cond的实现直接调用了运行时包提供的API。
四、使用方法
当使用sync.Cond时,通常需要以下几个步骤:
1)定义一个互斥锁,用于保护共享数据;
2)创建一个sync.Cond对象,关联这个互斥锁;
3)在需要等待条件变量的地方,获取这个互斥锁,并使用Wait方法等待条件变量被通知;
4)在需要通知等待的协程时,使用Signal或Broadcast方法通知等待的协程。
5)最后,释放这个互斥锁。
在Cond里主要有3个方法:
sync.NewCond(l Locker): 新建一个 sync.Cond 变量,注意该函数需要一个 Locker 作为必填参数,这是因为在 cond.Wait() 中底层会涉及到 Locker 的锁操作
Cond.Wait(): 阻塞等待被唤醒,调用Wait函数前需要先加锁;并且由于Wait函数被唤醒时存在虚假唤醒等情况,导致唤醒后发现,条件依旧不成立,因此需要使用 for 语句来循环地进行等待,直到条件成立为止
Cond.Signal(): 只唤醒一个最先 Wait 的 goroutine,可以不用加锁
Cond.Broadcast(): 唤醒所有Wait的goroutine,可以不用加锁
示例:
package main import ( "fmt" "sync" "time" ) func main() { var mu sync.Mutex cond := sync.NewCond(&mu) done := make(chan bool) // 启动一个goroutine等待条件 go func() { mu.Lock() defer mu.Unlock() fmt.Println("Waiting for condition...") cond.Wait() // 等待条件变量被唤醒 // Wait 内部会先调用 c.L.Unlock(),来先释放锁,如果调用方不先加锁的话,会报错 fmt.Println("Condition received!") done <- true }() // 模拟一些耗时的工作 time.Sleep(2 * time.Second) // 在主goroutine中发送Signal信号,唤醒等待的goroutine fmt.Println("Sending signal...") cond.Signal() // 唤醒一个等待的goroutine // 等待goroutine完成 <-done fmt.Println("Done") }
输出:
Waiting for condition...
Sending signal...
Condition received!
Done
创建了一个互斥锁 mu 和一个 sync.Cond 变量 cond,然后启动一个goroutine等待条件变量被唤醒,并在主goroutine中模拟一些耗时的工作后,通过 cond.Signal() 方法发送信号,唤醒等待的goroutine。一旦条件变量被唤醒,等待的goroutine会继续执行。当运行这个示例时,会看到输出中的等待和唤醒消息,以及最后的"Done"表示成功完成。
请注意,在使用Signal方法时,通常需要在互斥锁的保护下调用,以确保对条件变量的访问是线程安全的。
五、使用注意事项
(1)sync.Cond 不能被复制
主要原因是 sync.Cond 内部是维护着一个 Goroutine 通知队列 notifyList。如果这个队列被复制的话,那么在并发场景下导致不同 Goroutine 之间操作的 notifyList.wait、notifyList.notify 并不是同一个,这会导致出现有些 Goroutine 会一直阻塞。
(2)唤醒顺序
从等待队列中按照顺序唤醒,先进入等待队列,先被唤醒。
(3)调用Wait方法前未加锁
如果在调用Wait方法前未加锁,此时会直接panic。(调用 Wait() 函数前,需要先获得条件变量的成员锁,原因是需要互斥地变更条件变量的等待队列。在 Wait() 返回前,会重新上锁。重新上锁的原因是主调在 Wait 后会进行解锁操作,避免重复解锁引发 panic)
package main
import (
"fmt"
"sync"
"time"
)
var (
count int
cond *sync.Cond
lk sync.Mutex
)
func main() {
cond = sync.NewCond(&lk)
wg := sync.WaitGroup{}
wg.Add(2)
go func() {
defer wg.Done()
for {
time.Sleep(time.Second)
count++
cond.Broadcast()
}
}()
go func() {
defer wg.Done()
for {
time.Sleep(time.Millisecond * 500)
//cond.L.Lock()
for count%10 != 0 {
cond.Wait()
}
t.Logf("count = %d", count)
//cond.L.Unlock()
}
}()
wg.Wait()
}
上面代码中,协程一每隔1s,将count字段的值自增1,然后唤醒所有处于等待状态的协程。协程二执行的条件为count的值为10的倍数,此时满足执行条件,唤醒后将会继续往下执行。
但是这里在调用sync.Wait方法前,没有先获取锁,下面是其执行结果,会抛出 fatal error: sync: unlock of unlocked mutex 错误,结果如下:
count = 0
fatal error: sync: unlock of unlocked mutex
因此,在调用Wait方法前,需要先获取到与sync.Cond关联的锁,否则会直接抛出异常。
强制调用Wait方法前需要先获取该锁。这里的原因在于调用Wait方法如果不加锁,有可能会出现竞态条件。
这里假设多个协程都处于等待状态,然后一个协程调用了Broadcast唤醒了其中一个或多个协程,此时这些协程都会被唤醒。
正常的用法应该是,在调用Wait
方法前便加锁,只会有一个协程判断是否满足condition
条件,然后执行后续操作。这样子就不会出现即使不满足条件,也会执行后续操作的情况出现。
(4)Wait方法接收到通知后,未重新检查条件变量
调用sync.Wait方法,协程进入阻塞状态后被唤醒,没有重新检查条件变量,此时有可能仍然处于不满足条件变量的场景下。然后直接执行后续操作,有可能会导致程序出错。
package main
import (
"fmt"
"sync"
"time"
)
var (
count int
cond *sync.Cond
lk sync.Mutex
)
func main() {
cond = sync.NewCond(&lk)
wg := sync.WaitGroup{}
wg.Add(3)
go func() {
defer wg.Done()
for {
time.Sleep(time.Second)
cond.L.Lock()
// 将flag 设置为true
flag = true
// 唤醒所有处于等待状态的协程
cond.Broadcast()
cond.L.Unlock()
}
}()
for i := 0; i < 2; i++ {
go func(i int) {
defer wg.Done()
for {
time.Sleep(time.Millisecond * 500)
cond.L.Lock()
// 不满足条件,此时进入等待状态
if !flag {
cond.Wait()
}
// 被唤醒后,此时可能仍然不满足条件
fmt.Printf("协程 %d flag = %t", i, flag)
flag = false
cond.L.Unlock()
}
}(i)
}
wg.Wait()
}
启动了一个协程,定时将flag设置为true,相当于每隔一段时间,便满足执行条件,然后唤醒所有处于等待状态的协程。
然后又启动了两个协程,在满足条件的前提下,开始执行后续操作,但是这里协程被唤醒后,没有重新检查条件变量,具体看第39行。这里会出现的场景是,第一个协程被唤醒后,此时执行后续操作,然后将flag重新设置为false,此时已经不满足条件了。之后第二个协程唤醒后,获取到锁,没有重新检查此时是否满足执行条件,直接向下执行,这个就和我们预期不符,可能会导致程序出错,代码执行效果如下:
协程 1 flag = true
协程 0 flag = false
协程 1 flag = true
协程 0 flag = false
可以看到,此时协程0执行时,flag
的值均为false
,说明此时其实并不符合执行条件,可能会导致程序出错。因此正确用法应该像下面这样子,被唤醒后,需要重新检查条件变量,满足条件之后才能继续向下执行。
c.L.Lock()
// 唤醒后,重新检查条件变量是否满足条件
for !condition() {
c.Wait()
}
// 满足条件情况下,执行的逻辑
c.L.Unlock()
(5)sync.Cond 和 channel 的区别?
channel 定位于通信,用于一发一收的场景,sync.Cond 定位于同步,用于一发多收的场景。虽然 channel 可以通过 close 操作来达到一发多收的效果,但是 closed 的 channel 已无法继续使用,而 sync.Cond 依旧可以继续使用。这可能就是“全能”与“专精”的区别。
1)作用:互斥锁 sync.Mutex 通常用来保护临界区和共享资源,条件变量 sync.Cond 用来协调想要访问共享资源的 goroutine。
2)使用场景:sync.Cond 经常用在多个 goroutine 等待,一个 goroutine 通知(事件发生)的场景。如果是一个通知,一个等待,使用互斥锁或 channel 就能搞定了。
六、总结
Go 语言中的 sync.Cond 并发原语,它是用于实现 goroutine 之间的同步的重要工具。首先学习了 sync.Cond 的基本使用方法,包括创建和使用条件变量、使用Wait和Signal/Broadcast方法等。sync.Cond 的实现原理,主要是对等待队列的使用,从而sync.Cond有更好的理解,能够更好得使用它。同时要使用sync.Cond的注意事项,如调用Wait方法前需要加锁、Wait方法接收到通知后未重新检查条件变量等。