
go语言以其内置的并发原语——goroutine和channel——而闻名,它们使得编写并发程序变得简单而直观。在复杂的并发场景中,一个goroutine可能需要处理来自多个其他goroutine的数据输入。本文将详细阐述在go中实现这一目标的不同策略和最佳实践。
Go语言采用CSP(Communicating Sequential Processes)模型,提倡通过通信来共享内存,而不是通过共享内存来通信。Channel是实现这一模型的关键工具,它提供了一个类型安全的管道,允许Goroutine之间安全地发送和接收数据。
一个基本的通道通信示例如下:
package main
import "fmt"
import "time"
func sender(ch chan int) {
for i := 0; i < 5; i++ {
ch <- i // 发送数据到通道
time.Sleep(100 * time.Millisecond)
}
close(ch) // 关闭通道,通知接收方不再有数据
}
func receiver(ch chan int) {
for val := range ch { // 从通道接收数据,直到通道关闭
fmt.Printf("Received: %d\n", val)
}
fmt.Println("Channel closed, receiver done.")
}
func main() {
dataChan := make(chan int)
go sender(dataChan)
receiver(dataChan)
}当一个Goroutine需要从多个不同的通道接收数据时,Go提供了多种灵活的方式来处理。
最直接的方式是依次从每个通道接收数据。这种方法适用于需要确保所有特定来源的数据都已被处理,并且对接收顺序有明确要求的情况。
立即学习“go语言免费学习笔记(深入)”;
package main
import (
"fmt"
"time"
)
// Routine1 从两个不同的通道接收数据
func Routine1(command12 chan int, command13 chan int) {
fmt.Println("Routine1 started.")
// 顺序接收来自 command12 的数据
cmd1 := <-command12
fmt.Printf("Routine1 received %d from command12\n", cmd1)
// 顺序接收来自 command13 的数据
cmd2 := <-command13
fmt.Printf("Routine1 received %d from command13\n", cmd2)
// 在这里处理接收到的 cmd1 和 cmd2
fmt.Printf("Routine1 processed pair: (%d, %d)\n", cmd1, cmd2)
}
// Routine2 向 command12 发送数据
func Routine2(command12 chan int) {
time.Sleep(100 * time.Millisecond) // 模拟一些工作
command12 <- 100 // 发送数据
fmt.Println("Routine2 sent 100 to command12.")
}
// Routine3 向 command13 发送数据
func Routine3(command13 chan int) {
time.Sleep(200 * time.Millisecond) // 模拟一些工作
command13 <- 200 // 发送数据
fmt.Println("Routine3 sent 200 to command13.")
}
func main() {
command12 := make(chan int)
command13 := make(chan int)
go Routine2(command12)
go Routine3(command13)
Routine1(command12, command13) // 主Goroutine作为Routine1
fmt.Println("Main finished.")
}注意事项:
当需要从多个通道中接收数据,但并不关心具体是哪个通道先就绪,或者需要处理非阻塞接收、超时等情况时,select语句是理想的选择。select会监听其所有case语句中的通道操作,一旦其中一个就绪,就会执行对应的代码块。如果多个通道同时就绪,select会公平地随机选择一个执行。
package main
import (
"fmt"
"time"
)
func Routine1WithSelect(command12 chan int, command13 chan int) {
fmt.Println("Routine1WithSelect started.")
for i := 0; i < 5; i++ { // 循环接收5次
select {
case cmd1 := <-command12:
fmt.Printf("Routine1WithSelect received %d from command12\n", cmd1)
// 处理来自 command12 的命令
case cmd2 := <-command13:
fmt.Printf("Routine1WithSelect received %d from command13\n", cmd2)
// 处理来自 command13 的命令
case <-time.After(500 * time.Millisecond): // 添加超时机制
fmt.Println("Routine1WithSelect timed out waiting for commands.")
return // 超时后退出循环
}
}
fmt.Println("Routine1WithSelect finished processing.")
}
func Routine2Sender(command12 chan int) {
for i := 1; i <= 3; i++ {
time.Sleep(150 * time.Millisecond)
command12 <- i * 10
fmt.Printf("Routine2Sender sent %d\n", i*10)
}
close(command12)
}
func Routine3Sender(command13 chan int) {
for i := 1; i <= 2; i++ {
time.Sleep(250 * time.Millisecond)
command13 <- i * 100
fmt.Printf("Routine3Sender sent %d\n", i*100)
}
close(command13)
}
func main() {
command12 := make(chan int)
command13 := make(chan int)
go Routine2Sender(command12)
go Routine3Sender(command13)
Routine1WithSelect(command12, command13)
fmt.Println("Main finished.")
// 等待所有Goroutine完成,防止主Goroutine过早退出
time.Sleep(1 * time.Second)
}select语句的关键特性:
除了上述基础的接收方式,Go还提供了一些高级模式来优化并发通信。
Go通道天生支持多个Goroutine向同一个通道发送数据,而一个Goroutine从该通道接收。这是一种非常常见的模式,可以简化通道管理,将来自不同源的数据汇聚到一个集中处理的入口。
package main
import (
"fmt"
"time"
)
// Processor 负责从一个统一的通道接收所有命令
func Processor(commandChan chan int) {
fmt.Println("Processor started.")
for cmd := range commandChan {
fmt.Printf("Processor received: %d\n", cmd)
// 处理接收到的命令
time.Sleep(50 * time.Millisecond) // 模拟处理时间
}
fmt.Println("Processor finished.")
}
// WorkerA 向统一通道发送数据
func WorkerA(commandChan chan int) {
for i := 0; i < 3; i++ {
commandChan <- i + 100
fmt.Printf("WorkerA sent %d\n", i+100)
time.Sleep(100 * time.Millisecond)
}
}
// WorkerB 向统一通道发送数据
func WorkerB(commandChan chan int) {
for i := 0; i < 3; i++ {
commandChan <- i + 200
fmt.Printf("WorkerB sent %d\n", i+200)
time.Sleep(120 * time.Millisecond)
}
}
func main() {
unifiedCommandChan := make(chan int) // 创建一个统一的命令通道
go Processor(unifiedCommandChan)
go WorkerA(unifiedCommandChan)
go WorkerB(unifiedCommandChan)
// 等待一段时间,确保所有Goroutine有机会发送数据
time.Sleep(1 * time.Second)
close(unifiedCommandChan) // 关闭通道,通知Processor退出循环
time.Sleep(100 * time.Millisecond) // 等待Processor退出
fmt.Println("Main finished.")
}优点:
在某些场景下,发送方不仅需要发送数据,还需要接收来自处理方的响应。Go语言中一种优雅的实现方式是在发送的消息结构体中包含一个“回复通道”(reply channel)。这允许发送方创建临时的、私有的通道来接收响应,实现请求-响应模式。
package main
import (
"fmt"
"time"
)
// Command 定义了包含命令内容和回复通道的消息结构
type Command struct {
Cmd string
Reply chan int // 用于接收回复的通道
}
// Requestor 发送请求并等待回复
func Requestor(commandChan chan Command, id int) {
// 为本次请求创建一个临时的回复通道
replyChan := make(chan int)
request := Command{
Cmd: fmt.Sprintf("doSomething_from_Requestor%d", id),
Reply: replyChan,
}
fmt.Printf("Requestor%d sending command: %s\n", id, request.Cmd)
commandChan <- request // 发送请求
// 等待并接收回复
status := <-replyChan
fmt.Printf("Requestor%d received status: %d\n", id, status)
close(replyChan) // 关闭回复通道
}
// Handler 接收请求,处理后通过回复通道发送响应
func Handler(commandChan chan Command) {
fmt.Println("Handler started.")
for req := range commandChan {
fmt.Printf("Handler received command: %s\n", req.Cmd)
// 模拟处理过程
time.Sleep(50 * time.Millisecond)
// 通过请求中携带的回复通道发送状态码
req.Reply <- 200 // SUCCESS (status code)
}
fmt.Println("Handler finished.")
}
func main() {
mainCommandChan := make(chan Command) // 主命令通道
go Handler(mainCommandChan)
// 启动多个请求者Goroutine
go Requestor(mainCommandChan, 1)
go Requestor(mainCommandChan, 2)
// 等待所有Goroutine完成
time.Sleep(1 * time.Second)
close(mainCommandChan) // 关闭主命令通道,通知Handler退出
time.Sleep(100 * time.Millisecond)
fmt.Println("Main finished.")
}优点:
Go语言的并发模型强大而灵活,通过合理利用Goroutine和Channel,我们可以构建出高效、可维护的并发程序。无论是简单的顺序接收,还是复杂的select多路复用,亦或是通过消息传递回复通道的请求-响应模式,Go都提供了直观且强大的工具来应对各种并发通信挑战。理解并熟练运用这些模式,是编写高质量Go并发代码的关键。选择哪种模式取决于具体的业务需求和性能考量,但核心思想始终是通过明确的通信路径来协调并发操作。
以上就是Go语言并发编程:灵活处理多源通道数据与通信模式的详细内容,更多请关注php中文网其它相关文章!
编程怎么学习?编程怎么入门?编程在哪学?编程怎么学才快?不用担心,这里为大家提供了编程速学教程(入门课程),有需要的小伙伴保存下载就能学习啦!
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号