在之前的文章使用atomic包实现无锁并发统计中,我们了解到原子操作在保证并发安全的同时,能避免锁带来的性能损耗,其应用场景广泛,如并发计数器、自旋锁等。这次,我们将深入探讨原子操作的另一个经典应用场景 —— lock-free queue(无锁队列),并基于此实现一个延时无锁队列。
队列基础
队列是一种遵循先进先出(FIFO,First In First Out)原则的数据结构,最先进入队列的元素最先被移除,元素顺序得以保持。队列结构包含队首和队尾,支持入队和出队操作。常见的队列实现方式有数组和链表。数组实现的队列在入队和出队时可能需要移动元素,而链表实现的队列则因只需更新指针,操作更为高效。
无锁队列实现
无锁队列利用原子操作,在保证并发安全的前提下,允许多个线程同时进行入队和出队操作,无需传统锁机制,显著提升并发性能。我们采用链表结构来进一步优化效率,其定义如下:
import (
"sync/atomic"
"unsafe"
)
type LkQueue[T any] struct {
head unsafe.Pointer
tail unsafe.Pointer
}
type node[T any] struct {
value T
next unsafe.Pointer
}
基础操作实现
首先,我们完善队列的创建、读取节点及修改节点的操作:
func NewLkQueue[T any]() *LkQueue[T] {
n := unsafe.Pointer(&node[T]{})
return &LkQueue[T]{head: n, tail: n}
}
// load读取节点的值
func load[T any](p *unsafe.Pointer) *node[T] {
return (*node[T])(atomic.LoadPointer(p))
}
// cas原子地修改节点的值
func cas[T any](p *unsafe.Pointer, old, new *node[T]) bool {
return atomic.CompareAndSwapPointer(p, unsafe.Pointer(old), unsafe.Pointer(new))
}
这里的读取节点(load)和修改节点(cas)操作使用了原子读及 CAS 操作,确保在并发访问时数据安全。
入队操作
入队操作流程如下:先接收值并创建新节点,然后循环尝试将新节点插入队列尾部。每次循环中,通过 load 函数读取当前队列的尾部节点 tail 和其下一个节点 next,检查 tail 是否仍为队列尾部(确保状态未变)。若 tail 是队列尾部,再检查 next 是否为 nil。若 next 为 nil,尝试将新节点 n 插入 tail 的 next 位置;若插入成功,进一步更新队列的 tail 指针指向新节点 n(均通过原子操作 CAS 完成);若插入失败,说明队列状态被其他线程修改,继续循环尝试插入。若 next 不为 nil,说明 tail 不是队列最后一个节点,尝试将 tail 指针移动到 next 节点,以便下次循环继续尝试插入。
// Enqueue入队
func (q *LkQueue[T]) Enqueue(value T) {
n := &node[T]{value: value}
for {
tail := load[T](&q.tail)
next := load[T](&tail.next)
if tail == load[T](&q.tail) { // tail和next是否一致
if next == nil {
if cas(&tail.next, next, n) {
cas(&q.tail, tail, n) // 入队完成。设置tail
return
}
} else {
cas(&q.tail, tail, next)
}
}
}
}
出队操作
出队操作同样在操作前进行检查。代码循环尝试从队列头部移除节点。每次循环中,通过 load 函数读取当前队列的头部节点 head、尾部节点 tail 和头部节点的下一个节点 next,检查 head 是否仍为队列头部(确保状态未变)。若 head 仍为队列头部,进一步检查 head 和 tail 是否相等。若 head 和 tail 相等,队列可能为空或 tail 未到队尾,此时检查 next 是否为 nil ;若 next 为 nil,说明队列为空,返回 false 表示出队失败;若 next 不为 nil,说明 tail 未到队尾,尝试将 tail 指针移动到 next 节点,以便下次循环继续出队操作。若 head 和 tail 不相等,说明队列中有元素可出队,将 next 节点的值赋给 value,并尝试通过原子操作 CAS 将 head 指针移动到 next 节点;若操作成功,返回 true 表示出队成功。
// Dequeue出队
func (q *LkQueue[T]) Dequeue() (value T, ok bool) {
for {
head := load[T](&q.head)
tail := load[T](&q.tail)
next := load[T](&head.next)
if head == load[T](&q.head) { // 检查head、tail和next是否一致
if head == tail { // 队列为空,或者tail还未到队尾
if next == nil { // 为空
return value, false
}
cas(&q.tail, tail, next) // 将tail往队尾移动
} else {
value = next.value
if cas(&q.head, head, next) {
return value, true // 出队完成
}
}
}
}
}
通过链表指针、原子读及 CAS 操作,我们成功实现了高性能的 lock-free queue。
延时无锁队列实现
基于上述无锁队列,我们来实现延时无锁队列。
结构与创建
首先是结构定义和创建方法,通过直接定义匿名结构体包含队列的入队及出队操作:
import (
"iter"
"time"
)
type DelayLkQueue[T any] struct {
LkQueue[T]
}
func NewDelayLkQueue[T any]() *DelayLkQueue[T] {
return &DelayLkQueue[T]{*NewLkQueue[T]()}
}
延迟入队操作
延迟入队操作利用Go语言原生的延迟执行功能实现:
// DelayEnqueue延迟入队
func (q *DelayLkQueue[T]) DelayEnqueue(value T, duration time.Duration) {
time.AfterFunc(duration, func() {
q.Enqueue(value)
})
}
持续监听出队操作
Go 1.23 的迭代器特性非常适合持续监听出队的场景:
// ContinuousDequeue持续监听出队
func (q *DelayLkQueue[T]) ContinuousDequeue() iter.Seq[T] {
return func(yield func(T) bool) {
for {
if value, ok := q.Dequeue(); ok {
if !yield(value) {
return
}
} else {
time.Sleep(time.Millisecond) // 队列为空,休眠1毫秒
}
}
}
}
// ContinuousDequeueExecute持续监听出队执行函数
func (q *DelayLkQueue[T]) ContinuousDequeueExecute(fn func(T)) {
for value := range q.ContinuousDequeue() {
fn(value)
}
}
测试验证
为验证延时无锁队列的功能,我们编写如下测试用例:
import (
"testing"
"time"
)
func TestDelayLkQueue(t *testing.T) {
cases := []struct {
value int
duration time.Duration
}{
{1, time.Second},
{3, time.Second * 3},
}
q := NewDelayLkQueue[int]()
for _, c := range cases {
q.DelayEnqueue(c.value, c.duration)
}
go q.ContinuousDequeueExecute(func(i int) {
t.Log(i)
t.Log(time.Now().Unix())
})
time.Sleep(time.Second * 5)
}
执行测试用例后,我们可以看到延迟无锁队列已能够达到预期效果:
=== RUN TestDelayLkQueue
delay_queue_test.go:23: 1
delay_queue_test.go:24: 1733211617
delay_queue_test.go:23: 3
delay_queue_test.go:24: 1733211619
--- PASS: TestDelayLkQueue (5.00s)
PASS
本文所有代码示例可通过点击这里查看。希望通过这篇文章,大家能对无锁队列和延时无锁队列有更深入的理解和应用能力。