在以往的博文中我们知道 Go 可以通过 chan 可以实现很多种方式的并发编排,包括 Or-Done 模式、扇入模式、扇出模式、Stream 和 map-reduce 等,并且已经基于扇出模式实现了设计模式之观察者模式,这次我们就来探讨一下扇入模式。
扇入模式的定义跟扇出模式是反着来的,即通过多个输入 chan 将结果合并到一个输出 chan 中并返回。举个实际的栗子,假设我们有两个上游,需要将上游数据合并成一个统一的下游进行输出,代码定义如下:
func mergeTwo[T any](ch1, ch2 <-chan T) <-chan T {
out := make(chan T)
go func() {
defer close(out)
for ch1 != nil || ch2 != nil { // 至少有一个 chan 不是 nil 则继续
select {
case v, status := <-ch1:
if !status {
ch1 = nil // ch1 关闭时,将其赋值为 nil
break
}
out <- v
case v, status := <-ch2:
if !status {
ch2 = nil // ch2 关闭时,将其赋值为 nil
break
}
out <- v
}
}
}()
return out
}
这里有一个 select 动态监听的小技巧,也就是将监听的 chan 设置为 nil 后,就不会再走入到该分支中了,通过使用这种方式可以做到 select 动态去除某个 chan 的监听动作。
如果是存在多个上游的情况,那么如何实现扇入模式的合并逻辑?一种方式是通过递归来实现,思路就是将输入 chan 进行两两合并:
func mergeRecursion[T any](chs ...<-chan T) <-chan T {
switch len(chs) {
case 0: // 输入 chan 的数量为0,返回一个已关闭的 chan
out := make(chan T)
close(out)
return out
case 1: // 输入 chan 的数量为1,直接返回
return chs[0]
case 2: // 输入 chan 的数量为2,合并这两个 chan
return mergeTwo(chs[0], chs[1])
default:
m := len(chs) / 2
return mergeTwo(
mergeRecursion(chs[:m]...), // 递归调用前半部分
mergeRecursion(chs[m:]...)) // 递归调用后半部分
}
}
这里使用前面的 mergeTwo 方法来处理两个输入 chan 的合并,最终可将所有的输入 chan 进行合并输出。
另一种方式则可以用到 reflect.SelectCase 来处理输入 chan 的接收,把结果直接丢入输出 chan 中即可。
import "reflect"
func fanIn[T any](chs ...<-chan T) <-chan T {
out := make(chan T)
go func() {
defer close(out)
var cases []reflect.SelectCase
for _, ch := range chs {
cases = append(cases, reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(ch),
})
}
for len(cases) > 0 {
i, value, ok := reflect.Select(cases) // 从输入 chan 中读取数据
if !ok {
cases = append(cases[:i], cases[i+1:]...) // 将已关闭的输入 chan 从 select 列表中移除
continue
}
out <- value.Interface().(T) // 这里需要从 interface 中转换成具体的类型
}
}()
return out
}
通过动态去除 reflect.SelectCase 切片中的元素也能解决 chan 动态监听的场景,同时能够优雅处理多个输入上游的监听逻辑。
本文代码部分引用《 100个Go语言典型错误 》之 #66:没有使用 nil channel,及晁岳攀(@鸟窝)大佬的《 深入理解Go并发编程 》之第 12 章 - channel 的内部实现和陷阱。