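Recently, in my spare time, I read through the source of Once.Do in the official sync package and picked up a few things. As is well known, Go's Once.Do runs a function exactly once over the lifetime of a Once value, which makes it the usual way to implement a singleton, and it is safe for concurrent use. That made me curious about how it works underneath. The implementation is only a few lines long, but the details packed into those lines are worth a close look. Let's start with the source: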
// Once is an object that will perform exactly one action.
//
// A Once must not be copied after first use.
//
// In the terminology of the Go memory model,
// the return from f “synchronizes before”
// the return from any call of once.Do(f).
type Once struct {
	// done indicates whether the action has been performed.
	// It is first in the struct because it is used in the hot path.
	// The hot path is inlined at every call site.
	// Placing done first allows more compact instructions on some architectures (amd64/386),
	// and fewer instructions (to calculate offset) on other architectures.
	done uint32
	m    Mutex
}

// Do calls the function f if and only if Do is being called for the
// first time for this instance of Once. In other words, given
//
// var once Once
//
// if once.Do(f) is called multiple times, only the first call will invoke f,
// even if f has a different value in each invocation. A new instance of
// Once is required for each function to execute.
//
// Do is intended for initialization that must be run exactly once. Since f
// is niladic, it may be necessary to use a function literal to capture the
// arguments to a function to be invoked by Do:
//
// config.once.Do(func() { config.init(filename) })
//
// Because no call to Do returns until the one call to f returns, if f causes
// Do to be called, it will deadlock.
//
// If f panics, Do considers it to have returned; future calls of Do return
// without calling f.
func (o *Once) Do(f func()) {
	// Note: Here is an incorrect implementation of Do:
	//
	//	if atomic.CompareAndSwapUint32(&o.done, 0, 1) {
	//		f()
	//	}
	//
	// Do guarantees that when it returns, f has finished.
	// This implementation would not implement that guarantee:
	// given two simultaneous calls, the winner of the cas would
	// call f, and the second would return immediately, without
	// waiting for the first's call to f to complete.
	// This is why the slow path falls back to a mutex, and why
	// the atomic.StoreUint32 must be delayed until after f returns.
	if atomic.LoadUint32(&o.done) == 0 {
		// Outlined slow-path to allow inlining of the fast-path.
		o.doSlow(f)
	}
}

func (o *Once) doSlow(f func()) {
	o.m.Lock()
	defer o.m.Unlock()
	if o.done == 0 {
		defer atomic.StoreUint32(&o.done, 1)
		f()
	}
}
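The struct holds a done flag that records whether f has already run, plus a mutex that makes the slow path safe for concurrent use. Reading the comments shows that the key lies in two things: the atomic package (atomic.LoadUint32 to check the flag on the fast path, atomic.StoreUint32 to set it), and keeping the check of the flag separate from marking it as done, which is deferred until after f returns.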
The comment also shows an incorrect implementation:
if atomic.CompareAndSwapUint32(&o.done, 0, 1) {
	f()
}
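As Chao Yuepan (鸟窝) explains in his blog, this version does not hold up under concurrency: the goroutine that loses the CAS returns immediately, even though the winner may still be inside f, so its caller can observe a half-initialized state. The correct version must therefore fall back to a mutex, call f while holding it, and only then mark the state as done. This split is also where the idea of a fast path comes in: the cheap atomic load in Do is small enough to be inlined at every call site, while the locking logic is moved out into the separate doSlow function.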
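A bit of reading shows that the atomic package achieves concurrency safety by relying on the atomic instructions provided by the CPU. So how is it used?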
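Setting type differences aside, there are five families of functions (only four for unsafe.Pointer, which has no Add), where T stands for the concrete type:
func AddT(addr *T, delta T) (new T)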
func LoadT(addr *T) (val T)
func StoreT(addr *T, val T)
func SwapT(addr *T, new T) (old T)
func CompareAndSwapT(addr *T, old, new T) (swapped bool)
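Go 1.19 also added atomic types (atomic.Int32, atomic.Uint32, atomic.Int64, atomic.Bool, atomic.Pointer[T] and others) whose methods offer a more convenient way to do the same things: increment/decrement, load, store, swap, and compare-and-swap. Here we can use Add and Load for lock-free concurrent counting. Take the earlier post "层级递归算法的优化" (Optimizing a Hierarchical Recursion Algorithm) as an example: once we have the hierarchical list data and want to fetch, in one batch, the full hierarchy path of the lowest-level departments, we can first resolve a single id with recursion: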
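// Departments is assumed to be the tree node type from the earlier post;
// only the fields used below are shown.
type Departments struct {
	Id       int
	Name     string
	Children []Departments
}

// GetDepartmentDisplayString returns the full "parent-child-..." path of the
// department with the given id, or "" if the id is not in the tree.
func GetDepartmentDisplayString(id int, list []Departments, name string) string {
	for _, item := range list {
		// Extend the parent's path with this node's name.
		nameStr := item.Name
		if name != "" {
			nameStr = name + "-" + item.Name
		}
		if item.Id == id {
			return nameStr
		}
		// Not this node: search its subtree, carrying the path so far.
		if value := GetDepartmentDisplayString(id, item.Children, nameStr); value != "" {
			return value
		}
	}
	return ""
}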
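To make the recursion concrete, here it is applied to a small hypothetical tree (Company, Tech, Backend and so on are made-up sample data). The program repeats Departments and GetDepartmentDisplayString in compact form so that it compiles on its own.

```go
package main

import "fmt"

// Departments mirrors the assumed tree node type.
type Departments struct {
	Id       int
	Name     string
	Children []Departments
}

// GetDepartmentDisplayString: same logic as above, compacted.
func GetDepartmentDisplayString(id int, list []Departments, name string) string {
	for _, item := range list {
		nameStr := item.Name
		if name != "" {
			nameStr = name + "-" + item.Name
		}
		if item.Id == id {
			return nameStr
		}
		if v := GetDepartmentDisplayString(id, item.Children, nameStr); v != "" {
			return v
		}
	}
	return ""
}

func main() {
	// Hypothetical sample tree.
	tree := []Departments{
		{Id: 1, Name: "Company", Children: []Departments{
			{Id: 2, Name: "Tech", Children: []Departments{
				{Id: 3, Name: "Backend"},
				{Id: 4, Name: "Frontend"},
			}},
			{Id: 5, Name: "HR"},
		}},
	}
	fmt.Println(GetDepartmentDisplayString(3, tree, ""))          // Company-Tech-Backend
	fmt.Println(GetDepartmentDisplayString(5, tree, ""))          // Company-HR
	fmt.Printf("%q\n", GetDepartmentDisplayString(99, tree, "")) // ""
}
```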
Next, the ids can be processed concurrently in goroutines, with chan + select collecting the assembled results:
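// Imports needed by this snippet (place them at the top of the file):
import (
	"context"
	"sync/atomic"
	"time"

	"github.com/sirupsen/logrus"
)

type departmentResult struct {
	Id   int
	Name string
}

func GetDepartmentMap(ctx context.Context, ids []int, list []Departments) map[int]string {
	result := make(map[int]string, len(ids))
	if len(ids) == 0 {
		return result
	}
	// Buffered to len(ids) so workers never block on send. It is deliberately
	// not closed: after a timeout, workers may still send, and sending on a
	// closed channel panics.
	resultChan := make(chan departmentResult, len(ids))
	// Buffered channel used as a semaphore: at most 10 goroutines at a time.
	goroutineChan := make(chan struct{}, 10)
	// Closed by whichever worker finishes last.
	done := make(chan struct{})
	// Atomic count of finished executions (duplicate ids are counted too).
	var num atomic.Uint32
	total := uint32(len(ids))
	for _, id := range ids {
		goroutineChan <- struct{}{}
		go func(id int) {
			defer func() {
				<-goroutineChan
				// Add returns the new value, so exactly one worker sees total.
				// It runs after this worker's send, so when done is closed,
				// every result is already in resultChan.
				if num.Add(1) == total {
					close(done)
				}
			}()
			name := GetDepartmentDisplayString(id, list, "")
			resultChan <- departmentResult{Id: id, Name: name}
		}(id)
	}
	// One overall deadline, not a fresh timer on every loop iteration.
	timeout := time.After(10 * time.Second)
	for {
		select {
		case res := <-resultChan:
			result[res.Id] = res.Name
		case <-done:
			// All sends have completed: drain whatever is still buffered.
			for len(resultChan) > 0 {
				res := <-resultChan
				result[res.Id] = res.Name
			}
			return result
		case <-timeout:
			logrus.WithContext(ctx).Errorf("timed out fetching department hierarchy (%d/%d finished)", num.Load(), total)
			return result
		}
	}
}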
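Here the buffered goroutineChan acts as a semaphore that caps the number of concurrently running goroutines at 10, so a very large ids slice cannot exhaust resources. The atomic counter tracks how many executions have finished rather than how many distinct ids were seen, so ids do not have to be deduplicated in advance (duplicate ids collapse into a single map entry, so len(result) alone would never reach len(ids)). The select could also be replaced by for range resultChan, with resultChan closed once every worker has sent, to collect the results, as in the earlier article. The advantage of select is that it can enforce a timeout so the caller never waits too long, which makes it the better fit here.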