使用Python多进程优化大数据量匹配与筛选性能

心靈之曲
发布: 2025-10-02 15:09:01
原创
390人浏览过

使用Python多进程优化大数据量匹配与筛选性能

本文旨在解决Python处理大数据量列表匹配与筛选时遇到的性能瓶颈,特别是当传统多线程方案效果不佳时。我们将深入探讨如何利用Python的multiprocessing模块,结合Manager实现进程间数据共享,以及合理的任务分块策略,显著提升CPU密集型任务的执行效率,从而将耗时数十分钟的操作缩短至可接受的范围。

1. 问题背景与挑战

在处理大规模数据集时,例如需要在一个包含数万条记录的json列表中(json_list)查找并匹配另一个包含数千个标记(marking)的列表中的元素,性能往往成为一个关键挑战。具体场景是,json_list中的每个字典包含一个code字段,我们需要将marking列表中的每个字符串与json_list中元素的code字段进行相似度匹配。匹配规则是使用difflib.sequencematcher计算相似度,当相似度为1(完全匹配)或介于0.98到0.99之间时,认为匹配成功。

原始的单线程或简单的多线程(threading)实现,在数据量庞大时(json_list超过23,000条,marking超过3,000条),可能需要20分钟甚至更长时间才能完成。这主要是因为Python的全局解释器锁(GIL)限制了多线程在CPU密集型任务上的并行执行能力。即使创建了多个线程,它们也无法同时在多个CPU核心上运行Python字节码,导致性能提升不明显。

2. 多进程(Multiprocessing)的解决方案

为了克服GIL的限制,Python提供了multiprocessing模块,它允许创建独立的进程,每个进程都有自己的Python解释器和内存空间。这意味着不同的进程可以在不同的CPU核心上真正并行执行CPU密集型任务,从而显著提高性能。

本教程将展示如何利用multiprocessing库来优化上述数据匹配和筛选过程。

2.1 核心组件介绍

  • multiprocessing.Process: 用于创建和管理新的进程。
  • multiprocessing.Manager: 用于创建可以在不同进程之间共享的数据结构(如列表、字典等)。这是解决进程间通信和数据共享的关键,因为普通Python对象在进程间默认不共享。
  • difflib.SequenceMatcher: 用于计算两个序列(字符串)的相似度。

2.2 匹配逻辑函数 find_marking

这个函数负责执行单个标记与JSON数据项的匹配逻辑。它保持不变,因为它是一个纯计算函数,不涉及并发问题。

立即学习Python免费学习笔记(深入)”;

Gnomic智能体平台
Gnomic智能体平台

国内首家无需魔法免费无限制使用的ChatGPT4.0,网站内设置了大量智能体供大家免费使用,还有五款语言大模型供大家免费使用~

Gnomic智能体平台 47
查看详情 Gnomic智能体平台
from difflib import SequenceMatcher

def find_marking(x: str, y: dict) -> dict | None:
    """
    比较标记字符串x与JSON数据项y中的'code'字段的相似度。
    如果相似度为1或在0.98到0.99之间,则返回y,否则返回None。
    """
    text_match = SequenceMatcher(None, x, y.get('code', '')).ratio()
    if text_match == 1 or (0.98 <= text_match < 0.99):
        return y
    return None
登录后复制

注意: 确保y字典中包含'code'键,否则y.get('code', '')可以提供一个默认值,避免KeyError。

2.3 多进程筛选主函数 eliminate_marking

这个函数是整个解决方案的核心,它协调多个进程来并行处理匹配任务。

import math
from multiprocessing import Process, Manager

def eliminate_marking(marking_list: list[str], json_list: list[dict]) -> tuple[list[str], list[dict]]:
    """
    使用多进程并行地从json_list中匹配和筛选marking_list中的标记。

    Args:
        marking_list: 待匹配的标记字符串列表。
        json_list: 包含'code'字段的JSON字典列表。

    Returns:
        一个元组,包含两个列表:
        - result_mark: 成功匹配的标记列表。
        - result: 成功匹配的JSON数据项列表。
    """
    # 1. 初始化Manager和共享数据结构
    # Manager用于创建可在进程间共享的列表,以收集结果。
    manager = Manager()
    result_mark = manager.list()  # 共享列表,用于存储成功匹配的标记
    result = manager.list()       # 共享列表,用于存储成功匹配的JSON数据项

    def __process_eliminate(sub_marking_list: list[str], data_scrap: list[dict],
                            shared_result_mark: Manager.list, shared_result: Manager.list):
        """
        每个进程执行的任务函数。
        它遍历分配给它的标记子列表,并尝试在data_scrap中找到匹配项。
        """
        # data_scrap是json_list的一个副本,每个进程独立操作。
        # 注意:这里的data_scrap是json_list的浅拷贝,对其内部字典的修改会影响原始字典
        # 但对其列表结构(如remove操作)的修改仅影响当前进程的副本。
        # 鉴于我们的目标是收集匹配项,这种拷贝方式是安全的。

        for marking_item in sub_marking_list: # 遍历当前进程负责的标记子列表
            for data in data_scrap:           # 遍历json_list的副本
                result_data = find_marking(marking_item, data)
                if result_data:
                    # 找到匹配项后,将其添加到共享列表中
                    shared_result_mark.append(marking_item)
                    shared_result.append(result_data)
                    # 这里的remove操作只影响当前进程的data_scrap副本,
                    # 并不影响其他进程的副本或原始json_list。
                    # 如果目标是真正从原始json_list中移除,需要更复杂的同步机制。
                    # 在当前场景下,我们主要关注收集匹配结果。
                    # data_scrap.remove(data) 
                    # 如果一个标记只需要匹配一次,可以在找到后跳出内层循环
                    break # 一个marking_item找到一个匹配后就跳出,避免重复匹配

    # 2. 任务分块与进程创建
    processes = []
    chunk_size = 100  # 每个进程处理的marking_list块的大小

    # 计算需要创建的进程数量
    # 这里将marking_list分成块,每个进程处理一个或多个块。
    # 另一种常见策略是基于CPU核心数创建进程。
    num_chunks = math.ceil(len(marking_list) / chunk_size)

    for i in range(num_chunks):
        start_idx = i * chunk_size
        end_idx = min((i + 1) * chunk_size, len(marking_list))
        sub_marking_list = marking_list[start_idx:end_idx]

        if not sub_marking_list:
            continue # 避免创建空任务的进程

        p = Process(
            target=__process_eliminate,
            # args参数传递给目标函数。
            # json_list[:] 创建了一个json_list的浅拷贝,确保每个进程有独立的副本。
            args=(sub_marking_list, json_list[:], result_mark, result)
        )
        processes.append(p)
        p.start() # 启动进程

    # 3. 等待所有进程完成
    for p in processes:
        p.join() # 阻塞主进程,直到当前进程执行完毕

    # 4. 关闭Manager并返回结果
    manager.shutdown() # 在所有进程完成后关闭Manager
    return list(result_mark), list(result) # 将Manager.list转换为普通list返回
登录后复制

2.4 完整示例代码

为了方便测试,我们创建一些模拟数据:

import math
import time
import random
import string
from difflib import SequenceMatcher
from multiprocessing import Process, Manager

# 模拟数据
def generate_fake_data(num_json, num_marking):
    json_list = []
    for i in range(num_json):
        code_val = ''.join(random.choices(string.digits, k=6))
        json_list.append({
            "code": code_val,
            "phone_number": f"1{random.randint(1000000000, 9999999999)}",
            "email": f"user{i}@example.com",
            "address": f"address_fake_{i}",
            "note": f"note dummy {i}"
        })

    marking = []
    # 确保有一些匹配项
    for i in range(num_marking // 2):
        # 从json_list中随机取一个code作为marking
        marking.append(random.choice(json_list)['code'])
    # 添加一些不匹配的marking
    for i in range(num_marking // 2, num_marking):
        marking.append(''.join(random.choices(string.ascii_letters + string.digits, k=random.randint(5, 8))))

    random.shuffle(marking) # 打乱顺序
    return json_list, marking

# 假设的 find_marking 函数
def find_marking(x: str, y: dict) -> dict | None:
    text_match = SequenceMatcher(None, x, y.get('code', '')).ratio()
    if text_match == 1 or (0.98 <= text_match < 0.99):
        return y
    return None

# 假设的 eliminate_marking 函数(与上面定义的一致)
def eliminate_marking(marking_list: list[str], json_list: list[dict]) -> tuple[list[str], list[dict]]:
    manager = Manager()
    result_mark = manager.list()
    result = manager.list()

    def __process_eliminate(sub_marking_list: list[str], data_scrap: list[dict],
                            shared_result_mark: Manager.list, shared_result: Manager.list):
        for marking_item in sub_marking_list:
            for data in data_scrap:
                result_data = find_marking(marking_item, data)
                if result_data:
                    shared_result_mark.append(marking_item)
                    shared_result.append(result_data)
                    break # 一个marking_item找到一个匹配后就跳出

    processes = []
    # 这里的chunk_size可以根据实际CPU核心数和任务复杂度进行调整
    # 较小的chunk_size可能导致更多的进程创建和管理开销
    # 较大的chunk_size可能导致部分核心利用率不足
    chunk_size = 50 # 调整为50,以创建更多进程进行测试,更细粒度的任务分配

    # 优化:根据CPU核心数来决定进程数量,而不是简单地按chunk_size分块
    # 理想情况下,进程数不应超过CPU核心数
    # num_processes = os.cpu_count() or 1
    # marking_per_process = math.ceil(len(marking_list) / num_processes)
    # 
    # for i in range(num_processes):
    #     start_idx = i * marking_per_process
    #     end_idx = min((i + 1) * marking_per_process, len(marking_list))
    #     sub_marking_list = marking_list[start_idx:end_idx]
    #     ...

    # 当前实现是按chunk_size分块
    num_chunks = math.ceil(len(marking_list) / chunk_size)

    for i in range(num_chunks):
        start_idx = i * chunk_size
        end_idx = min((i + 1) * chunk_size, len(marking_list))
        sub_marking_list = marking_list[start_idx:end_idx]

        if not sub_marking_list:
            continue

        p = Process(
            target=__process_eliminate,
            args=(sub_marking_list, json_list[:], result_mark, result)
        )
        processes.append(p)
        p.start()

    for p in processes:
        p.join()
    manager.shutdown()
    return list(result_mark), list(result)

if __name__ == "__main__":
    # 生成模拟数据
    NUM_JSON = 23000
    NUM_MARKING = 3000
    print(f"生成 {NUM_JSON} 条JSON数据和 {NUM_MARKING} 条标记数据...")
    test_json_list, test_marking_list = generate_fake_data(NUM_JSON, NUM_MARKING)
    print("数据生成完毕。")

    start_time = time.time()
    eliminated_markings, eliminated_data = eliminate_marking(test_marking_list, test_json_list)
    end_time = time.time()

    print(f"\n多进程处理完成。")
    print(f"总耗时: {end_time - start_time:.2f} 秒")
    print(f"找到 {len(eliminated_markings)} 个匹配标记。")
    print(f"找到 {len(eliminated_data)} 条匹配数据。")

    # 验证部分结果
    if eliminated_markings:
        print(f"部分匹配标记示例: {eliminated_markings[:5]}")
    if eliminated_data:
        print(f"部分匹配数据示例: {eliminated_data[:2]}")
登录后复制

3. 注意事项与最佳实践

  1. GIL与多进程: 理解Python GIL是关键。对于CPU密集型任务,multiprocessing是比threading更好的选择,因为它绕过了GIL的限制,实现了真正的并行计算。
  2. 数据共享: 进程间数据共享比线程间复杂。普通Python对象在进程间默认不共享,需要使用multiprocessing.Manager创建共享数据结构,或者通过管道/队列进行通信。本例中Manager.list用于收集结果,避免了复杂的同步机制
  3. 数据拷贝: 在eliminate_marking函数中,我们通过json_list[:]将json_list的浅拷贝传递给每个进程。这意味着每个进程都在自己的json_list副本上进行查找。这样做的好处是避免了对同一共享json_list进行并发读写和删除操作的复杂同步问题。缺点是,如果目标是修改原始的json_list(例如,从中删除匹配项),这种方法不会直接实现。但对于“获取数据”的需求,收集匹配结果是更安全和常见的模式。
  4. 任务分块: 合理地划分任务(marking_list的chunk_size)对性能至关重要。过小的块可能导致过多的进程创建和管理开销;过大的块可能导致部分进程负载不均。通常,进程数量不应超过CPU的核心数。可以根据实际情况调整chunk_size或直接根据os.cpu_count()来分配任务。
  5. 进程开销: 进程的创建和销毁比线程更耗资源。对于非常短小的任务,多进程的开销可能抵消并行带来的收益。但在本例这种大数据量、CPU密集型任务中,多进程的优势非常明显。
  6. 错误处理: 在生产环境中,应考虑在进程中加入错误处理机制,例如使用try-except块,并记录异常,以提高程序的健壮性。
  7. Manager的生命周期: 确保在所有子进程完成后调用manager.shutdown()来清理Manager创建的资源。

4. 总结

通过将任务分解为独立的子任务并在多个进程中并行执行,结合multiprocessing.Manager实现结果的有效收集,我们成功地将大数据量列表匹配和筛选的性能提升了一个数量级。这种方法特别适用于那些受限于Python GIL的CPU密集型计算任务。理解multiprocessing的工作原理和最佳实践,能够帮助开发者在处理大规模数据时构建出更高效、更健壮的Python应用程序。

以上就是使用Python多进程优化大数据量匹配与筛选性能的详细内容,更多请关注php中文网其它相关文章!

数码产品性能查询
数码产品性能查询

该软件包括了市面上所有手机CPU,手机跑分情况,电脑CPU,电脑产品信息等等,方便需要大家查阅数码产品最新情况,了解产品特性,能够进行对比选择最具性价比的商品。

下载
来源:php中文网
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
最新问题
开源免费商场系统广告
热门教程
更多>
最新下载
更多>
网站特效
网站源码
网站素材
前端模板
关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新 English
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送
PHP中文网APP
随时随地碎片化学习

Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号