go锁基础 - atomic、sema

发布时间 2023-12-01 12:11:33作者: 杨阳的技术博客

atomicsema是实现go中锁的基础,简单看下他们的实现原理。

atomic

`atomic 常用来作为保证原子性的操作。

当多个协程,同时一个数据进行操作时候,如果不加锁,最终的很难得到想要的结果。

  var p int64 = 0
  func add() {
  	p = p + 1
  }

  func main() {

  	for i := 0; i < 1000; i++ {
  		go add()
  	}
  	time.Sleep(time.Second * 5)
  	fmt.Println(p) //982
  }

这种情况下,最终打印的 都不会是1000,每次不固定。

改成atomic 能解决
var p int64 = 0
func add() {
	atomic.AddInt64(&p, 1)
}
func main() {

	for i := 0; i < 1000; i++ {
		go add()
	}
	time.Sleep(time.Second * 5)
	fmt.Println(p)
}

atomic 为什么能做到?

 TEXT	sync∕atomic·AddInt64(SB), NOSPLIT, $0-24
	GO_ARGS
	MOVD	$__tsan_go_atomic64_fetch_add(SB), R9
	BL	racecallatomic<>(SB)
	MOVD	add+8(FP), R0	// convert fetch_add to add_fetch
	MOVD	ret+16(FP), R1
	ADD	R0, R1, R0
	MOVD	R0, ret+16(FP)
	RET

老的版本中是能见到,lock 这种操作系统级别的锁,新版的go已经改写了这块逻辑,但是能猜想到效果肯定一样。 如果有理清楚的,评论区可以交流下。

小结:

原子操作是一种硬件层面加锁的机制
保证操作一个变量的时候,其他协程/线程无法访问

sema

几乎在go的每个锁的定义都能看到sema的身影,理解了sema再看 互斥锁、读写锁就会很好理解。

信号量锁/信号锁
核心是一个uint32值,含义是同时可并发的数量
每一个sema 锁都对应一个SemaRoot结构体
SemaRoot中有一个平衡二叉树用于协程排队

例如:

 type Mutex struct {
  	state int32
  	sema  uint32 
 }

sema的uint32中的 每一个数,背后都对应一个  semaRoot的结构体
type semaRoot struct {
	lock  mutex
	treap *sudog        // root of balanced tree of unique waiters.
	nwait atomic.Uint32 // Number of waiters. Read w/o the lock.
}

type sudog struct {
        g *g              // 包含了 协程 g
	next *sudog   // 下一个
	prev *sudog  
	elem unsafe.Pointer // data element (may point to stack)
 }

结构如下:

这里可以先讲下,当这个 sema uint32 值,初始化时候,大于0 比如 赋值5,就代表着,并发时候,有5个协程可以获取锁。其他协程需要等待前面5个释放了,才能进入。

sema 大于0

   // 获取sema锁。大于0的情况
func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int, reason waitReason) {
    	gp := getg()
    	if gp != gp.m.curg {
    		throw("semacquire not on the G stack")
    	}
    	// Easy case. // 容易的情况
    	if cansemacquire(addr) {
    		return
    	}
      // 方法很长,先看简单的部分。
}
  
func cansemacquire(addr *uint32) bool {
	for {
		v := atomic.Load(addr) // 根据sema的地址,获取int32的值
		if v == 0 {   // 如果未0了,就获取失败了
			return false
		}
        // 大于0 ,则把 sema的值减去1
		if atomic.Cas(addr, v, v-1) { // cas 就是 CompareAndSwapInt 的底层实现
			return true
		}
	}
}

到此,对sema为什么只是定义为一个 uint32的值有了大致理解,就是一个控制能有多少个协程同时获取锁的值。

看下释放:

func semrelease1(addr *uint32, handoff bool, skipframes int) {
	root := semtable.rootFor(addr)
	atomic.Xadd(addr, 1) // 给sema的值 加上1

	// Easy case: no waiters?
	// This check must happen after the xadd, to avoid a missed wakeup
	// (see loop in semacquire).
	if root.nwait.Load() == 0 { // 如果没有 nwait在等待,就直接结束。 
		return
	}
}

nwait 就是等待协程的个数。

小结, 当sema的值大于0 :

获取锁:uint32减1 ,获取成功

释放锁:uint32加1,释放成功

sema值等于0

再看 semacquire1

func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int, reason waitReason) {

     / / Harder case:
	//	increment waiter count
	s := acquireSudog()
	root := semtable.rootFor(addr) // 根据sema的地址,获取了包含 sudog的队列
	for {
		root.queue(addr, s, lifo) // 将新的协程放入这个等待队列中
		goparkunlock(&root.lock, reason, traceBlockSync, 4+skipframes) 
        // 主动调用协程的gopark,让它休眠,gopark的说明看 go GMP中有讲
	}
	releaseSudog(s)
}

//再看释放
func semrelease1(addr *uint32, handoff bool, skipframes int) {
	root := semtable.rootFor(addr)
	atomic.Xadd(addr, 1)
	if root.nwait.Load() == 0 {
		return
	}
    // 如果等待队列不是0 ,就需要释放一个
	// Harder case: search for a waiter and wake it.
	lockWithRank(&root.lock, lockRankRoot)
	
	s, t0 := root.dequeue(addr) // 从全局的队列中,取出一个
	if s != nil {
		root.nwait.Add(-1) // 把等待的数量减一
	}
	unlock(&root.lock) //操作全局队列都需要加锁
}

小结,当sema的值等于0时候:

获取锁:协程休眠,进入堆树等待
释放锁:从堆树中取出一个协程,唤醒
sema 锁退化成一个专用休眠队列

有没有可能sema的值,小于0 ?

看看sema的定义 `uint32` 所以 不可能。

总结下:

atomic原子操作是一种硬件层面的加锁机制。

sema 背后是一整套锁的管理和等待的机制,开发者在使用时候,感知不到。

sema的值就是能同时获取锁协程的个数。sema的地址作为了休眠等待队列(平衡树)的key。