我们知道 Go 语言中的 channel (以下简称 chan ) 是该语言并发编程模型的一个核心部分。通过 chan , Go 允许在不同的 Goroutine 之间安全地传递数据。并且通过 chan 可以实现很多种方式的并发编排,包括 Or-Done 模式、扇入模式、扇出模式、Stream 和 map-reduce 等,今天我们就来看看扇出模式的典型用例 —— 观察者模式。
在鸟窝大佬的新书 《 深入理解Go并发编程 》 之第 12 章 - channel 的内部实现和陷阱 中,给出了扇出模式的具体实现:
func fanOut(ch <-chan interface{}, out []chan interface{}, async bool) {
go func() {
defer func() { //退出时关闭所有的输出chan
for i := 0; i < len(out); i++ {
close(out[i])
}
}()
for v := range ch { // 从输入chan中读取数据
v := v
for i := 0; i < len(out); i++ {
i := i
if async { //异步
go func() {
out[i] <- v // 放入到输出chan中,异步方式
}()
} else {
out[i] <- v // 放入到输出chan中,同步方式
}
}
}
}()
}
这里其实还可以使用反射的方式来实现,具体就是通过 reflect.SelectCase 来处理输入管道的接收,再通过一个循环向输出管道分发数据:
func fanOutReflect(ch <-chan interface{}, out []chan interface{}, async bool) {
var cases []reflect.SelectCase
// 添加输入 chan 的 reflect.SelectCase
cases = append(cases, reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(ch),
})
go func() {
defer func() {
// 退出时关闭所有的输出 chan
for _, o := range out {
close(o)
}
}()
for {
_, value, ok := reflect.Select(cases) // 从输入 chan 中读取数据
if !ok {
// 输入 channel 被关闭
return
}
// 输入 channel 接收到数据
for _, o := range out {
if async { // 异步
go func(o chan interface{}, value reflect.Value) {
o <- value.Interface() // 放入到输出 chan 中,异步方式
}(o, value)
} else {
o <- value.Interface() // 放入到输出 chan 中,同步方式
}
}
}
}()
}
扇出模式最典型的一个使用场景就是观察者模式,下面我们就来手撸一个基于 chan 扇出模式实现的观察者模式。
import "reflect"
// Subject 被观察者
type Subject struct {
observers []Observer
in chan any
}
// Observer 观察者 chan
type Observer chan any
// NewSubject 获取被观察者实例
func NewSubject() Subject {
return Subject{in: make(chan any)}
}
// Attach 观察者绑定
func (s *Subject) Attach(observer ...Observer) {
s.observers = append(s.observers, observer...)
}
// Detach 观察者解绑
func (s *Subject) Detach(observer Observer) {
for i, obs := range s.observers {
if obs == observer {
s.observers = append(s.observers[:i], s.observers[i+1:]...)
close(observer)
break
}
}
}
// Notify 通知观察者
func (s *Subject) Notify(data any) {
go s.fanOut(s.in, s.observers)
s.in <- data
}
// fanOut 扇出模式实现
func (s *Subject) fanOut(ch <-chan interface{}, out []Observer) {
var cases []reflect.SelectCase
// 添加输入 chan 的 reflect.SelectCase
cases = append(cases, reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(ch),
})
go func() {
defer func() {
// 退出时关闭所有的输出 chan
for _, o := range out {
close(o)
}
}()
for {
_, value, ok := reflect.Select(cases) // 从输入 chan 中读取数据
if !ok {
// 输入 channel 被关闭
return
}
// 输入 channel 接收到数据
for _, o := range out {
o <- value.Interface() // 放入到输出 chan 中,同步方式
}
}
}()
}
最后我们试着来模拟一个用户注册的场景,注册完毕后需要发送邮件及欢迎通知,此时就可以使用观察者模式来实现:
import "fmt"
func main() {
sendEmail := make(Observer)
notifyWelcome := make(Observer)
userRegister := NewSubject()
userRegister.Attach(sendEmail, notifyWelcome)
go func() {
for data := range sendEmail {
fmt.Println("The send email service receive data: ", data)
}
}()
go func() {
for data := range notifyWelcome {
fmt.Println("The notify welcome service receive data: ", data)
}
}()
newUser1 := "fantasticbin"
newUser2 := "gan"
userRegister.Notify(newUser1)
userRegister.Notify(newUser2)
time.Sleep(1 * time.Second)
}
运行后输出:
The notify welcome service receive data: fantasticbin
The send email service receive data: fantasticbin
The send email service receive data: gan
The notify welcome service receive data: gan
结果达到预期。
点赞哈哈哈哈,我还以为用反射去给out输出
out直接循环分发就好,用反射反而复杂了