
本文深入探讨了在使用python `multiprocessing.pool`的`starmap`方法结合`syncmanager.dict`共享字典时,因`zip`函数误用导致任务不执行和共享数据为空的常见问题。通过分析`zip`处理空迭代器的行为,提供了正确的参数构造方法和共享字典键值设置,确保多进程任务能够成功执行并正确更新共享状态,从而实现高效的并行计算。
在Python中,multiprocessing模块允许程序利用多核处理器进行并行计算,显著提升处理速度。multiprocessing.Pool提供了一种便捷的方式来管理一组工作进程,并分发任务。当需要在这些并行进程之间共享数据时,multiprocessing.managers.SyncManager提供了一种机制,例如通过manager.dict()创建一个可以在不同进程间同步访问的字典。
Pool.starmap(func, iterable)方法是map的一个变体,它接受一个可迭代对象,其中每个元素本身也是一个可迭代对象(如元组),starmap会将这些子元素作为单独的参数解包(*操作符)传递给func。
初学者在使用starmap时,常遇到的一个问题是任务未能按预期执行,或者共享数据未被更新。这往往是由于传递给starmap的参数列表构造不当造成的。
考虑以下一个尝试使用starmap和共享字典的示例代码:
立即学习“Python免费学习笔记(深入)”;
import multiprocessing as mp
from multiprocessing.managers import SyncManager
n_cores = mp.cpu_count()
def parallel_fn(job_n, cache):
cache['job_b'] = job_n # 尝试更新共享字典
return job_n
if __name__=="__main__":
with SyncManager() as manager:
shared_cache = manager.dict()
# 错误:使用zip构造参数
args = list(zip(range(n_cores), shared_cache))
with mp.Pool(n_cores) as pool:
result = pool.starmap(parallel_fn, args)
print(f"Pool return: {result}")
print(f"Shared dict after: {shared_cache}")运行上述代码,会发现Pool return和Shared dict after都为空。这是因为starmap根本没有接收到任何任务。问题出在args = list(zip(range(n_cores), shared_cache))这一行。
zip()函数的作用是将多个可迭代对象中对应位置的元素打包成一个个元组,然后返回由这些元组组成的迭代器。它的一个关键特性是:zip函数会以最短的可迭代对象为准停止迭代。
在这个例子中:
当zip尝试将range(n_cores)和空的shared_cache打包时,由于shared_cache的长度为0,zip会立即停止,生成一个空的迭代器。因此,list(zip(range(n_cores), shared_cache))的结果将是一个空列表。
由于args是一个空列表,pool.starmap没有接收到任何任务来执行,所以result自然也是空的,shared_cache也保持为空,因为parallel_fn从未被调用。
要解决这个问题,我们需要确保传递给starmap的args列表包含正确数量和结构的参数。同时,对于共享字典的更新,也应确保使用有意义的键。
starmap期望接收一个可迭代对象,其中每个元素都是一个元组或列表,代表传递给目标函数的独立参数。为了让每个进程执行一个任务并访问共享字典,我们可以使用列表推导式来构造args:
# 修正:使用列表推导式构造参数 args = [(n, shared_cache) for n in range(n_cores)]
这样,args将是一个包含n_cores个元组的列表,每个元组形如(job_number, shared_cache_reference)。例如,如果n_cores是8,args将是[(0, shared_cache), (1, shared_cache), ..., (7, shared_cache)]。
在原始的parallel_fn中,cache['job_b'] = job_n会使所有进程都尝试使用相同的键'job_b'来更新字典。虽然SyncManager.dict会处理同步,但如果目标是存储每个任务的结果,通常会希望使用任务的唯一标识符作为键。将键改为job_n会更合理,使得每个任务将其自身的结果存储在以其任务编号为键的位置:
def parallel_fn(job_n, cache):
# 修正:使用job_n作为键
cache[job_n] = job_n
return job_n结合以上修正,完整的代码如下:
import multiprocessing as mp
from multiprocessing.managers import SyncManager
n_cores = mp.cpu_count()
def parallel_fn(job_n, cache):
"""
并行函数:将任务编号作为键和值存入共享字典。
"""
cache[job_n] = job_n
return job_n
if __name__=="__main__":
with SyncManager() as manager:
shared_cache = manager.dict()
# 正确构造starmap的参数列表
# 每个元组包含一个任务编号和一个共享字典的引用
args = [(n, shared_cache) for n in range(n_cores)]
print(f"准备分发任务,args: {args}")
with mp.Pool(n_cores) as pool:
# 使用starmap分发任务
result = pool.starmap(parallel_fn, args)
print(f"Pool返回结果: {result}")
print(f"共享字典最终内容: {shared_cache}")在我的8核机器上运行上述修正后的代码,输出将是:
准备分发任务,args: [(0, <DictProxy object at 0x...>), (1, <DictProxy object at 0x...>), ..., (7, <DictProxy object at 0x...>)]
Pool返回结果: [0, 1, 2, 3, 4, 5, 6, 7]
共享字典最终内容: {0: 0, 1: 1, 2: 2, 3: 3, 4: 4, 5: 5, 6: 6, 7: 7}可以看到,starmap成功返回了所有任务的结果,并且shared_cache也正确地包含了每个任务更新的数据。
通过遵循这些原则,可以更有效地利用Python的multiprocessing模块,构建健壮且高效的并行应用程序。
以上就是正确解决Python多进程starmap与共享字典的常见陷阱的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号