
在处理大规模数据集时,例如需要在一个包含数万条记录的json列表中(json_list)查找并匹配另一个包含数千个标记(marking)的列表中的元素,性能往往成为一个关键挑战。具体场景是,json_list中的每个字典包含一个code字段,我们需要将marking列表中的每个字符串与json_list中元素的code字段进行相似度匹配。匹配规则是使用difflib.sequencematcher计算相似度,当相似度为1(完全匹配)或介于0.98到0.99之间时,认为匹配成功。
原始的单线程或简单的多线程(threading)实现,在数据量庞大时(json_list超过23,000条,marking超过3,000条),可能需要20分钟甚至更长时间才能完成。这主要是因为Python的全局解释器锁(GIL)限制了多线程在CPU密集型任务上的并行执行能力。即使创建了多个线程,它们也无法同时在多个CPU核心上运行Python字节码,导致性能提升不明显。
为了克服GIL的限制,Python提供了multiprocessing模块,它允许创建独立的进程,每个进程都有自己的Python解释器和内存空间。这意味着不同的进程可以在不同的CPU核心上真正并行执行CPU密集型任务,从而显著提高性能。
本教程将展示如何利用multiprocessing库来优化上述数据匹配和筛选过程。
这个函数负责执行单个标记与JSON数据项的匹配逻辑。它保持不变,因为它是一个纯计算函数,不涉及并发问题。
立即学习“Python免费学习笔记(深入)”;
from difflib import SequenceMatcher
def find_marking(x: str, y: dict) -> dict | None:
"""
比较标记字符串x与JSON数据项y中的'code'字段的相似度。
如果相似度为1或在0.98到0.99之间,则返回y,否则返回None。
"""
text_match = SequenceMatcher(None, x, y.get('code', '')).ratio()
if text_match == 1 or (0.98 <= text_match < 0.99):
return y
return None注意: 确保y字典中包含'code'键,否则y.get('code', '')可以提供一个默认值,避免KeyError。
这个函数是整个解决方案的核心,它协调多个进程来并行处理匹配任务。
import math
from multiprocessing import Process, Manager
def eliminate_marking(marking_list: list[str], json_list: list[dict]) -> tuple[list[str], list[dict]]:
"""
使用多进程并行地从json_list中匹配和筛选marking_list中的标记。
Args:
marking_list: 待匹配的标记字符串列表。
json_list: 包含'code'字段的JSON字典列表。
Returns:
一个元组,包含两个列表:
- result_mark: 成功匹配的标记列表。
- result: 成功匹配的JSON数据项列表。
"""
# 1. 初始化Manager和共享数据结构
# Manager用于创建可在进程间共享的列表,以收集结果。
manager = Manager()
result_mark = manager.list() # 共享列表,用于存储成功匹配的标记
result = manager.list() # 共享列表,用于存储成功匹配的JSON数据项
def __process_eliminate(sub_marking_list: list[str], data_scrap: list[dict],
shared_result_mark: Manager.list, shared_result: Manager.list):
"""
每个进程执行的任务函数。
它遍历分配给它的标记子列表,并尝试在data_scrap中找到匹配项。
"""
# data_scrap是json_list的一个副本,每个进程独立操作。
# 注意:这里的data_scrap是json_list的浅拷贝,对其内部字典的修改会影响原始字典
# 但对其列表结构(如remove操作)的修改仅影响当前进程的副本。
# 鉴于我们的目标是收集匹配项,这种拷贝方式是安全的。
for marking_item in sub_marking_list: # 遍历当前进程负责的标记子列表
for data in data_scrap: # 遍历json_list的副本
result_data = find_marking(marking_item, data)
if result_data:
# 找到匹配项后,将其添加到共享列表中
shared_result_mark.append(marking_item)
shared_result.append(result_data)
# 这里的remove操作只影响当前进程的data_scrap副本,
# 并不影响其他进程的副本或原始json_list。
# 如果目标是真正从原始json_list中移除,需要更复杂的同步机制。
# 在当前场景下,我们主要关注收集匹配结果。
# data_scrap.remove(data)
# 如果一个标记只需要匹配一次,可以在找到后跳出内层循环
break # 一个marking_item找到一个匹配后就跳出,避免重复匹配
# 2. 任务分块与进程创建
processes = []
chunk_size = 100 # 每个进程处理的marking_list块的大小
# 计算需要创建的进程数量
# 这里将marking_list分成块,每个进程处理一个或多个块。
# 另一种常见策略是基于CPU核心数创建进程。
num_chunks = math.ceil(len(marking_list) / chunk_size)
for i in range(num_chunks):
start_idx = i * chunk_size
end_idx = min((i + 1) * chunk_size, len(marking_list))
sub_marking_list = marking_list[start_idx:end_idx]
if not sub_marking_list:
continue # 避免创建空任务的进程
p = Process(
target=__process_eliminate,
# args参数传递给目标函数。
# json_list[:] 创建了一个json_list的浅拷贝,确保每个进程有独立的副本。
args=(sub_marking_list, json_list[:], result_mark, result)
)
processes.append(p)
p.start() # 启动进程
# 3. 等待所有进程完成
for p in processes:
p.join() # 阻塞主进程,直到当前进程执行完毕
# 4. 关闭Manager并返回结果
manager.shutdown() # 在所有进程完成后关闭Manager
return list(result_mark), list(result) # 将Manager.list转换为普通list返回
为了方便测试,我们创建一些模拟数据:
import math
import time
import random
import string
from difflib import SequenceMatcher
from multiprocessing import Process, Manager
# 模拟数据
def generate_fake_data(num_json, num_marking):
json_list = []
for i in range(num_json):
code_val = ''.join(random.choices(string.digits, k=6))
json_list.append({
"code": code_val,
"phone_number": f"1{random.randint(1000000000, 9999999999)}",
"email": f"user{i}@example.com",
"address": f"address_fake_{i}",
"note": f"note dummy {i}"
})
marking = []
# 确保有一些匹配项
for i in range(num_marking // 2):
# 从json_list中随机取一个code作为marking
marking.append(random.choice(json_list)['code'])
# 添加一些不匹配的marking
for i in range(num_marking // 2, num_marking):
marking.append(''.join(random.choices(string.ascii_letters + string.digits, k=random.randint(5, 8))))
random.shuffle(marking) # 打乱顺序
return json_list, marking
# 假设的 find_marking 函数
def find_marking(x: str, y: dict) -> dict | None:
text_match = SequenceMatcher(None, x, y.get('code', '')).ratio()
if text_match == 1 or (0.98 <= text_match < 0.99):
return y
return None
# 假设的 eliminate_marking 函数(与上面定义的一致)
def eliminate_marking(marking_list: list[str], json_list: list[dict]) -> tuple[list[str], list[dict]]:
manager = Manager()
result_mark = manager.list()
result = manager.list()
def __process_eliminate(sub_marking_list: list[str], data_scrap: list[dict],
shared_result_mark: Manager.list, shared_result: Manager.list):
for marking_item in sub_marking_list:
for data in data_scrap:
result_data = find_marking(marking_item, data)
if result_data:
shared_result_mark.append(marking_item)
shared_result.append(result_data)
break # 一个marking_item找到一个匹配后就跳出
processes = []
# 这里的chunk_size可以根据实际CPU核心数和任务复杂度进行调整
# 较小的chunk_size可能导致更多的进程创建和管理开销
# 较大的chunk_size可能导致部分核心利用率不足
chunk_size = 50 # 调整为50,以创建更多进程进行测试,更细粒度的任务分配
# 优化:根据CPU核心数来决定进程数量,而不是简单地按chunk_size分块
# 理想情况下,进程数不应超过CPU核心数
# num_processes = os.cpu_count() or 1
# marking_per_process = math.ceil(len(marking_list) / num_processes)
#
# for i in range(num_processes):
# start_idx = i * marking_per_process
# end_idx = min((i + 1) * marking_per_process, len(marking_list))
# sub_marking_list = marking_list[start_idx:end_idx]
# ...
# 当前实现是按chunk_size分块
num_chunks = math.ceil(len(marking_list) / chunk_size)
for i in range(num_chunks):
start_idx = i * chunk_size
end_idx = min((i + 1) * chunk_size, len(marking_list))
sub_marking_list = marking_list[start_idx:end_idx]
if not sub_marking_list:
continue
p = Process(
target=__process_eliminate,
args=(sub_marking_list, json_list[:], result_mark, result)
)
processes.append(p)
p.start()
for p in processes:
p.join()
manager.shutdown()
return list(result_mark), list(result)
if __name__ == "__main__":
# 生成模拟数据
NUM_JSON = 23000
NUM_MARKING = 3000
print(f"生成 {NUM_JSON} 条JSON数据和 {NUM_MARKING} 条标记数据...")
test_json_list, test_marking_list = generate_fake_data(NUM_JSON, NUM_MARKING)
print("数据生成完毕。")
start_time = time.time()
eliminated_markings, eliminated_data = eliminate_marking(test_marking_list, test_json_list)
end_time = time.time()
print(f"\n多进程处理完成。")
print(f"总耗时: {end_time - start_time:.2f} 秒")
print(f"找到 {len(eliminated_markings)} 个匹配标记。")
print(f"找到 {len(eliminated_data)} 条匹配数据。")
# 验证部分结果
if eliminated_markings:
print(f"部分匹配标记示例: {eliminated_markings[:5]}")
if eliminated_data:
print(f"部分匹配数据示例: {eliminated_data[:2]}")
通过将任务分解为独立的子任务并在多个进程中并行执行,结合multiprocessing.Manager实现结果的有效收集,我们成功地将大数据量列表匹配和筛选的性能提升了一个数量级。这种方法特别适用于那些受限于Python GIL的CPU密集型计算任务。理解multiprocessing的工作原理和最佳实践,能够帮助开发者在处理大规模数据时构建出更高效、更健壮的Python应用程序。
以上就是使用Python多进程优化大数据量匹配与筛选性能的详细内容,更多请关注php中文网其它相关文章!
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号