
本文探讨在go语言中如何高效地实现独立工作协程的并行执行与同步。通过分析一个常见场景,即主协程需要等待多个独立工作协程完成对同一数据项的处理后才能继续,文章详细介绍了使用go通道(channel)进行输入分发和输出同步的正确模式,并提供了代码示例和最佳实践,确保在固定协程数量下实现真正的并发处理。
在Go语言中,利用其强大的并发原语——Goroutine和Channel,可以优雅地构建复杂的并发系统。然而,正确地编排这些并发任务以实现真正的并行并确保数据同步,是Go并发编程中的一个核心挑战。本文将深入探讨一种常见的并发场景:一个主协程需要将数据分发给多个独立的子工作协程进行处理,并且必须等待所有子工作协程完成处理后才能继续其自身流程。
假设我们有一个account协程,它从account_chan接收数据项。对于每个接收到的数据项,account协程需要委托给两个独立的子工作协程workerA和workerB进行处理。这两个worker协程的处理顺序不重要,但account协程必须确保workerA和workerB都已完成对当前数据项的处理,才能将该数据项发送到final_chan并继续处理下一个数据项。
此场景有以下关键要求:
考虑以下一种“直观”但错误的实现方式:
立即学习“go语言免费学习笔记(深入)”;
package main
import "fmt"
func workerA(work_in_chan <-chan int, work_out_chan chan<- int) {
for d := range work_in_chan {
fmt.Println("A ", d)
work_out_chan <- d // 模拟工作完成并发送信号
}
}
func workerB(work_in_chan <-chan int, work_out_chan chan<- int) {
for d := range work_in_chan {
fmt.Println("B ", d)
work_out_chan <- d // 模拟工作完成并发送信号
}
}
func account(account_chan <-chan int, final_chan chan<- int) {
wa_in := make(chan int)
wa_out := make(chan int)
wb_in := make(chan int)
wb_out := make(chan int)
go workerA(wa_in, wa_out)
go workerB(wb_in, wb_out)
for d := range account_chan {
// 错误的实现方式:顺序执行
wa_in <- d // 发送数据给workerA
<-wa_out // 等待workerA完成
wb_in <- d // 发送数据给workerB
<-wb_out // 等待workerB完成
final_chan <- d
}
}
func main() {
account_chan := make(chan int, 100)
final_chan := make(chan int, 100)
go account(account_chan, final_chan)
account_chan <- 1
account_chan <- 2
account_chan <- 3
close(account_chan) // 关闭输入通道,以便account协程最终退出
for i := 0; i < 3; i++ {
fmt.Println("Final:", <-final_chan)
}
}上述代码中的account协程在处理每个数据项时,首先将数据发送给workerA并立即等待其完成,然后才将数据发送给workerB并等待其完成。这种模式导致workerA和workerB实际上是顺序执行的,完全失去了并行处理的优势。这与我们希望它们并发执行的初衷相悖。
要实现workerA和workerB的真正并发,关键在于改变数据发送和完成信号接收的顺序。正确的做法是:首先将数据并行地发送给所有需要处理的子工作协程,然后并行地等待所有子工作协程的完成信号。
修改后的account协程中的循环逻辑如下:
启科网络商城系统由启科网络技术开发团队完全自主开发,使用国内最流行高效的PHP程序语言,并用小巧的MySql作为数据库服务器,并且使用Smarty引擎来分离网站程序与前端设计代码,让建立的网站可以自由制作个性化的页面。 系统使用标签作为数据调用格式,网站前台开发人员只要简单学习系统标签功能和使用方法,将标签设置在制作的HTML模板中进行对网站数据、内容、信息等的调用,即可建设出美观、个性的网站。
0
// ... (workerA, workerB, channel声明部分同上)
func account(account_chan <-chan int, final_chan chan<- int) {
wa_in := make(chan int)
wa_out := make(chan int)
wb_in := make(chan int)
wb_out := make(chan int)
go workerA(wa_in, wa_out)
go workerB(wb_in, wb_out)
for d := range account_chan {
// 正确的实现方式:并行发送输入,并行等待输出
wa_in <- d // 发送数据给workerA
wb_in <- d // 发送数据给workerB (此时workerA和workerB可同时开始处理)
<-wa_out // 等待workerA完成
<-wb_out // 等待workerB完成 (这两个接收操作会阻塞,直到两个worker都发送了信号)
final_chan <- d
}
// 当account_chan关闭且所有数据处理完毕后,关闭worker的输入通道
// 这样worker协程也能优雅退出
close(wa_in)
close(wb_in)
// 等待worker协程退出,或者确保它们处理完所有数据
// 实际应用中可能需要更复杂的协调机制,例如WaitGroup
close(wa_out) // 如果worker协程已退出,这些通道可能需要关闭
close(wb_out)
}代码解释:
这种模式确保了workerA和workerB能够真正地并发执行。当一个数据项被发送给它们时,它们会同时开始处理。account协程则会在两个worker都发出完成信号后,才继续处理下一个数据项。
初次接触这种模式时,可能会担心“如果workerB比workerA先完成怎么办?”。实际上,这并不重要。<-wa_out和<-wb_out是两个独立的阻塞操作。无论哪个worker先完成并发送信号,account协程都会继续阻塞在另一个尚未完成的接收操作上,直到两个信号都收到。这种机制天然地实现了“全部完成”的同步需求,而无需关心具体的完成顺序。
在上述示例中,worker协程的work_out_chan实际上只用于发送一个完成信号,其发送的具体值在account协程中并未被使用。在这种情况下,sync.WaitGroup是一个更简洁且推荐的替代方案,特别是当工作协程不需要返回任何处理结果,仅需通知完成时。
使用 sync.WaitGroup 的示例:
package main
import (
"fmt"
"sync"
"time" // 引入time包用于模拟耗时操作
)
func workerA_wg(work_in_chan <-chan int, wg *sync.WaitGroup) {
defer wg.Done() // 确保无论如何都调用Done
for d := range work_in_chan {
fmt.Println("A ", d)
time.Sleep(100 * time.Millisecond) // 模拟耗时
// workerA完成一个任务后,并不立即调用Done,而是在协程退出时调用一次
// 如果是每个任务完成后都要通知,则需要每次循环内调用Done,并增加Add计数
}
fmt.Println("WorkerA exited.")
}
func workerB_wg(work_in_chan <-chan int, wg *sync.WaitGroup) {
defer wg.Done() // 确保无论如何都调用Done
for d := range work_in_chan {
fmt.Println("B ", d)
time.Sleep(150 * time.Millisecond) // 模拟耗时
}
fmt.Println("WorkerB exited.")
}
func account_wg(account_chan <-chan int, final_chan chan<- int) {
wa_in := make(chan int)
wb_in := make(chan int)
// 注意:WaitGroup通常用于等待一组goroutine的完成。
// 在本例中,worker协程是常驻的,每个数据项的处理需要单独同步。
// 因此,WaitGroup的Add/Done操作需要针对每个数据项进行。
go workerA_wg(wa_in, nil) // 这里的wg传入nil,因为workerA_wg的wg参数用于其自身退出,而非每次任务完成
go workerB_wg(wb_in, nil) // 同上
for d := range account_chan {
var wg sync.WaitGroup
wg.Add(2) // 为workerA和workerB各增加一个计数
// 改进的worker函数,每次处理完一个数据项后调用wg.Done()
go func(data int) {
defer wg.Done()
wa_in <- data
// 在实际worker中处理,这里只是发送数据
// 假设workerA接收到数据后会自己处理并发送一个信号
// 但如果workerA是常驻的,它的Done应该由它自己控制
}(d)
go func(data int) {
defer wg.Done()
wb_in <- data
}(d)
// 这种模式下,如果workerA/B是常驻的,且每次处理一个数据后需要通知,
// 那么workerA/B内部需要接收一个wg指针并在处理完数据后调用Done。
// 这会使workerA/B的签名变得复杂,需要传递WaitGroup指针。
// 更直接的WaitGroup使用方式,如果worker是短暂的:
// 如果worker是常驻的,且每个数据项处理完后需要通知,
// 那么原始的out_chan模式更清晰。
// 如果要用WaitGroup,需要重构worker函数使其接收WaitGroup指针,并在处理完数据后调用Done。
// 例如:
// go func(data int, wg *sync.WaitGroup) {
// defer wg.Done()
// // 模拟workerA处理
// fmt.Println("A processing", data)
// time.Sleep(100 * time.Millisecond)
// }(d, &wg)
// go func(data int, wg *sync.WaitGroup) {
// defer wg.Done()
// // 模拟workerB处理
// fmt.Println("B processing", data)
// time.Sleep(150 * time.Millisecond)
// }(d, &wg)
// 如果worker是常驻的,并且每次处理一个数据后需要通知,
// 那么每个worker需要一个输入通道和一个输出通道(或直接使用WaitGroup)。
// 原始的channel方案在这里更直观。
// 如果坚持使用WaitGroup,则每个worker需要一个输入通道,
// 并且在处理完一个数据后,主协程(或一个协调协程)负责调用wg.Done()。
// 这意味着worker的输出通道仍然是必要的,或者worker自己调用Done。
// 鉴于原问题中workerA和workerB是单例协程,且每次处理一个数据后需要通知主协程,
// 原始的输入/输出通道对模式是更直接且符合其设计意图的。
// WaitGroup通常用于等待一组goroutine的启动和最终退出,
// 而不是用于每次任务的同步。
// 如果要用于每次任务同步,那么每个任务需要一个WaitGroup,这会比Channel复杂。
// 因此,对于原问题描述,使用独立输出通道的模式是更合适的。
// WaitGroup更适合于等待一组一次性任务的完成,或者等待常驻goroutine的最终退出。
// 在这里,每个数据项的处理都是一个“任务”,需要等待两个worker完成,
// 每次迭代都需要独立的同步。
// 重新考虑:如果worker的out channel仅仅是信号,
// 那么可以在account协程内部为每个数据项创建一个临时的WaitGroup。
// workerA和workerB需要被改造,使其接收WaitGroup指针并在处理完成后调用Done。
// 鉴于原始问题中的约束和代码结构,使用独立输出通道是最直接和符合Go惯用法的方式。
// 让我们回到原始的Channel解决方案,因为它更贴合“固定数量Goroutine”和“每次任务同步”的需求。
}
close(wa_in)
close(wb_in)
}
// 总结:对于“固定数量常驻worker协程,每次处理一个数据项后需要同步”的场景,
// 使用输入通道分发数据,输出通道接收完成信号,是最直接和符合Go语言习惯的模式。
// WaitGroup更适用于等待一组Goroutine的整体完成,而非每次任务的细粒度同步。在Go语言中,实现多个独立工作协程的并行执行和同步,关键在于合理地利用通道进行数据传输和信号协调。当主协程需要等待所有子工作协程完成对同一数据项的处理时,正确的模式是:
这种模式不仅确保了真正的并发,而且利用Go通道的阻塞特性,自然地实现了“全部完成”的同步语义,而无需手动管理复杂的锁或条件变量。通过这种方式,我们可以构建出高效、健壮且易于理解的并发系统。
以上就是Go语言中并行独立工作协程的同步模式的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号