
在使用`tqdm.contrib.concurrent.process_map`进行并行处理时,直接将大型数组作为函数参数传递可能因数据复制导致`memoryerror`。本教程将介绍如何利用`multiprocessing.array`创建共享内存,使多个进程能够高效访问同一份大型数组数据,避免昂贵的内存复制,从而优化内存使用并实现健壮的并行计算。
tqdm.contrib.concurrent.process_map提供了一种便捷的方式来并行化函数调用,并带有进度条显示,这对于处理耗时任务非常有用。然而,当并行函数需要访问一个大型数组(例如一个NumPy数组)时,直接传递这个数组作为函数参数可能会遇到内存效率问题。
考虑以下场景:有一个大型数组B,以及一个需要对B的特定部分进行操作的函数test(a, B)。如果尝试通过构造一个包含B的元组列表(例如agrid = [(0, B), (1, B), ...])来传递参数给process_map,multiprocessing模块在将这些参数发送给子进程时,会尝试序列化并复制B。对于非常大的B,这种复制会导致显著的内存开销,甚至可能引发MemoryError,因为每个子进程都会获得B的一个独立副本。
问题的核心在于,我们希望所有子进程都能访问同一个大型数组B,而不是各自拥有一个副本。
为了解决大型数组的内存复制问题,Python的multiprocessing模块提供了Array类,它允许在进程间共享原始数据类型的数组。通过将大型数组存储在共享内存中,子进程可以直接访问这块内存,而无需进行昂贵的数据复制。
其基本原理如下:
下面是一个详细的示例,展示了如何使用multiprocessing.Array与tqdm.contrib.concurrent.process_map结合,高效地处理大型NumPy数组。
import ctypes
from multiprocessing import Array
from time import sleep
import numpy as np
from tqdm.contrib.concurrent import process_map
# 定义一个全局变量来持有共享数组的引用
# 注意:在多进程环境中,全局变量的赋值需要在if __name__ == "__main__": 块内
# 并且子进程会继承父进程的全局变量副本,但对于multiprocessing.Array,
# 它们会指向同一块共享内存区域。
B_shared = None
# 数组的维度
N = 1_000
def test(a):
"""
工作函数:操作共享内存中的大型数组。
这个函数只接收一个索引 'a' 作为参数。
"""
# 将共享内存对象转换为NumPy数组视图
# B_shared 必须在进程启动时被正确初始化
arr = np.frombuffer(B_shared.get_obj()).reshape((N, N))
# 模拟复杂的计算
sleep(0.1) # 减少睡眠时间以加快示例运行
# 如果需要写入共享数组,必须使用锁来同步访问
# 例如:
# with B_shared.get_lock():
# arr[a, a] = some_new_value
# 返回数组中特定位置的值
return arr[a, a]
if __name__ == "__main__":
# 1. 初始化共享内存数组
# ctypes.c_double 指定数组元素类型为双精度浮点数
# N * N 是数组的总元素数量
B_shared = Array(ctypes.c_double, N * N)
# 2. 将共享内存转换为NumPy数组视图,并填充数据
# arr 是一个NumPy数组,但它的数据存储在B_shared管理的共享内存中
arr_view = np.frombuffer(B_shared.get_obj()).reshape((N, N))
arr_view[:] = np.random.uniform(size=(N, N)) # 填充随机数据
print(f"原始数组B_shared的前几个元素:\n{arr_view[:2, :2]}")
# 3. 定义要迭代的参数列表
agrid = [0, 1, 2, 3] # 假设我们想对这些索引进行操作
# 4. 使用 process_map 进行并行计算
# test 函数将会在每个子进程中执行
# max_workers 控制并行进程数
# chunksize 控制每次发送给子进程的任务块大小
parallel_results_tqdm = process_map(
test,
agrid,
max_workers=2,
chunksize=1,
)
print(f"\n并行计算结果: {parallel_results_tqdm}")
print(f"计算完成后B_shared的前几个元素 (如果未写入,则与原始相同):\n{arr_view[:2, :2]}")
代码解析与注意事项:
通过利用multiprocessing.Array创建共享内存,我们成功解决了在tqdm.contrib.concurrent.process_map中传递大型数组参数时可能遇到的MemoryError问题。这种方法避免了昂贵的数据复制,显著提高了内存效率,并使得并行处理大型数据集成为可能。
关键要点包括:
掌握这种技术,将能更高效地利用多核CPU资源,处理计算密集型且涉及大型数据集的Python任务。
以上就是在tqdm process_map中高效传递大型数组参数:共享内存解决方案的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号