众所周知 Go 最大的特点是协程相对于其他语言的线程来说是很轻量的,在 Linux 上线程的栈大小默认固定为 8MB,且不能扩展,也不能缩小。而普通的 goroutine 栈大小默认是 2KB (Go 1.19 改成了根据历史使用自动调整的方式),并且在需要的时候可以扩展到 1GB (不同的架构最大的大小会不同)。

我们在项目中日常开发会经常碰到需要并发使用协程处理业务逻辑的时候,如果是比较少量的处理并发任务,所耗费的系统资源也不会很多。不过一般来说我们大部分情况都无法判断这种并发任务具体需要执行的数量是多少,而大量频繁的起协程对于调度和垃圾回收的耗时还是会有影响,并且也极度耗费系统资源。因此,协程并不是越多越好,有的时候,我们会创建一个协程编排逻辑来专门处理这种场景,最理想的方式就是使用 Go 内置的 chan 去控制任务的执行,并充分利用到空闲的协程去并发完成任务。

目前市面上有很多协程处理第三方库都很优秀,不过这次我们试着自己实现一个简易的协程编排结构,也可以称之为协程池。下面就是一个通过内置 chan 和 wg 来控制并发任务处理的示例:

import (
    "fmt"
    "runtime/debug"
    "sync"
)

type Pool[T any] struct {
    taskQueue    chan T    // 任务队列
    taskFn       func(T)   // 任务执行函数
    workers      int       // 任务数量
    panicHandler func(any) // panic处理函数
    wg           sync.WaitGroup
}

// NewPool 创建一个新的协程池
func NewPool[T any](workers, capacity int, taskFn func(T), panicHandler func(any)) *Pool[T] {
    pool := &Pool[T]{
        taskQueue:    make(chan T, capacity),
        panicHandler: panicHandler,
        workers:      workers,
    }
    pool.taskFn = func(t T) {
        defer func() {
            // 处理协程运行中出现panic的情况
            if r := recover(); r != nil {
                if pool.panicHandler != nil {
                    pool.panicHandler(r)
                }
            }
        }()

        taskFn(t)
    }
    pool.wg.Add(workers)

    return pool
}

// Start 启动任务
func (p *Pool[T]) Start() {
    for i := 0; i < p.workers; i++ {
        go func() {
            defer p.wg.Done()

            for {
                task, ok := <-p.taskQueue
                if !ok {
                    return
                }

                p.taskFn(task)
            }
        }()
    }
}

// Push 提交任务
func (p *Pool[T]) Push(task T) {
    p.taskQueue <- task
}

// Wait 挂起当前协程
func (p *Pool[T]) Wait() {
    close(p.taskQueue)
    p.wg.Wait()
}

这里的NewPool工厂函数,其实参数定义可以使用 options 函数式选项模式来方便调用,详情可参考: 编程范式之函数式编程的应用

接下来我们写个测试用例来试试看结果是否如我们所预期:

import (
    "runtime"
    "runtime/debug"
    "sync/atomic"
    "testing"

    "github.com/brildum/testify/assert"
)

func TestNewPool(t *testing.T) {
    num := runtime.NumCPU()
    var sum atomic.Int32

    task := func(num int32) {
        if num < 0 {
            panic("unable to handle negative numbers")
        }

        sum.Add(num)
    }
    handler := func(r any) {
        fmt.Printf("Panic: %v\n %s", r, string(debug.Stack()))
    }
    pool := NewPool(num, num, task, handler)
    pool.Start()

    for i := int32(1000); i >= -1; i-- {
        pool.Push(i)
    }

    pool.Wait()

    if assert.Equal(t, int32(500500), sum.Load()) {
        t.Log("the sum value is right")
    }
}

运行结果输出了 "Panic: unable to handle negative numbers"、堆栈信息及 "the sum value is right" ,已达到效果。

更完善的 goroutine pool 逻辑讲解,请参考: GMP 并发调度器深度解析之手撸一个高性能 goroutine pool

本文协程编排逻辑部分摘抄于晁岳攀(@鸟窝)大佬的《 深入理解Go并发编程 》之第 8 章 - 池 Pool,并实现了panic处理。