在以往的博文中我们知道 Go 可以通过 chan 可以实现很多种方式的并发编排,包括 Or-Done 模式、扇入模式、扇出模式、Stream 和 map-reduce 等,并且已经基于扇出模式实现了设计模式之观察者模式,这次我们就来探讨一下扇入模式。

扇入模式的定义跟扇出模式是反着来的,即通过多个输入 chan 将结果合并到一个输出 chan 中并返回。举个实际的栗子,假设我们有两个上游,需要将上游数据合并成一个统一的下游进行输出,代码定义如下:

func mergeTwo[T any](ch1, ch2 <-chan T) <-chan T {
    out := make(chan T)
    go func() {
        defer close(out)

        for ch1 != nil || ch2 != nil { // 至少有一个 chan 不是 nil 则继续
            select {
            case v, status := <-ch1:
                if !status {
                    ch1 = nil // ch1 关闭时,将其赋值为 nil
                    break
                }
                out <- v
            case v, status := <-ch2:
                if !status {
                    ch2 = nil  // ch2 关闭时,将其赋值为 nil
                    break
                }
                out <- v
            }
        }
    }()

    return out
}

这里有一个 select 动态监听的小技巧,也就是将监听的 chan 设置为 nil 后,就不会再走入到该分支中了,通过使用这种方式可以做到 select 动态去除某个 chan 的监听动作。

如果是存在多个上游的情况,那么如何实现扇入模式的合并逻辑?一种方式是通过递归来实现,思路就是将输入 chan 进行两两合并:

func mergeRecursion[T any](chs ...<-chan T) <-chan T {
    switch len(chs) {
    case 0: // 输入 chan 的数量为0,返回一个已关闭的 chan
        out := make(chan T)
        close(out)
        return out
    case 1: // 输入 chan 的数量为1,直接返回
        return chs[0]
    case 2: // 输入 chan 的数量为2,合并这两个 chan
        return mergeTwo(chs[0], chs[1])
    default:
        m := len(chs) / 2
        return mergeTwo(
            mergeRecursion(chs[:m]...), // 递归调用前半部分
            mergeRecursion(chs[m:]...)) // 递归调用后半部分
    }
}

这里使用前面的 mergeTwo 方法来处理两个输入 chan 的合并,最终可将所有的输入 chan 进行合并输出。

另一种方式则可以用到 reflect.SelectCase 来处理输入 chan 的接收,把结果直接丢入输出 chan 中即可。

import "reflect"

func fanIn[T any](chs ...<-chan T) <-chan T {
    out := make(chan T)
    go func() {
        defer close(out)
        
        var cases []reflect.SelectCase
        for _, ch := range chs {
            cases = append(cases, reflect.SelectCase{
                Dir:  reflect.SelectRecv,
                Chan: reflect.ValueOf(ch),
            })
        }

        for len(cases) > 0 {
            i, value, ok := reflect.Select(cases) // 从输入 chan 中读取数据
            if !ok {
                cases = append(cases[:i], cases[i+1:]...) // 将已关闭的输入 chan 从 select 列表中移除
                continue
            }

            out <- value.Interface().(T) // 这里需要从 interface 中转换成具体的类型
        }
    }()
    
    return out
}

通过动态去除 reflect.SelectCase 切片中的元素也能解决 chan 动态监听的场景,同时能够优雅处理多个输入上游的监听逻辑。

本文代码部分引用《 100个Go语言典型错误 》之 #66:没有使用 nil channel,及晁岳攀(@鸟窝)大佬的《 深入理解Go并发编程 》之第 12 章 - channel 的内部实现和陷阱。