最近工作之余翻了一遍 sync 官方包中的 Once.Do 源码,获得了一些收获。众所周知 Go 中 Once.Do 是用来实现整个生命周期只执行一次,也就是单例模式的这么一个方法,并且它是并发安全的,有时候就会好奇它的底层实现原理;其实官方包中实现 Once.Do 的源码只有那么几行,但是里面蕴藏的细节还是值得深究,首先我们来看源码:

// Once is an object that will perform exactly one action.
//
// A Once must not be copied after first use.
//
// In the terminology of the Go memory model,
// the return from f “synchronizes before”
// the return from any call of once.Do(f).
type Once struct {
    // done indicates whether the action has been performed.
    // It is first in the struct because it is used in the hot path.
    // The hot path is inlined at every call site.
    // Placing done first allows more compact instructions on some architectures (amd64/386),
    // and fewer instructions (to calculate offset) on other architectures.
    done uint32
    m    Mutex
}

// Do calls the function f if and only if Do is being called for the
// first time for this instance of Once. In other words, given
//
//    var once Once
//
// if once.Do(f) is called multiple times, only the first call will invoke f,
// even if f has a different value in each invocation. A new instance of
// Once is required for each function to execute.
//
// Do is intended for initialization that must be run exactly once. Since f
// is niladic, it may be necessary to use a function literal to capture the
// arguments to a function to be invoked by Do:
//
//    config.once.Do(func() { config.init(filename) })
//
// Because no call to Do returns until the one call to f returns, if f causes
// Do to be called, it will deadlock.
//
// If f panics, Do considers it to have returned; future calls of Do return
// without calling f.
func (o *Once) Do(f func()) {
    // Note: Here is an incorrect implementation of Do:
    //
    //    if atomic.CompareAndSwapUint32(&o.done, 0, 1) {
    //        f()
    //    }
    //
    // Do guarantees that when it returns, f has finished.
    // This implementation would not implement that guarantee:
    // given two simultaneous calls, the winner of the cas would
    // call f, and the second would return immediately, without
    // waiting for the first's call to f to complete.
    // This is why the slow path falls back to a mutex, and why
    // the atomic.StoreUint32 must be delayed until after f returns.

    if atomic.LoadUint32(&o.done) == 0 {
        // Outlined slow-path to allow inlining of the fast-path.
        o.doSlow(f)
    }
}

func (o *Once) doSlow(f func()) {
    o.m.Lock()
    defer o.m.Unlock()
    if o.done == 0 {
        defer atomic.StoreUint32(&o.done, 1)
        f()
    }
}

首先它的结构体中包含了一个标记状态用来表示是否已执行,还有一个互斥锁用来保证并发安全。通过拜读注释我们知道,这其中的关键就在于使用 atomic 包来进行原子操作保证有效性,同时将状态比对跟执行后的标记进行分开处理。

这里给出了一个错误的实现方式:

if atomic.CompareAndSwapUint32(&o.done, 0, 1) {
    f()
}

在晁岳攀(鸟窝)的博客文章中我们知道这样写是无法保证并发执行的有效性,必须再通过互斥锁来调取初始化函数 f 后将状态标记为已执行,这里其实还涉及到快路径的概念。

通过查找资料得知,atomic 包是利用了 CPU 的原子操作特性来实现并发安全,那它有哪些使用方式呢?

抛开类型方面的异同,其中一共有五个关键方法(指针操作只有四个),分别是:

func AddT(addr *T, delta T)(new T)
func LoadT(addr *T) (val T)
func StoreT(addr *T, val T)
func SwapT(addr *T, new T) (old T)
func CompareAndSwapT(addr *T, old, new T) (swapped bool)

同时 1.19+ 版本还增加了类型方法的便捷使用方式,分别可以用来原子操作实现值递增/递减、读取、设置、置换、比较并交换。我们这里可以使用 Add 及 Load 来处理无锁并发统计的功能,拿之前 层级递归算法的优化 这篇文章举个栗子;得到层级列表数据后,如果想批量获取最后一级的完整层级链路,我们可以先用递归处理:

func GetDepartmentDisplayString(id int, list []Departments, name string) string {
    for _, item := range list {
        nameStr := ""

        if name != "" {
            nameStr = name + "-" + item.Name
        } else {
            nameStr = item.Name
        }

        if item.Id == id {
            return nameStr
        }

        value := GetDepartmentDisplayString(id, item.Children, nameStr)
        if value != "" {
            return value
        }
    }

    return ""
}

接下来就可使用 chan + select 来通过协程执行批量组装:

type departmentResult struct {
    Id   int
    Name string
}

func GetDepartmentMap(ctx context.Context, ids []int, list []Departments) map[int]string {
    result := make(map[int]string, len(ids))
    resultChan := make(chan departmentResult, len(ids))
    // 使用协程池避免资源被耗尽
    goroutineChan := make(chan struct{}, 10)
    defer close(goroutineChan)
    defer close(resultChan)

    // 使用原子操作计数
    var num atomic.Uint32
    for _, id := range ids {
        goroutineChan <- struct{}{}
        go func(id int) {
            defer func() {
                num.Add(1)
                <-goroutineChan
            }()
            name := GetDepartmentDisplayString(id, list, "")
            resultChan <- departmentResult{
                Id:   id,
                Name: name,
            }
        }(id)
    }

    for {
        select {
        case res := <-resultChan:
            result[res.Id] = res.Name
            if num.Load() == uint32(len(ids)) {
                return result
            }
        case <-time.After(time.Second * 10):
            logrus.WithContext(ctx).Error("获取部门层级信息执行超时")
            return result
        }
    }
}

这里使用有缓冲的 chan 来处理 ids 过大而导致的资源耗尽的问题,然后用 atomic 包进行原子操作并发统计执行次数避免 ids 需要提前去重的问题。其实后面这个 select 可以换成 for range resultChan 同时配合 close(resultChan) 来收集处理结果,就像之前文章使用的那样,只不过用 select 可以设置一个过期时间防止等待过长,在这里还是比较适合用这种方式。

参考文献:
Go sync.Once的三重门
sync/atomic 标准库包中提供的原子操作