channel底层原理

发布时间 2023-03-29 15:00:32作者: 独揽风月

1. 数据结构

1.1 hchan

位于runtime/chan.go

type hchan struct {
    qcount   uint           // total data in the queue
    dataqsiz uint           // size of the circular queue
    buf      unsafe.Pointer // points to an array of dataqsiz elements
    elemsize uint16
    closed   uint32
    elemtype *_type // element type
    sendx    uint   // send index
    recvx    uint   // receive index
    recvq    waitq  // list of recv waiters
    sendq    waitq  // list of send waiters
    
    lock mutex
}
  • qcount:当前 channel 中存在多少个元素;

  • dataqsize: 当前 channel 能存放的元素总容量;

  • buf:channel 中用于存放元素的环形缓冲区;

  • elemsize:channel 元素类型的大小;

  • closed:标识 channel 是否关闭;

  • elemtype:channel 元素类型;

  • sendx:发送元素进入环形缓冲区的 index;

  • recvx:接收元素所处的环形缓冲区的 index;

  • recvq:因接收而陷入阻塞的协程队列;

  • sendq:因发送而陷入阻塞的协程队列;

1.2 waitq

type waitq struct {
    first *sudog
    last  *sudog
}

waitq:阻塞的协程队列

  • first:队列头部
  • last:队列尾部

1.3 sudog

位于runtime/runtime2.go

type sudog struct {
    g *g

    next *sudog
    prev *sudog
    elem unsafe.Pointer // data element (may point to stack)

    isSelect bool

    c        *hchan 
}

sudog:用于包装协程的节点

  • g:goroutine,协程;
  • next:队列中的下一个节点;
  • prev:队列中的前一个节点;
  • elem: 读取/写入 channel 的数据的容器;
  • isSelect:标识当前协程是否处在 select 多路复用的流程中;
  • c:标识与当前 sudog 交互的 chan.

2. 写流程

2.1 写时存在阻塞读协程

此时环形缓冲区一定为空

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    // ...

    lock(&c.lock)

    // ...

    if sg := c.recvq.dequeue(); sg != nil { //如果读队列有数据
        // Found a waiting receiver. We pass the value we want to send
        // directly to the receiver, bypassing the channel buffer (if any).
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true
    }
    
    // ...
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	// ...
	if sg.elem != nil {
		sendDirect(c.elemtype, sg, ep)
		sg.elem = nil
	}
	gp := sg.g
	unlockf()
	//...
	goready(gp, skip+1) // 唤醒协程
}
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
	// ...
	memmove(dst, src, t.size)
}
  • 加锁;
  • 从阻塞度协程队列中取出一个 goroutine 的封装对象 sudog;
  • 在 sendDirect 方法中,会基于 memmove 方法,直接将元素拷贝交给 sudog 对应的 goroutine;
  • 在send()方法里执行goready唤醒这个读协程,加入到本地的P队列,并解锁.

2.2 写时无阻塞读协程但环形缓冲区仍有空间

写入环形缓冲区:

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    // ...
    lock(&c.lock)
    // ...
    if c.qcount < c.dataqsiz {
        // Space is available in the channel buffer. Enqueue the element to send.
        qp := chanbuf(c, c.sendx)
        typedmemmove(c.elemtype, qp, ep)
        c.sendx++
        if c.sendx == c.dataqsiz {
            c.sendx = 0
        }
        c.qcount++
        unlock(&c.lock)
        return true
    }

    // ...
}
  • 加锁;
  • 将当前元素添加到环形缓冲区 sendx 对应的位置;
  • sendx++;
  • qcount++;
  • 解锁,返回.

2.3 写时无阻塞读协程且环形缓冲区已满

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    // ...
    lock(&c.lock)

    // ...
    gp := getg()  // 获取当前执行的goroutine
    mysg := acquireSudog() // 初始化一个sudog
    mysg.elem = ep // 绑定一个数据容器
    mysg.g = gp  // 绑定g
    mysg.c = c   // 绑定chan
    gp.waiting = mysg
    c.sendq.enqueue(mysg) // 将sudog放入等待队列
    
    atomic.Store8(&gp.parkingOnChan, 1)
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2) //这里会陷入阻塞,直到被唤醒
    
    gp.waiting = nil
    closed := !mysg.success
    gp.param = nil
    mysg.c = nil
    releaseSudog(mysg)
    return true
}
  • 加锁;
  • 构造封装当前 goroutine 的 sudog 对象;
  • 完成指针指向,建立 sudog、goroutine、channel 之间的指向关系;
  • 把 sudog 添加到当前 channel 的阻塞写协程队列中;
  • gopark 当前协程;
  • 倘若协程从 park 中被唤醒,则回收 sudog(sudog能被唤醒,其对应的元素必然已经被读协程取走);
  • 解锁,返回

2.4 写流程整体串联

3. 读流程

3.1 读时有阻塞的写协程

说明此时缓冲区已满或者无缓冲区

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
   
    lock(&c.lock)

    if sg := c.sendq.dequeue(); sg != nil {
        recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true, true
     }
     // ...
}
  • 加锁;
  • 从阻塞写协程队列中获取到一个写协程;
  • 倘若 channel 无缓冲区,则直接读取写协程元素,并唤醒写协程;
  • 倘若 channel 有缓冲区,则读取缓冲区头部元素,并将写协程元素写入缓冲区尾部后唤醒刚才头部的那个写协程;
  • 解锁,返回.

3.2 读时无阻塞写协程且缓冲区有元素

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    // ...
    lock(&c.lock)
    // ...
    if c.qcount > 0 { // 缓冲区有元素
        // Receive directly from queue
        qp := chanbuf(c, c.recvx)
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)
        }
        typedmemclr(c.elemtype, qp)
        c.recvx++
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        c.qcount--
        unlock(&c.lock)
        return true, true
    }
    // ...
  • 加锁;

  • 获取到 recvx 对应位置的元素;

  • 将自己的数据move到等待队列里的sudog

  • recvx++,头部移到下一个sudog

  • qcount--

  • 解锁,返回

3.3 读时无阻塞写协程且缓冲区无元素

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
   // ...
   lock(&c.lock)
   // ...
    gp := getg()
    mysg := acquireSudog()
    mysg.elem = ep
    gp.waiting = mysg
    mysg.g = gp
    mysg.c = c
    gp.param = nil
    c.recvq.enqueue(mysg)
    atomic.Store8(&gp.parkingOnChan, 1)
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2) //一直阻塞,直到被其他协程唤醒

    gp.waiting = nil
    success := mysg.success
    gp.param = nil
    mysg.c = nil
    releaseSudog(mysg)
    return true, success
}
  • 加锁;
  • 构造封装当前 goroutine 的 sudog 对象;
  • 完成指针指向,建立 sudog、goroutine、channel 之间的指向关系;
  • 把 sudog 添加到当前 channel 的阻塞读协程队列中;
  • park 当前协程;
  • 倘若协程从 park 中被唤醒,则回收 sudog(sudog能被唤醒,其对应的元素必然已经被写入);
  • 解锁,返回

3.4 读流程整体串联

4. 阻塞与非阻塞模式

在上述源码分析流程中,均是以阻塞模式为主线进行讲述,那什么情况下会触发阻塞模式呢?

只有在 select 语句组成的多路复用分支中,与 channel 的交互会变成非阻塞模式,如:

ch := make(chan int)
select{
  case <- ch:
  default:
}

在 select 语句包裹的多路复用分支中,读和写 channel 操作会被汇编为 selectnbrecv 和 selectnbsend 方法,底层同样复用 chanrecv 和 chansend 方法,但此时由于第三个入参 block 被设置为 false,导致后续会走进非阻塞的处理分支.

func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
    return chansend(c, elem, false, getcallerpc())  // 这里传入false,执行非阻塞模式
}

func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected, received bool) {
    return chanrecv(c, elem, false)  // 这里传入false,执行非阻塞模式
}

通读chansend()的代码,可以发现在非阻塞模式下,永远不会走到缓冲区满了导致goroutine进入阻塞的逻辑,函数返回的selected要么是true,要么是false

5. 两种读 channel 的协议

读取 channel 时,可以根据第二个 bool 型的返回值用以判断当前 channel 是否已处于关闭状态:

ch := make(chan int, 2)
got1 := <- ch
got2,ok := <- ch

实现上述功能的原因是,两种格式下,读 channel 操作会被汇编成不同的方法:

func chanrecv1(c *hchan, elem unsafe.Pointer) {
    chanrecv(c, elem, true)
}

//go:nosplit
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
    _, received = chanrecv(c, elem, true)
    return
}

6. 关闭channel

位于runtime/chan.go

func closechan(c *hchan) {
	// ...

	c.closed = 1

	var glist gList

	// release all readers
	for {
		sg := c.recvq.dequeue()
		// ...
		gp := sg.g
		// ...
		glist.push(gp)
	}

	// release all writers (they will panic)
	for {
		sg := c.sendq.dequeue()
		// ...
		gp := sg.g
		// ...
		glist.push(gp)
	}
	unlock(&c.lock)

	// Ready all Gs now that we've dropped the channel lock.
	for !glist.empty() {
		gp := glist.pop()
		gp.schedlink = 0
		goready(gp, 3)
	}
}
  • 将阻塞读协程队列中的协程节点统一添加到 glist;
  • 将阻塞写协程队列中的协程节点统一添加到 glist;
  • 唤醒 glist 当中的所有协程.

实际上不可能同时存在读队列和写队列,因此上面两个for循环可以串行执行