我们知道 Go 语言中的 channel (以下简称 chan ) 是该语言并发编程模型的一个核心部分。通过 chan , Go 允许在不同的 Goroutine 之间安全地传递数据。并且通过 chan 可以实现很多种方式的并发编排,包括 Or-Done 模式、扇入模式、扇出模式、Stream 和 map-reduce 等,今天我们就来看看扇出模式的典型用例 —— 观察者模式。

在鸟窝大佬的新书 《 深入理解Go并发编程 》 之第 12 章 - channel 的内部实现和陷阱 中,给出了扇出模式的具体实现:

func fanOut(ch <-chan interface{}, out []chan interface{}, async bool) {
    go func() {
        defer func() { //退出时关闭所有的输出chan
            for i := 0; i < len(out); i++ {
                close(out[i])
            }
        }()

        for v := range ch { // 从输入chan中读取数据
            v := v
            for i := 0; i < len(out); i++ {
                i := i
                if async { //异步
                    go func() {
                        out[i] <- v // 放入到输出chan中,异步方式
                    }()
                } else {
                    out[i] <- v // 放入到输出chan中,同步方式
                }
            }
        }
    }()
}

这里其实还可以使用反射的方式来实现,具体就是通过 reflect.SelectCase 来处理输入管道的接收,再通过一个循环向输出管道分发数据:

func fanOutReflect(ch <-chan interface{}, out []chan interface{}, async bool) {
    var cases []reflect.SelectCase
    // 添加输入 chan 的 reflect.SelectCase
    cases = append(cases, reflect.SelectCase{
        Dir:  reflect.SelectRecv,
        Chan: reflect.ValueOf(ch),
    })

    go func() {
        defer func() {
            // 退出时关闭所有的输出 chan
            for _, o := range out {
                close(o)
            }
        }()

        for {
            _, value, ok := reflect.Select(cases) // 从输入 chan 中读取数据
            if !ok {
                // 输入 channel 被关闭
                return
            }

            // 输入 channel 接收到数据
            for _, o := range out {
                if async {  // 异步
                    go func(o chan interface{}, value reflect.Value) {
                        o <- value.Interface() // 放入到输出 chan 中,异步方式
                    }(o, value)
                } else {
                    o <- value.Interface() // 放入到输出 chan 中,同步方式
                }
            }
        }
    }()
}

扇出模式最典型的一个使用场景就是观察者模式,下面我们就来手撸一个基于 chan 扇出模式实现的观察者模式。

import "reflect"

// Subject 被观察者
type Subject struct {
    observers []Observer
    in        chan any
}

// Observer 观察者 chan
type Observer chan any

// NewSubject 获取被观察者实例
func NewSubject() Subject {
    return Subject{in: make(chan any)}
}

// Attach 观察者绑定
func (s *Subject) Attach(observer ...Observer) {
    s.observers = append(s.observers, observer...)
}

// Detach 观察者解绑
func (s *Subject) Detach(observer Observer) {
    for i, obs := range s.observers {
        if obs == observer {
            s.observers = append(s.observers[:i], s.observers[i+1:]...)
            close(observer)
            break
        }
    }
}

// Notify 通知观察者
func (s *Subject) Notify(data any) {
    go s.fanOut(s.in, s.observers)
    s.in <- data
}

// fanOut 扇出模式实现
func (s *Subject) fanOut(ch <-chan interface{}, out []Observer) {
    var cases []reflect.SelectCase
    // 添加输入 chan 的 reflect.SelectCase
    cases = append(cases, reflect.SelectCase{
        Dir:  reflect.SelectRecv,
        Chan: reflect.ValueOf(ch),
    })

    go func() {
        defer func() {
            // 退出时关闭所有的输出 chan
            for _, o := range out {
                close(o)
            }
        }()

        for {
            _, value, ok := reflect.Select(cases) // 从输入 chan 中读取数据
            if !ok {
                // 输入 channel 被关闭
                return
            }

            // 输入 channel 接收到数据
            for _, o := range out {
                o <- value.Interface() // 放入到输出 chan 中,同步方式
            }
        }
    }()
}

最后我们试着来模拟一个用户注册的场景,注册完毕后需要发送邮件及欢迎通知,此时就可以使用观察者模式来实现:

import "fmt"

func main() {
    sendEmail := make(Observer)
    notifyWelcome := make(Observer)

    userRegister := NewSubject()
    userRegister.Attach(sendEmail, notifyWelcome)

    go func() {
        for data := range sendEmail {
            fmt.Println("The send email service receive data: ", data)
        }
    }()

    go func() {
        for data := range notifyWelcome {
            fmt.Println("The notify welcome service receive data: ", data)
        }
    }()

    newUser1 := "fantasticbin"
    newUser2 := "gan"
    userRegister.Notify(newUser1)
    userRegister.Notify(newUser2)
    time.Sleep(1 * time.Second)
}

运行后输出:

The notify welcome service receive data:  fantasticbin
The send email service receive data:  fantasticbin
The send email service receive data:  gan
The notify welcome service receive data:  gan

结果达到预期。