Golang如何优雅地关闭 channel

发布时间 2023-06-21 22:14:49作者: 技术颜良

Golang如何优雅地关闭 channel

萧瑟 golang面试经典讲解 2023-05-31 21:00 发表于上海

一、介绍

想必听说过 go 的,应该都知道 go 的最大的特性 goroutine 并发编程,而说到并发编程,使用 channel 进行数据传输是 go 中的必修课。

go 的并发哲学:不要通过共享内存来通信,而要通过通信来实现内存共享。

channel 的坑不少,本篇简单聊聊关闭 channel 的方法。

二、关闭channel原则

 

坊间流传的关闭 channel 的原则:

不要从接收端关闭 channel,也不要在有多个发送端时,主动关闭 channel

这个原则的来源就因为:

  1. 不能向已关闭的 channel 发送数据会导致panic

  2. 不能重复关闭已关闭的 channel 会导致panic

一个比较粗糙的检查 channel 是否关闭的函数:

package main
import "fmt"
func IsClosed(ch <-chan int) bool { select { case <-ch: return true default: }
return false}
func main() { c := make(chan int) fmt.Println(IsClosed(c)) // false close(c) fmt.Println(IsClosed(c)) // true}

 

看一下代码,其实存在很多问题。首先,IsClosed 函数是一个有副作用的函数。每调用一次,都会读出 channel 里的一个元素,改变了 channel 的状态。这不是一个好的函数,干活就干活,还顺手牵羊!

其次,IsClosed 函数返回的结果仅代表调用那个瞬间,并不能保证调用之后会不会有其他 goroutine 对它进行了一些操作,改变了它的这种状态。例如,IsClosed 函数返回 true,但这时有另一个 goroutine 关闭了 channel,而你还拿着这个过时的 “channel 未关闭”的信息,向其发送数据,就会导致 panic 的发生。当然,一个 channel 不会被重复关闭两次,如果 IsClosed 函数返回的结果是 true,说明 channel 是真的关闭了。

有两个不那么优雅地关闭 channel 的方法:

  1. 使用 defer-recover 机制,放心大胆地关闭 channel 或者向 channel 发送数据。即使发生了 panic,有 defer-recover 在兜底。

  2. 使用 sync.Once 来保证只关闭一次。

 

三、如何优雅关闭channel

根据 sender 和 receiver 的个数,分下面几种情况:

  1. 一个 sender,一个 receiver

  2. 一个 sender, M 个 receiver

  3. N 个 sender,一个 reciver

  4. N 个 sender, M 个 receiver

 

3.1 1和2的情况

只有一个 sender 的情况就不用说了,直接从 sender 端关闭就好了,没有问题。
func main() {    dataCh := make(chan int, 100)
// sender go func() { for i := 0; i < 1000; i++ { dataCh <- i + 1 } log.Println("send complete") close(dataCh) }()
// receiver for i := 0; i < 5; i++ { go func() { for { data, ok := <-dataCh if !ok { // 已关闭 return } _ = data } }() }
select { case <-time.After(time.Second * 5): fmt.Println(runtime.NumGoroutine()) }}

3.1 3的情况

优雅关闭 channel 的方法是:the only receiver says "please stop sending more" by closing an additional signal channel。

解决方案就是增加一个传递关闭信号的 channel,receiver 通过信号 channel 下达关闭数据 channel 指令。senders 监听到关闭信号后,停止接收数据。代码如下:

package main
import ( "log" "math/rand" "sync" "time")
func main() { rand.Seed(time.Now().UnixNano()) log.SetFlags(0)
const Max = 100000 const NumSenders = 1000
wgReceivers := sync.WaitGroup{} wgReceivers.Add(1)
dataCh := make(chan int) stopCh := make(chan struct{})
// senders for i := 0; i < NumSenders; i++ { go func() { for { select { case <-stopCh: return default: } select { case <-stopCh: return case dataCh <- rand.Intn(Max): } } }() }
// receiver go func() { defer wgReceivers.Done()
for value := range dataCh { if value == Max-1 { close(stopCh) return }
log.Println(value) } }()
wgReceivers.Wait()}

 

这里的 stopCh 就是信号 channel,它本身只有一个 sender,因此可以直接关闭它。senders 收到了关闭信号后,select 分支 “case <- stopCh” 被选中,退出函数,不再发送数据。

需要说明的是,上面的代码并没有明确关闭 dataCh。在 Go 语言中,对于一个 channel,如果最终没有任何 goroutine 引用它,不管 channel 有没有被关闭,最终都会被 gc 回收。所以,在这种情形下,所谓的优雅地关闭 channel 就是不关闭 channel,让 gc 代劳。

3.1 4的情况

优雅关闭 channel 的方法是:any one of them says "let's end the game" by notifying a moderator to close an additional signal channel。

和第 3 种情况不同,这里有 M 个 receiver,如果直接还是采取第 3 种解决方案,由 receiver 直接关闭 stopCh 的话,就会重复关闭一个 channel,导致 panic。因此需要增加一个中间人,M 个 receiver 都向它发送关闭 dataCh 的“请求”,中间人收到第一个请求后,就会直接下达关闭 dataCh 的指令(通过关闭 stopCh,这时就不会发生重复关闭的情况,因为 stopCh 的发送方只有中间人一个)。另外,这里的 N 个 sender 也可以向中间人发送关闭 dataCh 的请求。

package main
import ( "fmt" "math/rand" "strconv" "time")
func main() { rand.Seed(time.Now().UnixNano())
const Max = 100000 const NumReceivers = 10 const NumSenders = 1000
dataCh := make(chan int, 100) stopCh := make(chan struct{})
// It must be a buffered channel. toStop := make(chan string, 1)
var stoppedStr string
// moderator go func() { stoppedStr = <-toStop fmt.Println(stoppedStr) close(stopCh) }()
// senders for i := 0; i < NumSenders; i++ { go func(id string) { for { value := rand.Intn(Max) if value == 0 { select { case toStop <- "sender#" + id: default: } return }
select { case <-stopCh: return case dataCh <- value: } } }(strconv.Itoa(i)) }
// receivers for i := 0; i < NumReceivers; i++ { go func(id string) { for { select { case <-stopCh: return case value := <-dataCh: if value == Max-1 { select { case toStop <- "receiver#" + id: default: } return }
fmt.Println(value) } } }(strconv.Itoa(i)) }
select { case <-time.After(time.Second): }
}

 

代码里 toStop 就是中间人的角色,使用它来接收 senders 和 receivers 发送过来的关闭 dataCh 请求。

这个例子可以在 sender 和 receiver 端都发送关闭信号,通过 toStop 这个中间人来传递关闭信号,接收到之后关闭 stopCh。这里需要注意将 toStop 定义为带缓冲的 channel,若是不带缓冲,可能会出现 <-toStop 这个接收协程还未跑起来时,就已经有其他协程向其发送了 toStop<-xx 关闭信号。

这时在 sender 或 receiver 的 select 分支就可能走 default 语句,导致逻辑错误。

这个例子中,简单点的做法可以给 toStop 设置缓存为 sender 与 receiver 的和,就可以简写为如下:

...toStop := make(chan string, NumReceivers + NumSenders)...            value := rand.Intn(Max)            if value == 0 {                toStop <- "sender#" + id                return            }...                if value == Max-1 {                    toStop <- "receiver#" + id                    return                }...

直接向 toStop 发送请求,因为 toStop 容量足够大,所以不用担心阻塞,自然也就不用 select 语句再加一个 default case 来避免阻塞。

可以看到,这里同样没有真正关闭 dataCh,原样同第 3 种情况。

channel 的注意点

channel 的声明必须使用 make 关键字,不能直接 var c chan int,这样得到的是 nil channel

不能向 nil channel 发送数据

var c chan intc <- 1 // panic

 

四、总结

关闭 channel 的基本法则:

  1. 单 sender 的情况下,都可以直接在 sender 端关闭 channel。

  2. 多 sender 的情况下,可以增加一个传递关闭信号的 channel 专门用于关闭数据传输的 channel。

原则:不要从接收端关闭 channel,也不要在有多个发送端时,主动关闭 channel。

本质:已关闭的 channel 不能再关闭(或者再向其发送数据)。