发布时间:
已建立使用 goroutine 和通道构建并发程序的设计方法。关键模式:fan-in
(合并输入)、fan-out
(分配工作)、pipeline
(流水线)、工作池、发布-订阅通信。帮助构建高效、可扩展的应用程序,同时避免竞争条件和死锁。
1. Pipeline(流水线)模式 #
将复杂任务分解为一系列有序的阶段,每个阶段处理数据后传递给下一个阶段,类似工厂流水线。每个阶段通过 channel 连接,前一阶段的输出作为后一阶段的输入。
2. Fan-out/Fan-in 模式 #
- Fan-out:将一个任务分发到多个并行的 goroutine 中处理(扩展并发)
- Fan-in:将多个并行 goroutine 的结果汇总到一个输出流中
以下是结合这两种模式的示例,实现对一系列数字的并行处理:
package main
import (
"fmt"
"math/rand"
"time"
)
// 生成数字序列(流水线第一阶段)
func generateNumbers(start, count int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for i := 0; i < count; i++ {
out <- start + i
time.Sleep(100 * time.Millisecond) // 模拟生成耗时
}
}()
return out
}
// 平方计算(可并行处理的阶段 - Fan-out)
func square(num int) int {
time.Sleep(200 * time.Millisecond) // 模拟计算耗时
return num * num
}
// 并行处理数字(Fan-out)
func squareWorker(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
out <- square(n)
}
}()
return out
}
// 过滤偶数(流水线最后阶段)
func filterEvens(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
if n%2 == 0 {
out <- n
}
time.Sleep(50 * time.Millisecond)
}
}()
return out
}
// Fan-in:合并多个channel的输出
func merge(cs ...<-chan int) <-chan int {
out := make(chan int)
var wg sync.WaitGroup
wg.Add(len(cs))
for _, c := range cs {
go func(ch <-chan int) {
defer wg.Done()
for n := range ch {
out <- n
}
}(c)
}
// 等待所有输入channel关闭后再关闭输出channel
go func() {
wg.Wait()
close(out)
}()
return out
}
func main() {
rand.Seed(time.Now().UnixNano())
// 1. 生成10个数字(1-10)
numbers := generateNumbers(1, 10)
// 2. Fan-out:启动3个worker并行处理平方计算
worker1 := squareWorker(numbers)
worker2 := squareWorker(numbers)
worker3 := squareWorker(numbers)
// 3. Fan-in:合并3个worker的结果
merged := merge(worker1, worker2, worker3)
// 4. 过滤出偶数结果(流水线最后阶段)
evens := filterEvens(merged)
// 收集并打印结果
for n := range evens {
fmt.Printf("最终结果: %d\n", n)
}
}
代码说明: #
-
Pipeline 阶段:
generateNumbers
:生成初始数据序列squareWorker
:处理数据(计算平方)filterEvens
:处理前一阶段结果(过滤偶数)
-
Fan-out:
- 启动 3 个
squareWorker
并行处理generateNumbers
产生的数据 - 每个 worker 从同一个输入 channel 读取数据,实现任务分发
- 启动 3 个
-
Fan-in:
merge
函数将 3 个 worker 的输出汇总到一个 channel- 使用
sync.WaitGroup
等待所有 worker 完成后关闭输出 channel
这种模式的优势:
- 并行效率:充分利用多核 CPU 处理能力,提高整体吞吐量
- 可扩展性:可根据任务负载动态调整 worker 数量
- 模块化:每个阶段独立,便于测试和维护
- 背压机制:channel 天然支持的阻塞特性可平衡各阶段速度差异
适用于数据处理流水线、ETL 任务、分布式计算等场景。
并发&并行 #
在 Go 语言的并发模型中,首先需要明确并行(Parallelism) 和并发(Concurrency) 的核心区别:
- 并发(Concurrency):指多个任务在逻辑上"同时"推进,这些任务可能交替使用同一 CPU 核心(通过时间分片),强调"任务调度的交错性"。
- 并行(Parallelism):指多个任务在物理上"同时"执行,需要多个 CPU 核心支持,强调"实际执行的同时性"。
简单说:并发是"看起来同时",并行是"真正同时";并发是逻辑设计,并行是物理实现。