
在python中,处理大量数据(例如超过23,000条json记录与3,000多个标记进行匹配)往往会面临性能挑战。当需要对这些数据进行复杂计算或字符串相似度比较等cpu密集型操作时,程序的执行时间可能会非常长。
一个常见的优化思路是使用并发编程,例如Python的threading模块。然而,对于CPU密集型任务,Python的全局解释器锁(Global Interpreter Lock, GIL)是一个显著的限制。GIL确保在任何给定时刻,只有一个线程能够执行Python字节码。这意味着即使创建了多个线程,它们也无法真正地并行执行CPU密集型任务,因为它们必须轮流获取GIL,导致多线程在CPU密集型场景下并不能带来显著的性能提升,甚至可能因为线程切换的开销而略微降低性能。
假设我们有两个列表:一个包含字典的json_list(例如用户数据,每个字典含有一个"code"字段),另一个是marking列表(包含需要匹配的字符串)。我们的目标是从json_list中找出与marking列表中的每个元素具有高相似度"code"的项,并将匹配到的标记和数据收集起来。
初始的尝试可能如下所示,使用threading模块来尝试并行化匹配过程:
import math
import threading
from difflib import SequenceMatcher
# 示例数据(实际数据量远大于此)
json_list = [
{"code": "001123", "phone_number": "...", "email": "...", "address": "...", "note": ""},
{"code": "654564", "phone_number": "...", "email": "...", "address": "...", "note": ""},
{"code": "876890", "phone_number": "...", "email": "...", "address": "...", "note": ""},
{"code": "hj876", "phone_number": "...", "email": "...", "address": "...", "note": ""},
# ... 更多数据
]
marking = ["654564", "hj876", "8768"] # ... 更多标记
def find_marking(x, y):
"""
比较标记x与数据y的'code'字段的相似度。
"""
text_match = SequenceMatcher(None, x, y.get('code')).ratio()
if text_match == 1 or (0.98 <= text_match < 0.99):
return y
return None
def eliminate_marking_threaded(marking_list, json_list):
result, result_mark = [], []
# 这里的内部函数及对data_scrap的修改存在并发问题和GIL限制
# 实际场景中,对共享列表的pop/remove操作需要更复杂的同步机制
# 且因为GIL,多线程在此处并不能带来性能提升
def __process_eliminate(marking_item, data_scrap_copy):
for data in data_scrap_copy: # 遍历副本
result_data = find_marking(marking_item, data)
if result_data:
# 注意:这里的append操作如果直接对外部result/result_mark进行,
# 需要加锁。且data_scrap_copy的remove只影响副本。
# 实际代码中需要更严谨的共享数据处理。
# 此处仅为说明多线程尝试的局限性。
# result_mark.append(marking_item)
# result.append(result_data)
return
threads = []
# 针对每个marking创建线程,但由于GIL,实际不会并行执行
for m in marking_list:
# 传递json_list的副本以避免部分并发问题,但仍受GIL限制
th = threading.Thread(target=__process_eliminate, args=(m, json_list[:]))
th.start()
threads.append(th)
for thread in threads:
thread.join()
return result_mark, result # 在这个简单的多线程示例中,result/result_mark不会被正确填充
# 运行此代码会发现性能提升不明显,甚至可能更慢
# eliminated_markings, eliminated_data = eliminate_marking_threaded(marking, json_list)如上所述,尽管使用了threading,但由于GIL的存在,这种方法在CPU密集型任务中无法实现真正的并行计算,耗时依然较长。
立即学习“Python免费学习笔记(深入)”;
为了克服GIL的限制,Python提供了multiprocessing模块。它允许程序创建独立的进程,每个进程都有自己的Python解释器和内存空间,因此它们可以真正地并行执行CPU密集型任务,不受GIL的影响。
import math
from difflib import SequenceMatcher
from multiprocessing import Process, Manager
import time # 用于计时演示
# 模拟大规模数据
# 注意:实际运行时请替换为您的真实数据
json_list_large = []
for i in range(25000):
json_list_large.append({"code": f"{i:06d}", "phone_number": "...", "email": "...", "address": "...", "note": ""})
json_list_large.append({"code": "654564", "phone_number": "...", "email": "...", "address": "...", "note": ""})
json_list_large.append({"code": "hj876", "phone_number": "...", "email": "...", "address": "...", "note": ""})
json_list_large.append({"code": "876890", "phone_number": "...", "email": "...", "address": "...", "note": ""})
marking_large = []
for i in range(3500):
marking_large.append(f"{i:06d}")
marking_large.extend(["654564", "hj876", "8768"])
def find_marking(x, y):
"""
比较标记x与数据y的'code'字段的相似度。
"""
text_match = SequenceMatcher(None, x, y.get('code')).ratio()
if text_match == 1 or (0.98 <= text_match < 0.99):
return y
return None
def eliminate_marking_multiprocess(marking_list, json_list):
"""
使用多进程并行处理标记列表,从json_list中查找匹配项。
"""
manager = Manager()
result_mark = manager.list() # 共享列表,用于存储匹配的标记
result = manager.list() # 共享列表,用于存储匹配的数据
def __process_eliminate_chunk(sub_marking_list, data_scrap_copy, shared_result_mark, shared_result):
"""
每个进程执行的函数,处理一部分标记列表。
data_scrap_copy 是 json_list 的一个副本,进程对其的修改不会影响原始 json_list。
"""
for marking_item in sub_marking_list:
for data in data_scrap_copy: # 遍历json_list的副本
result_data = find_marking(marking_item, data)
if result_data:
# 将结果添加到共享列表中
shared_result_mark.append(marking_item)
shared_result.append(result_data)
# 注意:这里从data_scrap_copy中移除元素,只影响当前进程的副本,
# 且为了避免重复匹配,一旦找到一个匹配就跳出内层循环。
# 如果需要从原始json_list中“消除”,则需要更复杂的同步机制或在主进程中处理。
# data_scrap_copy.remove(data) # 如果需要确保每个标记只匹配一次,且从副本中移除
break # 找到匹配后,当前marking_item处理完毕,检查下一个marking_item
processes = []
# 根据CPU核心数或经验值设置chunk_size和num_processes
# chunk_size决定了每个进程处理多少个marking
chunk_size = max(1, len(marking_list) // (2 * (len(marking_list) // 1000 + 1))) # 动态调整chunk_size
num_processes = math.ceil(len(marking_list) / chunk_size)
print(f"Total markings: {len(marking_list)}, Chunk size: {chunk_size}, Number of processes: {num_processes}")
for i in range(num_processes):
start_idx = i * chunk_size
end_idx = min((i + 1) * chunk_size, len(marking_list))
sub_marking_list = marking_list[start_idx:end_idx]
if not sub_marking_list:
continue
p = Process(
target=__process_eliminate_chunk,
# 传递json_list的副本给每个进程,避免进程间直接修改原始大列表的复杂同步问题
args=(sub_marking_list, json_list[:], result_mark, result)
)
processes.append(p)
p.start() # 启动进程
for p in processes:
p.join() # 等待所有进程完成
manager.shutdown() # 关闭Manager,释放资源
return list(result_mark), list(result) # 将Manager.list转换为普通Python列表
# 运行多进程版本
print("Starting multiprocessing elimination...")
start_time = time.time()
eliminated_markings, eliminated_data = eliminate_marking_multiprocess(marking_large, json_list_large)
end_time = time.time()
print(f"Multiprocessing finished in {end_time - start_time:.2f} seconds.")
print(f"Found {len(eliminated_markings)} matches.")
# print("Eliminated Markings:", eliminated_markings[:5]) # 打印前5个示例
# print("Eliminated Data:", eliminated_data[:5]) # 打印前5个示例当Python程序遇到CPU密集型任务,且多线程无法带来性能提升时,multiprocessing模块是更优的选择。通过创建独立的进程,multiprocessing能够绕过GIL的限制,实现真正的并行计算,从而显著缩短程序的执行时间。在使用multiprocessing时,需要注意进程间数据共享的机制(如Manager)以及任务分发策略,以确保程序的正确性和高效性。合理地应用多进程技术,可以有效提升Python在处理大规模数据和计算密集型任务时的性能表现。
以上就是提升Python数据处理性能:从多线程到多进程的优化实践的详细内容,更多请关注php中文网其它相关文章!
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号