funcmultiChannelSelect() { ch1 := make(chanint) ch2 := make(chanstring) gofunc() { time.Sleep(500 * time.Millisecond) ch1 <- 42 }() gofunc() { time.Sleep(300 * time.Millisecond) ch2 <- "hello" }() for i := 0; i < 2; i++ { select { case num := <-ch1: fmt.Printf("从ch1收到数字: %d\n", num) case str := <-ch2: fmt.Printf("从ch2收到字符串: %s\n", str) } } }
适用场景:
同时处理多个通道
实现优先级通道读取
需要响应多个事件源
带超时的通道遍历
functimeoutRange() { ch := make(chanint) gofunc() { time.Sleep(2 * time.Second) ch <- 100 }() for { select { case value := <-ch: fmt.Printf("收到值: %d\n", value) return case <-time.After(1 * time.Second): fmt.Println("等待超时,重试中...") // 这里可以添加重试逻辑 } } }
适用场景:
网络请求超时处理
防止阻塞的系统调用
需要超时重试的机制
上下文控制的通道遍历
funccontextControlledRange() { ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() ch := make(chanint) gofunc() { for i := 0; ; i++ { select { case <-ctx.Done(): return default: ch <- i time.Sleep(500 * time.Millisecond) } } }() for { select { case value := <-ch: fmt.Printf("收到: %d\n", value) case <-ctx.Done(): fmt.Println("上下文超时,停止接收") return } } }
适用场景:
需要全局取消操作的场景
微服务中的请求处理
长时间运行任务的优雅终止
通道的多种使用模式
单向通道约束
funcproducer(out chan<- int) { for i := 0; i < 5; i++ { out <- i } close(out) }
funcconsumer(in <-chanint) { for n := range in { fmt.Printf("消费: %d\n", n) } }
funcunidirectionalExample() { ch := make(chanint) go producer(ch) consumer(ch) }
优势:
增强类型安全
明确函数职责
防止误操作
通道扇入(Fan-In)
funcfanIn(inputs ...<-chanint) <-chanint { out := make(chanint) var wg sync.WaitGroup for _, in := range inputs { wg.Add(1) gofunc(ch <-chanint) { defer wg.Done() for n := range ch { out <- n } }(in) } gofunc() { wg.Wait() close(out) }() return out }
适用场景:
合并多个数据源
多生产者单消费者模型
日志聚合系统
通道扇出(Fan-Out)
funcfanOut(in <-chanint, workers int) []<-chanint { outputs := make([]<-chanint, workers) for i := 0; i < workers; i++ { ch := make(chanint) outputs[i] = ch gofunc(id int) { deferclose(ch) for n := range in { fmt.Printf("Worker %d 处理: %d\n", id, n) ch <- n * 2 } }(i) } return outputs }
适用场景:
并行处理任务
负载均衡
分布式计算
通道作为信号量
funcsemaphoreExample() { var wg sync.WaitGroup sem := make(chanstruct{}, 3) // 最大并发数3 tasks := []string{"A", "B", "C", "D", "E", "F"} for _, task := range tasks { wg.Add(1) gofunc(t string) { defer wg.Done() sem <- struct{}{} // 获取信号量 deferfunc() { <-sem }() // 释放信号量 fmt.Printf("开始任务 %s\n", t) time.Sleep(time.Second) fmt.Printf("完成任务 %s\n", t) }(task) } wg.Wait() }
// 解决方案: 使用发送标志 var sendAllowed = true if sendAllowed { ch <- data }
实际应用案例
实时数据管道
funcdataPipeline() { // 数据生成 rawData := make(chan sensorData, 100) go collectSensorData(rawData) // 数据处理管道 filtered := filterData(rawData) transformed := transformData(filtered) results := aggregateData(transformed) // 结果输出 for res := range results { saveToDatabase(res) updateDashboard(res) } }
funcfilterData(in <-chan sensorData) <-chansensorData { out := make(chan sensorData) gofunc() { deferclose(out) for data := range in { if data.valid() { out <- data } } }() return out }
Web服务器请求处理
funchandleRequests() { reqChan := make(chan *http.Request, 100) resChan := make(chan *http.Response, 100) // 启动工作池 for i := 0; i < 10; i++ { go worker(reqChan, resChan) } // 处理响应 gofunc() { for res := range resChan { sendResponse(res) } }() // HTTP请求处理 http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { reqChan <- r }) http.ListenAndServe(":8080", nil) }
funcworker(reqs <-chan *http.Request, res chan<- *http.Response) { for req := range reqs { // 处理请求 res <- processRequest(req) } }
并发爬虫系统
funcwebCrawler() { urls := make(chanstring, 1000) results := make(chan crawlResult, 100) visited := make(map[string]bool) var mutex sync.Mutex // 种子URL urls <- "https://example.com" // 启动爬虫worker for i := 0; i < 20; i++ { gofunc(id int) { for url := range urls { // 爬取页面 result := crawlPage(url) results <- result // 处理新链接 for _, link := range result.links { mutex.Lock() if !visited[link] { visited[link] = true urls <- link } mutex.Unlock() } } }(i) } // 处理结果 for i := 0; i < 100; { // 限制最多100个页面 result := <-results saveResult(result) i++ } close(urls) }