Go 并发编程(二):锁、Select、Context、定时器

发布时间 2023-03-22 21:13:33作者: Juno3550


Sync(锁)

在前面讲 channel 的时候,我们说到在 Go 语言并发编程中,倡导使用通信共享内存,不要使用共享内存通信,即 goroutine 之间尽量通过 channel 来协作。

而在其他的传统语言中,都是通过共享内存加上锁机制来保证并发安全的,同样 Go 语言也提供了对共享内存并发安全机制的支持,这些功能都存在于 sync 包下。


sync.WaitGroup

在前面很多 goroutine 的示例中,我们都是通过 time.Sleep() 方法让主 goroutine 等待一段时间,以便子 gortoutine 能够执行完打印结果,显然这不是一个很好的办法,因为我们不知道所有的子 gortoutine 要多久才能执行完

解决方案一

  • 在每个 goroutine 中,向管道里发送一条数据,这样我们在程序最后,通过 for 循环将管道里的数据全部取出,直到数据全部取出完毕才能继续后面的逻辑,这样就可以实现等待各个 goroutine 执行完。
  • 但是,这样使用 channel 显得并不优雅。其次,我们得知道具体循环的次数,来创建管道的大小,假设次数非常的多,则需要申请同样数量大小的管道出来,对内存也是不小的开销。

解决方案二:使用 sync.WaitGroup

Go 语言中可以使用 sync.WaitGroup 来实现并发任务的同步以及协程任务等待。

sync.WaitGroup 是一个对象,里面维护者一个计数器,并且通过三个方法来配合使用:

  • (wg *WaitGroup) Add(delta int):计数器加 delta
  • (wg *WaitGroup) Done():计数器减 1
  • (wg *WaitGroup) Wait():会阻塞代码的运行,直至计数器减为0

示例:


import (
    "fmt"
    "sync"
)

var wg sync.WaitGroup

func myGoroutine(i int) {
    defer wg.Done()
    fmt.Println("myGoroutine: ", i)
}
func main() {
    wg.Add(10)
    for i := 0; i < 10; i++ {
        go myGoroutine(i)
    }
    wg.Wait()
    fmt.Println("end!!!")
}

运行结果:

myGoroutine:  2
myGoroutine:  5
myGoroutine:  1
myGoroutine:  0
myGoroutine:  9
myGoroutine:  7
myGoroutine:  6
myGoroutine:  8
myGoroutine:  3
myGoroutine:  4
end!!!         

程序首先把 wg 的计数设置为 10,每个 for 循环运行完毕都把计数器减 1,main 函数中执行到 wg.Wait() 会一直阻塞,直到 wg 的计数器为零。最后打印了 10 个 myGoroutine!,待所有子 goroutine 任务结束后主 goroutine 才退出。

注意:sync.WaitGroup 对象的计数器不能为负数,否则会 panic。在使用的过程中,我们需要保证 add() 的参数值,以及执行完 Done() 之后计数器大于等于零。


sync.Once

sync.Once 最大的作用就是延迟初始化,对于一个使用 sync.Once 的变量,我们并不会在程序启动的时候初始化,而是在第一次用的它的时候才会初始化,并且只初始化这一次,初始化之后驻留在内存里,这就非常适合配置文件加载场景,设想一下,如果是在程序刚开始就加载配置,若迟迟未被使用,则既浪费了内存,又延长了程序加载时间,而 sync.Onece 就刚好解决了这个问题。

sync.Once 可以在代码的任意位置初始化和调用,并且线程安全。

示例:

// 声明配置结构体Config
type Config struct{}

var instance Config
var once sync.Once     // 声明一个sync.Once变量

// 获取配置结构体
func InitConfig() *Config {
   once.Do(func(){
      instance = &Config{}
   })
   return instance
}

只有在第一次调用 InitConfig() 获取Config 指针时,才会执行 once.Do(func(){instance = &Config{}}) 语句,执行完之后 instance 就驻留在内存中,后面再次执行 InitConfig() 的时候,就直接返回内存中的 instance (也就是单例模式)。

与init()的区别:有时候我们会使用 init() 方法进行初始化,init() 方法是在其所在的 package 首次加载时执行的;而 sync.Onece 可以在代码的任意位置初始化和调用,是在第一次用的它的时候才会初始化。


sync.Lock:锁

说到并发编程,就不得不谈一个老生常谈的问题,那就是资源竞争,因为一旦开启了多个 goroutine 去处理问题,那么这些 goroutine 就有可能在同一时间操作同一个系统资源,比如同一个变量、文件等等,如果不加控制的话,那么就会存在最后只有一个操作对资源生效,显然不是我们想要的结果。

示例:并发不安全

var num int = 1

func add() {
    num++
}

func main() {
    go add()
    go add()
    go add()
    time.Sleep(time.Second * 5)
    fmt.Println("num:", num)
}

运行结果:

2955

在 Go 语言中,有两种方式来控制并发安全,锁和原子操作。


sync.Mutex:互斥锁

互斥锁是一种最常用的控制并发安全的法式,它在同一时间只允许一个 goroutine 对共享资源进行访问。

互斥锁的声明方式如下:

var lock sync.Mutex

互斥锁有两个方法:

func (m *Mutex) Lock()     // 加锁
func (m *Mutex) Unlock()   // 解锁

注意:

  1. 一个互斥锁只能同时被一个 goroutine 锁定,其它 goroutine 将阻塞,直到互斥锁被解锁才能加锁成功。

  2. 对一个未锁定的互斥锁进行解锁将会产生运行时错误。

  3. 对资源操作完成后,一定要解锁,否则会出现流程执行异常,死锁等问题。通常借助 defer。锁定后,立即使用 defer 语句保证互斥锁及时解锁。

示例:对上面并发不安全的例子稍做修改,加上互斥锁

package main

import (
    "fmt"
    "sync"
)

var num int = 1

func add(wg *sync.WaitGroup, mu *sync.Mutex) {
    mu.Lock() // 加锁
    defer func() {
        wg.Done()   // 计数器-1
        mu.Unlock() // 解锁
    }()
    for i := 0; i < 1000; i++ {
        num++
    }
}

func main() {
    var wg sync.WaitGroup
    var mu sync.Mutex
    wg.Add(3) // 开启3个goroutine,计数器加3
    go add(&wg, &mu)
    go add(&wg, &mu)
    go add(&wg, &mu)
    wg.Wait() // 等待所有协程执行完毕
    fmt.Println("num:", num)
}

运行结果:

3001

sync.RWMutex:读写锁

互斥锁的本质是当一个 goroutine 访问的时候,其他 goroutine 都不能访问。这样在资源同步,避免竞争的同时也降低了程序的并发性能,因为程序由原来的并行执行变成了串行执行。

其实,当我们对一个不会变化的数据只做“读”操作的话,是不存在资源竞争的问题的。因为数据是不变的,不管怎么读取,多少 goroutine 同时读取,都是可以的。

所以问题不是出在“读”上,而是出在“写”上,也就是修改数据。

由于修改的数据要同步,这样其他 goroutine 才可以感知到。所以真正的互斥应该是读取和修改、修改和修改之间,读和读是没有互斥操作的必要的

因此,衍生出另外一种锁,叫做读写锁:

  • 写写互斥
  • 读写互斥
  • 读读不互斥

从互斥锁和读写锁的源码可以看出,它们是同源的。读写锁的内部用互斥锁来实现写锁定操作之间的互斥。可以把读写锁看作是互斥锁的一种扩展。

读写锁的使用方法如下:

func (rw *RWMutex) Lock()     // 对写锁加锁
func (rw *RWMutex) Unlock()   // 对写锁解锁

func (rw *RWMutex) RLock()    // 对读锁加锁
func (rw *RWMutex) RUnlock()  // 对读锁解锁

示例:

package main

import (
    "fmt"
    "sync"
    "time"
)

var count int           // 全局变量count
var rwlock sync.RWMutex // 全局读写锁rwlock

func read(i int) {
    rwlock.RLock()
    fmt.Printf("读 goroutine %d 读数据开始\n", i)
    fmt.Printf("读 goroutine %d 读数据结束,读到: %d\n", i, count)
    defer rwlock.RUnlock()
}

func write(i int) {
    rwlock.Lock()
    fmt.Printf("写 goroutine %d 写数据开始\n", i)
    count++
    fmt.Printf("写 goroutine %d 写数据结束,新值为: %d\n", i, count)
    defer rwlock.Unlock()
}

func main() {
    for i := 0; i < 3; i++ {
        go read(i)
    }
    for i := 0; i < 3; i++ {
        go write(i)
    }
    time.Sleep(time.Second * 5)
    fmt.Println("final count:", count)
}

运行结果:

读 goroutine 1 读数据开始
读 goroutine 2 读数据开始
读 goroutine 2 读数据结束,读到: 0
读 goroutine 1 读数据结束,读到: 0
写 goroutine 2 写数据开始
写 goroutine 2 写数据结束,新值为: 1
读 goroutine 0 读数据开始
读 goroutine 0 读数据结束,读到: 1
写 goroutine 1 写数据开始
写 goroutine 1 写数据结束,新值为: 2
写 goroutine 0 写数据开始
写 goroutine 0 写数据结束,新值为: 3
final count: 3

结果分析:

  • 首先,读 goroutine 1 和 2 均获得了读锁并同时进行了读操作,可以看出读操作并不互斥。
  • 而后面读写操作并交替进行,没有看到同时操作的情况,可以看出读写、写写操作互斥。

死锁

死锁是一种状态:当两个或以上的 goroutine 在执行过程中,因争夺共享资源处在互相等待的状态,如果没有外部干涉将会一直维持这种阻塞状态,我们称这时的系统发生了死锁。

死锁场景:

  1. Lock/Unlock 不成对

    • 这类情况最常见的场景就是对锁进行拷贝使用:即如果将带有锁结构的变量赋值给其他变量,锁的状态会复制。
    • 所以在使用锁的时候,我们应当尽量避免锁拷贝,并且保证 Lock() 和 Unlock() 成对出现。没有成对出现容易会出现死锁的情况,或者是 Unlock 一个未加锁的 Mutex 而导致 panic。
  2. 循环等待

    • 另一个容易造成死锁的场景就是循环等待,如 A 等 B,B 等 C,C 等 A 。
    • 比如两个 goroutine,一个 goroutine 先锁 mu1,再锁 mu2,另一个 goroutine 先锁 mu2,再锁 mu1;在它们分别进行第二次加锁操作的时候,彼此等待对方释放锁,这样就造成了循环等待,一直阻塞,形成死锁。

sync.Map:并发安全 Map

Go 语言内置的 Map 并不是线程安全的,在多个 goroutine 同时操作 map 时,会有并发问题。

示例:map 不能同时被多个 goroutine 读写

package main

import (
   "fmt"
   "strconv"
   "sync"
)

var m = make(map[string]int)

func getVal(key string) int {
   return m[key]
}

func setVal(key string, value int) {
   m[key] = value
}

func main() {
   wg := sync.WaitGroup{}
   wg.Add(10)
   for i := 0; i < 10; i++ {
      go func(num int) {
         defer wg.Done()
         key := strconv.Itoa(num)
         setVal(key, num)
         fmt.Printf("key=:%v,val:=%v\n", key, getVal(key))
      }(i)
   }
   wg.Wait()
}

运行结果:

fatal error: concurrent map writes

程序报错了,说明 map 不能同时被多个 goroutine 读写。

解决方案一:对 map 加锁

package main

import (
   "fmt"
   "strconv"
   "sync"
)

var m = make(map[string]int)
var mu sync.Mutex

func getVal(key string) int {
   return m[key]
}

func setVal(key string, value int) {
   m[key] = value
}

func main() {
   wg := sync.WaitGroup{}

   wg.Add(10)
   for i := 0; i < 10; i++ {
      go func(num int) {
         defer func() {
            wg.Done()
            mu.Unlock()
         }()
         key := strconv.Itoa(num)
         mu.Lock()
         setVal(key, num)
         fmt.Printf("key=:%v,val:=%v\n", key, getVal(key))
      }(i)
   }
   wg.Wait()
}

运行结果:

key=:9,val:=9
key=:4,val:=4
key=:0,val:=0
key=:1,val:=1
key=:2,val:=2
key=:3,val:=3
key=:6,val:=6
key=:7,val:=7
key=:5,val:=5
key=:8,val:=8

解决方案二:使用 sync 包提供的 map

sync 包中提供的一个开箱即用的并发安全版 map–sync.Map(在 Go 1.9 引入)。

sync.Map 不用初始化就可以使用,同时内置了如 Store、Load、LoadOrStore、Delete、Range 等操作方法。

示例:

package main

import (
   "fmt"
   "sync"
)

func main() {
   var m sync.Map
   // 1. 写入
   m.Store("name", "zhangsan")
   m.Store("age", 18)

   // 2. 读取
   age, _ := m.Load("age")
   fmt.Println(age.(int))

   // 3. 遍历:入参是一个函数
   m.Range(
       func(key, value interface{}) bool {
           fmt.Printf("key is:%v, val is:%v\n", key, value)
           return true
       }
   )

   // 4. 删除
   m.Delete("age")
   age, ok := m.Load("age")
   fmt.Println(age, ok)

   // 5. 读取或写入
   m.LoadOrStore("name", "zhangsan")
   name, _ := m.Load("name")
   fmt.Println(name)
}

运行结果:

18
key is:name, val is:zhangsan
key is:age, val is:18       
<nil> false                 
zhangsan  
  1. 通过 store 方法写入两个键值对
  2. 读取 key 为 age 的值,读出来 age 为 18
  3. 通过 range 方法遍历 map 的 key 和 value
  4. 删除 key 为 age 的键值对,删除完之后,再次读取 age,age 为空,ok 为 false 表示 map 里没有这个 key
  5. LoadOrStore 尝试读取 key 为 name 的值,读取不到就写入键值对 name-zhangsan ,能读取到就返回原来 map 里的 name 对应的值

注意:

  • sync.Map 没有提供获取 map 数量的方法,需要我们在对 sync.Map 进行遍历时自行计算。
  • sync.Map 为了保证并发安全有一些性能损失,因此在非并发情况下,使用 map 相比使用 sync.Map 会有更好的性能。

sync.Atomic:原子操作

除了前面介绍的锁 mutex 以外,还有一种解决并发安全的策略,就是原子操作(sync.Atomic)。

所谓原子操作是指这一系列的操作在 CPU 上的执行是一个不可分割的整体,显然要么全部执行,要么全部不执行,不会受到其他操作的影响,也就不会存在并发问题。

atomic 和 metux 的区别:

  • 使用方式:通常 metux 用于保护一段执行逻辑;而 atomic 主要是对变量进行操作。
  • 底层实现:metux 由操作系统调度器实现;而 atomic 操作由底层硬件指令支持,保证在 CPU 上执行不中断。所以 atomic 的性能也会随 CPU 个数增加而线性提升。

atomic 提供的方法:

func AddT(addr *T, delta T)(new T)
func StoreT(addr *T, val T)
func LoadT(addr *T) (val T)
func SwapT(addr *T, new T) (old T)
func CompareAndSwapT(addr *T, old, new T) (swapped bool)
// T的类型是int32、int64、uint32、uint64和uintptr中的任意一种

示例:AddT

package main

import (
   "fmt"
   "sync"
   "sync/atomic"
)

func main() {

   var sum int32 =  0
   var wg sync.WaitGroup
   // 100个goroutine,每个goroutine都对sum+1,最后结果为100
   for i := 0; i < 100; i++ {
      wg.Add(1)
      go func() {
         defer wg.Done()
         atomic.AddInt32(&sum, 1)
      }()
   }
   wg.Wait()
   fmt.Printf("sum is %d\n",sum)
}

Select

Select是什么?

Select 是 Go 语言层面提供的一种多路复用机制,用于检测当前 goroutine 连接的多个 channel 是否有数据准备完毕,可用于读或写。

什么是 IO 多路复用?

  • 看到 select,很自然的会联想到 linux 提供的 IO 多路复用模型:select、poll、epoll。IO 多路复用主要用于提升程序处理 IO 事件的性能。Go 语言中的 select 与 linux 中的 select 有一定的区别。
  • 操作系统中的 IO 多路复用简单理解就就是用一个或者是少量线程处理多个 IO 事件。

简单对比一下传统的阻塞 IO 与 IO 多路复用:

  • 传统阻塞 IO:对于每一个网络 IO 事件,操作系统都会起一个线程去处理,在 IO 事件没准备好的时候,当前线程就会一直阻塞。
    image
    • 优点:逻辑简单,在阻塞等待期间线程会挂起,不会占用 CPU 资源。
    • 缺点:每个连接需要独立的线程单独处理,当并发请求量大时为了维护程序,内存、线程切换开销较大。
  • IO多路复用
    image
    • 优点:通过复用一个线程处理了多个 IO 事件,无需对额外过多的线程维护管理,资源和效率上都获得了提升。
    • 缺点:当连接数较少时,效率相比多线程+阻塞 I/O 模型效率较低。

Go 语言的 select 语句,是用来起一个 goroutine 监听多个 Channel 的读写事件,提高从多个 Channel 获取信息的效率,相当于也是单线程处理多个 IO 事件,其思想基本相同。


Select 使用

select的基本使用模式如下:

select {
        // 如果从channel 1读取数据成功,执行case语句 
        case <- chan1:     
          do ...   
        // 如果向channel 2写入数据成功,执行case语句 
        case chan2 <- 1:   
          do ...          
        // 如果上面都没有成功,进入default处理流程
        default:              
          do ...
}

可以看出,select的用法形式类似于switch,但是区别于switch的是,select各个case的表达式必须都是cnannel的读写操作

  • select通过多个case语句监听多个channel的读写操作是否准备好。
  • 若任何一个case可以执行了,则选择该case语句执行;若多个case都可以执行了,则随机选择一个执行
  • 若没有可以执行的case,则执行default语句。
  • 若没有default,则当前goroutine会阻塞。

空 select

当一个select中什么语句都没有,没有任何case,将会永久阻塞:

package main

func main() {
        select {
        }
}

运行结果:

fatal error: all goroutines are asleep - deadlock!

程序因为select语句导致永久阻塞,当前goroutine阻塞之后,由于go语言自带死锁检测机制,发现当前goroutine永远不会被唤醒,会报上述死锁错误。


没有 default 且 case 无法执行

package main

import (
   "fmt"
)

func main() {
   ch1 := make(chan int, 1)
   ch2 := make(chan int, 1)
   select {
   case <-ch1:
      fmt.Printf("received from ch1")
   case num := <-ch2:
      fmt.Printf("num is: %d", num)
   }
}

运行结果:

fatal error: all goroutines are asleep - deadlock!

程序中select从两个channel,ch1和ch2中读取数据,但是两个channel都没有数据,且没有goroutine往里面写数据,所以不可能读到数据,这两个case永远无法执行到,select也没有default,所以会出现永久阻塞,报死锁。


有单一 case 和 default

package main

import (
   "fmt"
)

func main() {
   ch := make(chan int, 1)
   select {
   case <-ch:
      fmt.Println("received from ch")
   default:
      fmt.Println("default!!!")
   }
}

运行结果:

default!!!

执行到select语句的时候,由于ch中没有数据,且没有goroutine往channel中写数据,所以不可能执行到,就会执行default语句,打印出default!!!


有多个 case 和 default

示例1:

package main

import (
   "fmt"
   "time"
)

func main() {
   ch1 := make(chan int, 1)
   ch2 := make(chan int, 1)
   go func() {
      time.Sleep(time.Second)
      for i := 0; i < 3; i++ {
         select {
         case v := <-ch1:
            fmt.Printf("Received from ch1, val = %d\n", v)
         case v := <-ch2:
            fmt.Printf("Received from ch2, val = %d\n", v)
         default:
            fmt.Println("default!!!")
         }
         time.Sleep(time.Second)
      }
   }()
   ch1 <- 1
   time.Sleep(time.Second)
   ch2 <- 2
   time.Sleep(4 * time.Second)
}

运行结果:

Received from ch1, val = 1
Received from ch2, val = 2
default!!!

主goroutine中向后往管道ch1和ch2中发送数据,在子goroutine中执行两个select,可以看到,在执行select的时候,那个case准备好了就会执行当下case的语句,最后没有数据可接受了,没有case可以执行,则执行default语句。

示例2:

package main

import (
   "fmt"
)

func main() {
   ch := make(chan int, 1)
   ch <- 5
   select {
   case v := <-ch:
      fmt.Printf("Received from ch1, val = %d\n", v)
   case v := <-ch:
      fmt.Printf("Received from ch2, val = %d\n", v)
   case v := <-ch:
      fmt.Printf("Received from ch3, val = %d\n", v)
   default:
      fmt.Println("default!!!")
   }
}

运行结果:

Received from ch3, val = 5

多次执行,3个case都有可能打印,这就是select选择的随机性。


Select 超时

有时候会出现 goroutine 阻塞的情况,那么我们如何避免整个程序进入阻塞的情况呢?我们可以利用 select 来设置超时,通过如下的方式实现:

func main() {
    c := make(chan int)
    o := make(chan bool)
    go func() {
        for {
            select {
            case v := <-c:
                fmt.Println(v)
            case <-time.After(5 * time.Second):
                fmt.Println("timeout")
                o <- true
                break
            }
        }
    }()
    //c <- 666 // 注释掉,引发 timeout
    <-o
}

Context

什么是 Context ?

Context 是用来在父子 goroutine 间进行值传递以及发送 cancel 信号的一种机制。

Context 主要有两个用途,也是在项目中经常使用的:

  • 用于并发控制,即控制协程的优雅退出
  • 上下文的信息传递

并发控制

对于一般的服务器而言,都是一直运行着的,等待接收来自客户端或者浏览器的请求做出响应。

思考这样一种场景:后台微服务架构中,一般服务器在收到一个请求之后,如果逻辑复杂,不会在一个 goroutine 中完成,而是会创建出很多的 goroutine 共同完成这个请求,就像下面这种情况:

image

有一个请求过来之后,先经过第一次 rpc 调用,然后再到 rpc2 ,后面创建执行两个 rpc , rpc4 里又有一次 rpc 调用 rpc5 ,等所有 rpc 调用成功后,返回结果。

假如在整个调用过程中, rpc1 发生了错误,如果没有 context 存在的话,我们还是得等所有的 rpc 都执行完才能返回结果,这样其实浪费了不少时间,因为一旦出错,我们完全可以直接在 rpc1 这里就返回结果了,不用等到后续的 rpc 都执行完,因其为后续的 rpc 执行就是没有意义的,浪费计算和 I/O 资源而已。

在引入 context 之后,就可以很好地处理这个问题,即在不需要子 goroutine 执行时,可以通过 context 通知子 goroutine 优雅地关闭。


Context 接口定义

Context 是 Go 语言在 1.7 版本中引入的一个标准库的接口,其定义如下:

type Context interface {
   Deadline() (deadline time.Time, ok bool)
   Done() <-chan struct{}
   Err() error
   Value(key interface{}) interface{}
}

接口提供了四个方法:

  • Deadline:设置 context.Context 被取消的时间,即截止时间。
  • Done:返回一个 Channel,当Context被取消或者到达截止时间,这个 Channel 就会被关闭,表示context结束,多次调用 Done 方法会返回同一个 Channel 。
  • Err:返回 context.Context 结束的原因,它只会在 Done 返回的 Channel 被关闭时才会返回非空的值,返回值有以下两种情况:
    • 如果是 context.Context 被取消,返回 Canceled 。
    • 如果是 context.Context 超时,返回 DeadlineExceeded 。
  • Value:从 context.Context 中获取键对应的值,类似于 map 的 get 方法。
    • 对于同一个 context,多次调用 value 并传入相同的 key,会返回相同的结果。
    • 如果没有对应的 key,则返回 nil,键值对是通过 WithValue 方法写入的。

context创建

根 context 创建:

  • 主要有以下两种方式创建根 context :
context.Backgroud()
context.TODO()
  • 从源码分析,context.Background 和 context.TODO 并没有太多的区别,都是用于创建根 context。根 context 是一个空的 context,不具备任何功能。

  • 一般情况下,如果当前函数没有上下文作为入参,我们都会使用 context.Background 创建一个根 context 作为起始的上下文向下传递。

功能 context 创建:

  • 根 context 在创建之后,不具备任何的功能,为了让 context 在我们的程序中发挥作用,我们要依靠 context 包提供的 With 系列函数来进行派生。
  • 主要有以下几个派生函数:
func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc)
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
func WithValue(parent Context, key, val interface{}) Context
  • 基于当前 context,每个 with 函数都会创建出一个新的 context,类似于我们熟悉的树结构,当前 context 称为父 context,派生出的新 context称为子 context,就像下面的 context 树结构:

image

通过四个 with 系列方法可以派生出四种类型的 context,每种 context 又可以通过同样的方式调用 with 系列方法继续向下派生新的 context,整个结构像一棵树。


context.WithCancel

func WithCancel(parent Context) (ctx Context, cancel CancelFunc)

context.WithCancel 函数是一个取消控制函数,只需要一个 context 作为参数,就能够从 context.Context 中衍生出一个新的子 context 和取消函数 CancelFunc。

通过将这个子 context 传递到新的 goroutine 中来控制这些 goroutine 的关闭。

一旦我们执行返回的取消函数 CancelFunc,当前上下文以及它的子上下文都会被取消,所有的 Goroutine 都会同步收到取消信号。

示例:

package main

import (
    "context"
    "fmt"
    "time"
)

func main() {
    // 创建content和取消函数
    ctx, cancel := context.WithCancel(context.Background())
    
    // 让goroutine1和goroutine2执行6s
    go Watch(ctx, "goroutine1")
    go Watch(ctx, "goroutine2")
    time.Sleep(6 * time.Second)    
    
    // 通知goroutine1和goroutine2关闭
    fmt.Println("end watching!!!")
    cancel()  
    
    time.Sleep(1 * time.Second)
}

func Watch(ctx context.Context, name string) {
    for {
        select {
      // 主goroutine调用cancel后,会发送一个信号到ctx.Done()这个channel
        case <-ctx.Done(): // 如果读取到消息
            fmt.Printf("%s exit!\n", name)
            return
        default:
            fmt.Printf("%s watching...\n", name)
            time.Sleep(time.Second)
        }
    }
}

运行结果:

goroutine2 watching...
goroutine1 watching...
goroutine1 watching...
goroutine2 watching...
goroutine2 watching...
goroutine1 watching...
goroutine1 watching...
goroutine2 watching...
goroutine2 watching...
goroutine1 watching...
goroutine1 watching...
goroutine2 watching...
end watching!!!
goroutine1 exit!
goroutine2 exit!

ctx, cancel := context.WithCancel(context.Background()) 派生出了一个带有返回函数cancel的ctx,并把它传入到子goroutine中。

在接下来的6s时间内,由于没有执行cancel函数,子goroutine将一直执行default语句,打印监控。

6s之后,调用cancel,此时子goroutine会从ctx.Done()这个channel中收到消息,执行return结束。


context.WithDeadline

func WithDeadline(parent Context, d time.Time) (Context, CancelFunc)

context.WithDeadline 也是一个取消控制函数,方法有两个参数,第一个参数是一个context,第二个参数是截止时间,且同样会返回一个子 context 和一个取消函数 CancelFunc 。

在使用的时候,若还没有到截止时间,我们可以通过手动调用 CancelFunc 来取消子 context,控制子 goroutine 的退出。如果到了截止时间,我们都没有调用 CancelFunc,子 context 的 Done() 管道也会收到一个取消信号,用来控制子 goroutine 退出。

示例:

package main

import (
    "context"
    "fmt"
    "time"
)

func main() {
    // 设置超时时间:当前时间+4s
    ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(4*time.Second))
    
    defer cancel()
    
    // 让goroutine1和goroutine2执行6s
    go Watch(ctx, "goroutine1")
    go Watch(ctx, "goroutine2")
    time.Sleep(6 * time.Second)
    
    fmt.Println("end watching!!!")
}

func Watch(ctx context.Context, name string) {
    for {
        select {
        case <-ctx.Done(): // 4s之后收到信号
            fmt.Printf("%s exit!\n", name)
            return
        default:
            fmt.Printf("%s watching...\n", name)
            time.Sleep(time.Second)
        }
    }
}

运行结果:

goroutine1 watching...
goroutine2 watching...
goroutine2 watching...
goroutine1 watching...
goroutine1 watching...
goroutine2 watching...
goroutine1 exit!
goroutine2 exit!
end watching!!!

这里并没有调用 cancel 函数,但是在过了 4s 之后,子 groutine 里 ctx.Done() 还是收到了信号,打印出 exit,子 goroutine 退出。这就是 WithDeadline 派生子 context 的用法。


context.WithTimeout

func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)

context.WithTimeout 和 context.WithDeadline 的作用类似,都是用于超时取消子 context,只是传递的第二个参数有所不同,context.WithTimeout 传递的第二个参数不是具体时间,而是时间长度。

示例:

package main

import (
    "context"
    "fmt"
    "time"
)

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second)
    defer cancel()
    
	// 让goroutine1和goroutine2执行6s
    go Watch(ctx, "goroutine1")
    go Watch(ctx, "goroutine2")
    time.Sleep(6 * time.Second)
    fmt.Println("end watching!!!")
}

func Watch(ctx context.Context, name string) {
    for {
        select {
        case <-ctx.Done():
            fmt.Printf("%s exit!\n", name)
            return
        default:
            fmt.Printf("%s watching...\n", name)
            time.Sleep(time.Second)
        }
    }
}

运行结果:

goroutine2 watching...
goroutine1 watching...
goroutine1 watching...
goroutine2 watching...
goroutine2 watching...
goroutine1 watching...
goroutine1 watching...
goroutine2 watching...
goroutine1 exit!
goroutine2 exit!
end watching!!!

与上个 context.WithDeadline 的样例代码基本一样,只是改变了下派生 context 的方法为 context.WithTimeout,具体体现在第二个参数不再是具体时间,而是变为了 4s 这个具体的时间长度,执行结果也是一样。


context.WithValue

func WithValue(parent Context, key, val interface{}) Context

context.WithValu 函数从父 context 中创建一个子 context 用于传值,函数参数是父 context、key、val,并返回一个 context 。

项目中这个方法一般用于上下文信息的传递,比如请求唯一 id,以及 trace_id 等,用于链路追踪以及配置透传。

示例:

package main

import (
    "context"
    "fmt"
    "time"
)

func func1(ctx context.Context) {
    fmt.Printf("name is: %s", ctx.Value("name").(string))
}

func main() {
    ctx := context.WithValue(context.Background(), "name", "zhangsan")
    go func1(ctx)
    time.Sleep(time.Second)
}

运行结果:

name is: zhangsan

定时器

在项目中常常会有这样的场景,到了未来某一时刻,需要某个逻辑或者某个任务执行一次,或者是周期性的的执行多次,有点类似定时任务。这种场景就需要用到定时器, Go 中也内置了定时器的实现,timer 和 ticker 。


time.Timer

Timer 是一种一次性时间定时器,即在未来某个时刻,触发的事件只会执行一次。

Timer 的结构定义:

type Timer struct {
    C <-chan Time
    r runtimeTimer
}

Timer 结构里有一个 Time 类型的管道 C,主要用于事件通知。在未到达设定时间的时候,若管道内没有数据写入,则一直处于阻塞状态;到达设定时间后,会向管道内写入一个系统事时间,触发事件。


创建 Timer

func NewTimer(d Duration) *Timer

示例:

package main

import (
   "fmt"
   "time"
)

func main() {
   timer := time.NewTimer(2 * time.Second) //设置超时时间2s
   // 2秒后,定时器会向自己的C管道发送一个time.Time类型的元素值
   <-timer.C
   fmt.Println("after 2s Time out!")
}

运行结果:

after 2s Time out!

程序在2s后打印“after 2s Time out!”,因为创建了一个定时器timer,设置了超时时间为2s,执行 <-timer.C 时会一直阻塞,直到2s后,程序继续执行。


停止 Timer

func (t *Timer) Stop() bool

返回值:

  • true:执行 stop() 时 timer 还没有到达超时时间,即超时时间内停止了 timer 。
  • false:执行 stop() 时 timer 到达了超时时间,即过了超时时间才停止 timer。

示例:

package main

import (
   "fmt"
   "time"
)

func main()  {
    timer := time.NewTimer(2 * time.Second) //设置超时时间2s
    res := timer.Stop()
    fmt.Printf(res)
}

运行结果:

true

重置 Timer

func (t *Timer) Reset(d Duration) bool

对于已经过期或者是已经停止的timer,可以通过重置方法激活使其继续生效。

示例:

package main

import (
   "fmt"
   "time"
)

func main() {
   timer := time.NewTimer(time.Second * 2)

   <-timer.C
   fmt.Println("time out1")

   res1 := timer.Stop()
   fmt.Printf("res1 is %t\n", res1)

   timer.Reset(time.Second * 3)

   res2 := timer.Stop()
   fmt.Printf("res2 is %t\n", res2)

}

运行结果:

time out1
res1 is false
res2 is true 

程序2s之后打印"time out1",此时timer已经过期了,所以res1的值为false。
接下来执行timer.Reset(time.Second * 3) 又使timer生效了,并且重设超时时间为3s。
紧接着执行了timer.Stop(),还未到超时时间,所以res2的值为true。


time.AfterFunc

func AfterFunc(d Duration, f func()) *Timer

time.AfterFunc参数为超时时间d和一个具体的函数f,并返回一个Timer的指针。

该方法作用在创建出timer之后,在当前goroutine,等待一段时间d之后,将执行f。

示例:

package main

import (
   "fmt"
   "time"
)

func main() {

   duration := time.Duration(1) * time.Second

   f := func() {
      fmt.Println("f has been called after 1s by time.AfterFunc")
   }

   timer := time.AfterFunc(duration, f)

   defer timer.Stop()

   time.Sleep(2 * time.Second)
}

运行结果:

f has been called after 1s by time.AfterFunc

1s之后打印语句


time.After

func After(d Duration) <-chan Time {
    return NewTimer(d).C
}

根据函数定义可以看到,After函数经过时间d之后会返回timer里的管道,并且这个管道会在经过时间d之后写入数据,调用这个函数,就相当于实现了定时器。

一般time.After会配合select一起使用,使用示例如下:

package main

import (
   "fmt"
   "time"
)

func main() {
   ch := make(chan string)

   go func() {
      time.Sleep(time.Second * 2)
      ch <- "test"
   }()

   select {
   case val := <-ch:
      fmt.Printf("val is %s\n", val)
   case <-time.After(time.Second * 1):
      fmt.Println("timeout!!!")
   }
}

运行结果:

timeout!!!

程序创建了一个管道ch,并且在主goroutine用select监听两个管道,一个是刚刚创建的ch,一个是time.After函数返回的管道c。
ch管道2s之后才会有数据写入;而time.After函数是1s超时,所以1s后就会有数据写入。
因此select会先收到管道c里的数据,执行timeout退出。


time.Ticker

Ticker 是一个周期性触发定时的计时器,它会按照一个时间间隔往 channel 发送系统当前时间,而 channel 的接收者可以以固定的时间间隔从 channel 中读取事件,且只有关闭 Ticker 对象才不会继续发送时间消息。

type Ticker struct {
   C <-chan Time  // The channel on which the ticks are delivered.
   r runtimeTimer
}

示例1:

func main() {
    // 创建定时器,每隔1秒后,定时器就会给channel发送一个事件(当前时间)
    ticker := time.NewTicker(time.Second * 1)

    i := 0
    go func() {
        for { // 循环
            <-ticker.C
            i++  // 每隔1s,i递增1
            fmt.Println("i = ", i)

            if i == 5 {
                ticker.Stop() //停止定时器
            }
        }
    }()

    // 死循环,特地不让 main goroutine 结束
    for {
    }

}

运行结果:

i =  1
i =  2
i =  3
i =  4
i =  5