[转]Golang线程池实现百万级高并发

发布时间 2023-11-19 23:20:45作者: 立志做一个好的程序员

 

转,原文: https://lk668.github.io/2021/03/22/2021-03-22-Golang%E7%BA%BF%E7%A8%8B%E6%B1%A0%E5%AE%9E%E7%8E%B0%E7%99%BE%E4%B8%87%E7%BA%A7%E9%AB%98%E5%B9%B6%E5%8F%91/

--------------

Golang线程池实现百万级高并发

本文基于Golang实现线程池,从而可以达到百万级别的高并发请求。本文实现的代码在https://github.com/lk668/threadpool可见。

1. Golang并发简介

Golang原生的goroutine可以很轻松实现并发编程。Go语言的并发是基于用户态的并发,这种并发方式就变得非常轻量,能够轻松运行几万并发逻辑。Go 的并发属于 CSP 并发模型的一种实现,CSP 并发模型的核心概念是:不要通过共享内存来通信,而应该通过通信来共享内存。

2. 并发方案演进

2.1 直接使用goroutine

直接使用goroutine启动一个新的线程来进行任务的执行

1
go handler(request)

2.2 缓冲队列

利用channel实现一个缓冲队列,每次请求先放入缓冲队列,然后从缓冲队列读取数据,一次执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
type Job interface{
Run()
}

// 长度为1000的缓冲队列
jobQueue := make(chan Job, 1000)

// 启动一个协程来读取缓冲队列的数据
go func(){
for {
select {
case job := <- jobQueue:
job.Run()
}
}
}()

// 请求发送
job := Job{}
jobQueue <- job

该方案在请求量低于缓冲队列长度时,可以应对并发请求。但是当并发请求量大于缓冲队列长度时,channel会出现阻塞情况。

2.3 线程池实现百万级高并发

更好的实现方案是利用job队列+线程池来实现,具体如下所示:

有个全局JobQueue,用来存储需要执行的Job,有个WorkerPool的线程池,用来存储空闲的Worker。当JobQueue中有Job时,从JobQueue获取Job对象,从WorkerPool获取空闲Worker,将Job对象发送给Worker,进行执行。每个Worker都是一个独立的Goroutine。从而真正意义上实现高并发。

代码实现主要分为三部分

2.3.1 Job定义

Job是一个interface,其下有个函数RunTask,用户定制化的任务,需要实现RunTask函数。JobChan是一个Job channel结构。Job是高并发需要执行的任务

1
2
3
4
5
type Job interface {
RunTask(request interface{})
}

type JobChan chan Job

2.3.2 Worker定义

Worker就是高并发里面的一个线程,启动的时候是一个Goroutine。Worker结构一需要一个JobChan,用来接收从全局JobQueue里面获取的Job对象。有个Quit的channel,用来接收退出信号。需要一个Start函数,将自己注册到WorkerPool,然后监听Job,有Job传入时,处理Job的RunTask,处理完成之后,重新将自己添加回WorkerPool。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// Worker结构体
// WorkerPool随机选取一个Worker,将Job发送给Worker去执行
type Worker struct {
// 不需要带缓冲的任务队列
JobQueue JobChan
//退出标志
Quit chan bool
}

// 创建一个新的Worker对象
func NewWorker() Worker {
return Worker{
make(JobChan),
make(chan bool),
}
}

// 启动一个Worker,来监听Job事件
// 执行完任务,需要将自己重新发送到WorkerPool
func (w Worker) Start(workerPool *WorkerPool) {
// 需要启动一个新的协程,从而不会阻塞
go func() {
for {
// 将worker注册到线程池
workerPool.WorkerQueue <- &w
select {
case job := <-w.JobQueue:
job.RunTask(nil)
// 终止当前worker
case <-w.Quit:
return
}
}
}()
}

2.3.3 WorkerPool定义

WorkerPool是一个线程池,用来存储Worker。所以需要一个Size变量,用来表示这个Pool存储的Worker的个数。需要一个JobQueue,用来充当全局JobQueue的作用,所有的job先存储到该全局JobQueue中。有个WorkerQueue是个channel,用来存储空闲的Worker,有Job来的时候,从WorkerQueue里面取出一个Worker,执行相应的任务,执行完成以后,重新将Worker放回WorkerQueue。

WorkerPool需要一个启动函数,一个是来启动Size数量的Worker线程,一个是需要启动一个新的线程,来接收Job。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// 线程池
type WorkerPool struct {
// 线程池大小
Size int
// 不带缓冲的任务队列,任务到达后,从workerQueue随机选取一个Worker来执行Job
JobQueue JobChan
WorkerQueue chan *Worker
}

func NewWorkerPool(poolSize, jobQueueLen int) *WorkerPool {
return &WorkerPool{
poolSize,
make(JobChan, jobQueueLen),
make(chan *Worker, poolSize),
}
}

func (wp *WorkerPool) Start() {

// 将所有worker启动
for i := 0; i < wp.Size; i++ {
worker := NewWorker()
worker.Start(wp)
}

// 监听JobQueue,如果接收到请求,随机取一个Worker,然后将Job发送给该Worker的JobQueue
// 需要启动一个新的协程,来保证不阻塞
go func() {
for {
select {
case job := <-wp.JobQueue:
worker := <-wp.WorkerQueue
worker.JobQueue <- job
}
}
}()

}

2.3.4 代码调用举例

接下来,举例分析如何使用该线程池。首先需要定义你要执行的任务,实现RunTask函数。然后初始化一个WorkerPool,将模拟百万请求的数据发送给全局JobQueue。交给线程池进行任务处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
//需要执行任务的结构体
type Task struct {
Number int
}

// 实现Job这个interface的RunTask函数
func (t Task) RunTask(request interface{}) {
fmt.Println("This is task: ", t.Number)
//设置个等待时间
time.Sleep(1 * time.Second)
}

func main() {

// 设置线程池的大小
poolNum := 100 * 100 * 20
jobQueueNum := 100
workerPool := threadpool.NewWorkerPool(poolNum, jobQueueNum)
workerPool.Start()

// 模拟百万请求
dataNum := 100 * 100 * 100

go func() {
for i := 0; i < dataNum; i++ {
task := Task{Number: i}
workerPool.JobQueue <- task
}
}()

// 阻塞主线程
for {
fmt.Println("runtime.NumGoroutine() :", runtime.NumGoroutine())
time.Sleep(2 * time.Second)
}
}

参考