Golang中通过sync.Mutex和sync.Cond结合container/list实现并发队列,确保多Goroutine下安全存取。

Golang中实现并发队列,核心在于确保多个Goroutine在同时存取数据时,不会发生竞态条件导致数据损坏或逻辑错误。我们通常会借助Go语言自带的并发原语,如
sync.Mutex
container/list
chan
一个基于
sync.Mutex
container/list
package main
import (
"container/list"
"fmt"
"sync"
"time"
)
// ConcurrentQueue 定义了一个并发安全的队列
type ConcurrentQueue struct {
queue *list.List // 实际存储数据的链表
mutex sync.Mutex // 保护队列的互斥锁
cond *sync.Cond // 条件变量,用于等待和通知
cap int // 队列容量,0表示无限制
}
// NewConcurrentQueue 创建一个新的并发队列
// capacity为0表示无限制容量,否则为固定容量
func NewConcurrentQueue(capacity int) *ConcurrentQueue {
q := &ConcurrentQueue{
queue: list.New(),
cap: capacity,
}
q.cond = sync.NewCond(&q.mutex) // 条件变量需要一个Locker
return q
}
// Enqueue 将元素加入队列
func (q *ConcurrentQueue) Enqueue(item interface{}) error {
q.mutex.Lock()
defer q.mutex.Unlock()
// 如果有容量限制,且队列已满,则等待
for q.cap > 0 && q.queue.Len() >= q.cap {
q.cond.Wait() // 释放锁并等待,被唤醒后重新获取锁
}
q.queue.PushBack(item)
q.cond.Signal() // 通知一个等待的消费者
return nil
}
// Dequeue 从队列中取出元素
func (q *ConcurrentQueue) Dequeue() (interface{}, error) {
q.mutex.Lock()
defer q.mutex.Unlock()
// 如果队列为空,则等待
for q.queue.Len() == 0 {
q.cond.Wait() // 释放锁并等待,被唤醒后重新获取锁
}
element := q.queue.Front()
q.queue.Remove(element)
q.cond.Signal() // 通知一个等待的生产者(如果队列曾满而等待)
return element.Value, nil
}
// TryEnqueue 尝试将元素加入队列,如果队列满则立即返回错误
func (q *ConcurrentQueue) TryEnqueue(item interface{}) error {
q.mutex.Lock()
defer q.mutex.Unlock()
if q.cap > 0 && q.queue.Len() >= q.cap {
return fmt.Errorf("queue is full")
}
q.queue.PushBack(item)
q.cond.Signal()
return nil
}
// TryDequeue 尝试从队列中取出元素,如果队列空则立即返回错误
func (q *ConcurrentQueue) TryDequeue() (interface{}, error) {
q.mutex.Lock()
defer q.mutex.Unlock()
if q.queue.Len() == 0 {
return nil, fmt.Errorf("queue is empty")
}
element := q.queue.Front()
q.queue.Remove(element)
q.cond.Signal()
return element.Value, nil
}
// Len 返回队列当前长度
func (q *ConcurrentQueue) Len() int {
q.mutex.Lock()
defer q.mutex.Unlock()
return q.queue.Len()
}
// IsEmpty 判断队列是否为空
func (q *ConcurrentQueue) IsEmpty() bool {
return q.Len() == 0
}
func main() {
queue := NewConcurrentQueue(5) // 创建一个容量为5的并发队列
var wg sync.WaitGroup
// 生产者
for i := 0; i < 10; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
item := fmt.Sprintf("数据-%d", id)
if err := queue.Enqueue(item); err != nil {
fmt.Printf("生产者%d: 尝试入队 %s 失败: %v\n", id, item, err)
} else {
fmt.Printf("生产者%d: 入队 %s, 当前队列长度: %d\n", id, item, queue.Len())
}
}(i)
}
// 消费者
for i := 0; i < 10; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
// 稍微延迟一下,让生产者有机会先生产一些数据
time.Sleep(time.Millisecond * 50)
item, err := queue.Dequeue()
if err != nil {
fmt.Printf("消费者%d: 尝试出队失败: %v\n", id, err)
} else {
fmt.Printf("消费者%d: 出队 %v, 当前队列长度: %d\n",以上就是Golang并发队列实现与操作示例的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号