开发者-导航 猿导航

Go 语言中,Fan-out/Fan-in 和 Pipeline 是两种强大的并发模式(AI)

发布时间:

已建立使用 goroutine 和通道构建并发程序的设计方法。关键模式:fan-in(合并输入)、fan-out(分配工作)、pipeline(流水线)、工作池、发布-订阅通信。帮助构建高效、可扩展的应用程序,同时避免竞争条件和死锁。

1. Pipeline(流水线)模式 #

将复杂任务分解为一系列有序的阶段,每个阶段处理数据后传递给下一个阶段,类似工厂流水线。每个阶段通过 channel 连接,前一阶段的输出作为后一阶段的输入。

2. Fan-out/Fan-in 模式 #

以下是结合这两种模式的示例,实现对一系列数字的并行处理:

package main

import (
	"fmt"
	"math/rand"
	"time"
)

// 生成数字序列(流水线第一阶段)
func generateNumbers(start, count int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for i := 0; i < count; i++ {
			out <- start + i
			time.Sleep(100 * time.Millisecond) // 模拟生成耗时
		}
	}()
	return out
}

// 平方计算(可并行处理的阶段 - Fan-out)
func square(num int) int {
	time.Sleep(200 * time.Millisecond) // 模拟计算耗时
	return num * num
}

// 并行处理数字(Fan-out)
func squareWorker(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			out <- square(n)
		}
	}()
	return out
}

// 过滤偶数(流水线最后阶段)
func filterEvens(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			if n%2 == 0 {
				out <- n
			}
			time.Sleep(50 * time.Millisecond)
		}
	}()
	return out
}

// Fan-in:合并多个channel的输出
func merge(cs ...<-chan int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup

	wg.Add(len(cs))
	for _, c := range cs {
		go func(ch <-chan int) {
			defer wg.Done()
			for n := range ch {
				out <- n
			}
		}(c)
	}

	// 等待所有输入channel关闭后再关闭输出channel
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	rand.Seed(time.Now().UnixNano())
	
	// 1. 生成10个数字(1-10)
	numbers := generateNumbers(1, 10)
	
	// 2. Fan-out:启动3个worker并行处理平方计算
	worker1 := squareWorker(numbers)
	worker2 := squareWorker(numbers)
	worker3 := squareWorker(numbers)
	
	// 3. Fan-in:合并3个worker的结果
	merged := merge(worker1, worker2, worker3)
	
	// 4. 过滤出偶数结果(流水线最后阶段)
	evens := filterEvens(merged)
	
	// 收集并打印结果
	for n := range evens {
		fmt.Printf("最终结果: %d\n", n)
	}
}

代码说明: #

  1. Pipeline 阶段

    • generateNumbers:生成初始数据序列
    • squareWorker:处理数据(计算平方)
    • filterEvens:处理前一阶段结果(过滤偶数)
  2. Fan-out

    • 启动 3 个 squareWorker 并行处理 generateNumbers 产生的数据
    • 每个 worker 从同一个输入 channel 读取数据,实现任务分发
  3. Fan-in

    • merge 函数将 3 个 worker 的输出汇总到一个 channel
    • 使用 sync.WaitGroup 等待所有 worker 完成后关闭输出 channel

这种模式的优势:

适用于数据处理流水线、ETL 任务、分布式计算等场景。

并发&并行 #

在 Go 语言的并发模型中,首先需要明确并行(Parallelism)并发(Concurrency) 的核心区别:

简单说:并发是"看起来同时",并行是"真正同时";并发是逻辑设计,并行是物理实现。