Golang实现简易的顺序执行协程池

发布时间: 2023-12-12 14:36:00 作者: 雨山木风

countable_executor.go

// CountableExecutor is a sequential task executor: tasks are pushed onto a
// channel and consumed one at a time by the single goroutine started in
// NewCountableExecutor. Queue length, wait time and execution time are
// inspected by the before* hooks, which log warnings when thresholds are
// exceeded.
type CountableExecutor struct {
    name       string              // executor name, included in log messages
    taskQueue  chan iCountableTask // queue of pending tasks
    bufferSize int                 // capacity the task queue channel was created with
}

// ICountableExecutor is the interface of a sequential task executor with
// queue and timing checks. It contains unexported methods, so it can only be
// implemented inside this package.
type ICountableExecutor interface {
    // GetName returns the executor's name.
    GetName() string
    // Execute enqueues a function for execution.
    Execute(func())
    // GetQueueLength returns the number of tasks currently queued.
    GetQueueLength() int
    // DelayedExecute enqueues runnable once delayedTime has elapsed.
    DelayedExecute(delayedTime time.Duration, runnable func())
    // beforeOfferTask is a hook invoked before a task is placed on the queue.
    beforeOfferTask(iCountableTask)
    // beforeTaskStart is a hook invoked right before a task is run.
    beforeTaskStart(iCountableTask)
    // beforeTaskEnd is a hook invoked when a task is finishing.
    beforeTaskEnd(iCountableTask)
}

// NewCountableExecutor builds an executor with the given name and a task
// queue of capacity bufferSize, then starts the goroutine that drains the
// queue. The returned executor is ready to accept tasks immediately.
func NewCountableExecutor(name string, bufferSize int) ICountableExecutor {
    queue := make(chan iCountableTask, bufferSize)
    created := &CountableExecutor{name: name, taskQueue: queue, bufferSize: bufferSize}

    // The consumer loop runs for as long as the queue channel stays open.
    go created.startTaskWheel()

    return created
}

// startTaskWheel is the consumer loop: it receives tasks from the queue one
// at a time, stamps/checks each via beforeTaskStart and then runs it. The
// loop ends when the queue channel is closed.
func (executor *CountableExecutor) startTaskWheel() {
    queue := executor.taskQueue
    for {
        next, open := <-queue
        if !open {
            return
        }
        executor.beforeTaskStart(next)
        next.run()
    }
}

// Execute wraps runnable in a task, runs the pre-enqueue check and sends the
// task to the queue. The send blocks while the queue is full.
func (executor *CountableExecutor) Execute(runnable func()) {
    pending := newCountableTask(executor, runnable)

    // The queue-length check happens before the (possibly blocking) send.
    executor.beforeOfferTask(pending)

    executor.taskQueue <- pending
}

// DelayedExecute schedules runnable to be enqueued on this executor after
// delayedTime has elapsed.
//
// delayedTime is how long to wait before the task is submitted via Execute;
// the task then still waits its turn in the queue.
func (executor *CountableExecutor) DelayedExecute(delayedTime time.Duration, runnable func()) {
    submit := func() {
        executor.Execute(runnable)
    }
    time.AfterFunc(delayedTime, submit)
}

// beforeOfferTask is the pre-enqueue hook. It logs a warning when more than
// 20 tasks are already waiting in the queue; the task itself is not inspected.
func (executor *CountableExecutor) beforeOfferTask(task iCountableTask) {
    const maxHealthyQueueLength = 20

    pending := len(executor.taskQueue)
    if pending <= maxHealthyQueueLength {
        return
    }
    logger.Warnf("入队检查【%s】:队列长度过长,请检查服务器状态!length:%d\n", executor.name, pending)
}

// beforeTaskStart is the pre-run hook. It stamps the task's start time with
// the current timestamp and logs a warning when the task waited in the queue
// for more than 300 (as reported by getWaitMs).
func (executor *CountableExecutor) beforeTaskStart(task iCountableTask) {
    const maxHealthyWaitMs = 300

    task.setStartTime(NowTimestampMs())

    waited := task.getWaitMs()
    if waited <= maxHealthyWaitMs {
        return
    }
    logger.Warnf("执行任务前检查【%s】:任务等待过久,请检查服务器状态!waitMs:%d\n", executor.name, waited)
}

// beforeTaskEnd is the end-of-task hook. It logs a warning when the task's
// execution time (as reported by getExeMs) exceeds 200.
func (executor *CountableExecutor) beforeTaskEnd(task iCountableTask) {
    const maxHealthyExeMs = 200

    elapsed := task.getExeMs()
    if elapsed <= maxHealthyExeMs {
        return
    }
    logger.Warnf("执行任务后检查【%s】:任务执行时间过久,请检查服务器状态!exeMs:%d\n", executor.name, elapsed)
}

// GetQueueLength returns the number of tasks currently buffered in the task
// queue channel. The value is a snapshot and may change immediately after
// the call returns.
func (executor *CountableExecutor) GetQueueLength() int {
    return len(executor.taskQueue)
}

// GetName returns the name the executor was created with.
func (executor *CountableExecutor) GetName() string {
    return executor.name
}

// =============== Task ===============

// countableTask is a unit of work submitted to an executor, together with
// the timestamps used to compute its wait and execution durations.
// NOTE(review): timestamps come from NowTimestampMs, presumably Unix
// milliseconds — confirm against that helper.
type countableTask struct {
    executor     ICountableExecutor
    offerTime    int64 // time the task was created for enqueueing
    exeStartTime int64 // time the task started executing
    exeEndTime   int64 // time the task finished executing
    runnable     func()
}
// iCountableTask is the task abstraction consumed by the executor: it can be
// stamped with a start time, run, and queried for its wait and execution
// durations.
type iCountableTask interface {
    setStartTime(int64)
    run()             // runs the task
    getWaitMs() int64 // time spent waiting before execution started
    getExeMs() int64  // time spent actually executing
}

// newCountableTask creates a task bound to executor that will call runnable,
// recording the current timestamp as the moment the task was offered.
func newCountableTask(executor ICountableExecutor, runnable func()) iCountableTask {
    created := new(countableTask)
    created.offerTime = NowTimestampMs()
    created.executor = executor
    created.runnable = runnable
    return created
}

// run executes the task's runnable, records the end time, lets the owning
// executor perform its end-of-task check, and recovers from any panic so the
// executor goroutine keeps running.
//
// Deferred calls run in reverse order of registration:
//  1. the end time is recorded (also when runnable panics, so that
//     getExeMs is meaningful for failed tasks instead of being negative),
//  2. the executor's beforeTaskEnd hook checks the execution duration,
//  3. recoverErr logs and swallows a panic, if any.
func (task *countableTask) run() {
    defer task.recoverErr()
    defer task.executor.beforeTaskEnd(task)
    defer func() {
        task.exeEndTime = NowTimestampMs()
    }()
    task.runnable()
}

// recoverErr must be invoked directly via defer. If the goroutine is
// panicking it stops the panic and logs the panic value together with the
// stack trace of the current goroutine; otherwise it does nothing.
func (task *countableTask) recoverErr() {
    e := recover()
    if e == nil {
        return
    }

    // runtime.Stack silently truncates when the buffer is too small, and the
    // trace begins with the recover/panic frames, so a small fixed buffer
    // tends to cut off the frame that actually failed. Grow the buffer until
    // the whole trace fits.
    stackBuf := make([]byte, 4096)
    for {
        n := runtime.Stack(stackBuf, false)
        if n < len(stackBuf) {
            stackBuf = stackBuf[:n]
            break
        }
        stackBuf = make([]byte, 2*len(stackBuf))
    }

    logger.Error("协程池【", task.executor.GetName(), "】执行任务发生异常:", e)
    logger.Errorf("==> %s\n", string(stackBuf))
}

// setStartTime records the time at which the task started executing.
func (task *countableTask) setStartTime(exeStartTime int64) {
    task.exeStartTime = exeStartTime
}

// getWaitMs returns how long the task waited before it started executing:
// the start time minus the offer time. The result is only meaningful after
// setStartTime has been called.
func (task *countableTask) getWaitMs() int64 {
    return task.exeStartTime - task.offerTime
}

// getExeMs returns how long the task took to execute: the end time minus
// the start time. The result is only meaningful once both timestamps have
// been recorded.
func (task *countableTask) getExeMs() int64 {
    return task.exeEndTime - task.exeStartTime
}

 

hash.go

import (
    "fmt"
    "hash/crc32"
)

// CalcHash returns the CRC-32 (Castagnoli polynomial) checksum of key's
// default textual form, i.e. of fmt.Sprint(key). Keys that print identically
// (for example the int 42 and the string "42") hash to the same value.
func CalcHash(key interface{}) uint32 {
    table := crc32.MakeTable(crc32.Castagnoli)
    text := fmt.Sprint(key)
    return crc32.Checksum([]byte(text), table)
}

// CalcHashRange maps key to a bucket index in [0, bucketNum) by reducing
// CalcHash(key) modulo bucketNum. This is plain modulo bucketing: changing
// bucketNum changes the bucket of most keys.
//
// If bucketNum is not positive there is no valid bucket; 0 is returned
// instead of panicking with a division by zero (bucketNum == 0) or producing
// an out-of-range index (bucketNum < 0).
func CalcHashRange(key interface{}, bucketNum int) int {
    if bucketNum <= 0 {
        return 0
    }
    // Reduce in uint64 so that bucket counts above the uint32 range are not
    // truncated; for smaller counts the result is the same as a uint32 modulo.
    return int(uint64(CalcHash(key)) % uint64(bucketNum))
}

 

countable_executors.go 



import (
    "fmt"
)

//
任务执行器池,获取时通过hash散列 type CountableExecutors struct { name string // 名称 executors []ICountableExecutor // 执行器 executorNum int // 执行器数量 } // 一个可计数的单线程任务执行器 type ICountableExecutors interface { GetExecutor(interface{}) ICountableExecutor Execute(interface{}, func()) } func NewCountableExecutors(name string, executorNum int, bufferSize int) ICountableExecutors { countableExecutors := &CountableExecutors{ name: name, executors: make([]ICountableExecutor, executorNum), executorNum: executorNum, } for i := 1; i <= executorNum; i++ { countableExecutors.executors[i-1] = NewCountableExecutor(fmt.Sprint(name, i), bufferSize) } return countableExecutors } func (countableExecutors *CountableExecutors) GetExecutor(res interface{}) ICountableExecutor { executorIndex := CalcHashRange(res, countableExecutors.executorNum) return countableExecutors.executors[executorIndex] } func (countableExecutors *CountableExecutors) Execute(res interface{}, runnable func()) { executorIndex := CalcHashRange(res, countableExecutors.executorNum) countableExecutors.executors[executorIndex].Execute(runnable) }

 

使用示例:

// demo shows how to route work through an executor pool: tasks submitted
// with the same key (here a user ID) are handled by the same executor.
func demo() {
    userID := 123
    pool := myutil.NewCountableExecutors("core-routine-", config.GlobalConfig.Executor.CorePoolSize, 1024)

    task := func() {
        println("user 123 do something...")
    }
    pool.Execute(userID, task)
}