提升Python数据处理性能:从多线程到多进程的优化实践

DDD
发布: 2025-10-02 13:32:01
原创
366人浏览过

提升Python数据处理性能:从多线程到多进程的优化实践

本文探讨了在Python中处理大规模数据列表匹配和筛选时的性能瓶颈。针对传统多线程在CPU密集型任务中受限于GIL的局限性,文章提出并详细阐述了如何利用Python的multiprocessing模块,通过创建独立的进程来并行化任务,从而显著提升数据处理效率。文章提供了完整的代码示例和专业解析,帮助读者理解并应用多进程技术优化Python程序的性能。

1. Python并发编程的挑战:GIL与CPU密集型任务

python中,处理大量数据(例如超过23,000条json记录与3,000多个标记进行匹配)往往会面临性能挑战。当需要对这些数据进行复杂计算或字符串相似度比较等cpu密集型操作时,程序的执行时间可能会非常长。

一个常见的优化思路是使用并发编程,例如Python的threading模块。然而,对于CPU密集型任务,Python的全局解释器锁(Global Interpreter Lock, GIL)是一个显著的限制。GIL确保在任何给定时刻,只有一个线程能够执行Python字节码。这意味着即使创建了多个线程,它们也无法真正地并行执行CPU密集型任务,因为它们必须轮流获取GIL,导致多线程在CPU密集型场景下并不能带来显著的性能提升,甚至可能因为线程切换的开销而略微降低性能。

2. 问题场景与初始尝试

假设我们有两个列表:一个包含字典的json_list(例如用户数据,每个字典含有一个"code"字段),另一个是marking列表(包含需要匹配的字符串)。我们的目标是从json_list中找出与marking列表中的每个元素具有高相似度"code"的项,并将匹配到的标记和数据收集起来。

初始的尝试可能如下所示,使用threading模块来尝试并行化匹配过程:

import math
import threading
from difflib import SequenceMatcher

# 示例数据(实际数据量远大于此)
json_list = [
    {"code": "001123", "phone_number": "...", "email": "...", "address": "...", "note": ""},
    {"code": "654564", "phone_number": "...", "email": "...", "address": "...", "note": ""},
    {"code": "876890", "phone_number": "...", "email": "...", "address": "...", "note": ""},
    {"code": "hj876", "phone_number": "...", "email": "...", "address": "...", "note": ""},
    # ... 更多数据
]
marking = ["654564", "hj876", "8768"] # ... 更多标记

def find_marking(x, y):
    """
    比较标记x与数据y的'code'字段的相似度。
    """
    text_match = SequenceMatcher(None, x, y.get('code')).ratio()
    if text_match == 1 or (0.98 <= text_match < 0.99):
        return y
    return None

def eliminate_marking_threaded(marking_list, json_list):
    result, result_mark = [], []
    # 这里的内部函数及对data_scrap的修改存在并发问题和GIL限制
    # 实际场景中,对共享列表的pop/remove操作需要更复杂的同步机制
    # 且因为GIL,多线程在此处并不能带来性能提升
    def __process_eliminate(marking_item, data_scrap_copy):
        for data in data_scrap_copy: # 遍历副本
            result_data = find_marking(marking_item, data)
            if result_data:
                # 注意:这里的append操作如果直接对外部result/result_mark进行,
                # 需要加锁。且data_scrap_copy的remove只影响副本。
                # 实际代码中需要更严谨的共享数据处理。
                # 此处仅为说明多线程尝试的局限性。
                # result_mark.append(marking_item)
                # result.append(result_data)
                return

    threads = []
    # 针对每个marking创建线程,但由于GIL,实际不会并行执行
    for m in marking_list:
        # 传递json_list的副本以避免部分并发问题,但仍受GIL限制
        th = threading.Thread(target=__process_eliminate, args=(m, json_list[:]))
        th.start()
        threads.append(th)

    for thread in threads:
        thread.join()

    return result_mark, result # 在这个简单的多线程示例中,result/result_mark不会被正确填充

# 运行此代码会发现性能提升不明显,甚至可能更慢
# eliminated_markings, eliminated_data = eliminate_marking_threaded(marking, json_list)
登录后复制

如上所述,尽管使用了threading,但由于GIL的存在,这种方法在CPU密集型任务中无法实现真正的并行计算,耗时依然较长。

喵记多
喵记多

喵记多 - 自带助理的 AI 笔记

喵记多 27
查看详情 喵记多

立即学习Python免费学习笔记(深入)”;

3. 利用multiprocessing实现真正的并行计算

为了克服GIL的限制,Python提供了multiprocessing模块。它允许程序创建独立的进程,每个进程都有自己的Python解释器和内存空间,因此它们可以真正地并行执行CPU密集型任务,不受GIL的影响。

3.1 核心思路

  1. 进程而非线程:使用Process代替Thread。
  2. 数据共享:由于每个进程有独立的内存空间,共享数据需要特殊机制,例如multiprocessing.Manager来创建可在进程间共享的数据结构(如列表、字典)。
  3. 任务分发:将大型任务(如marking_list)分割成更小的块(chunk),然后将这些块分发给不同的进程进行处理。

3.2 multiprocessing实现示例

import math
from difflib import SequenceMatcher
from multiprocessing import Process, Manager
import time # 用于计时演示

# 模拟大规模数据
# 注意:实际运行时请替换为您的真实数据
json_list_large = []
for i in range(25000):
    json_list_large.append({"code": f"{i:06d}", "phone_number": "...", "email": "...", "address": "...", "note": ""})
json_list_large.append({"code": "654564", "phone_number": "...", "email": "...", "address": "...", "note": ""})
json_list_large.append({"code": "hj876", "phone_number": "...", "email": "...", "address": "...", "note": ""})
json_list_large.append({"code": "876890", "phone_number": "...", "email": "...", "address": "...", "note": ""})

marking_large = []
for i in range(3500):
    marking_large.append(f"{i:06d}")
marking_large.extend(["654564", "hj876", "8768"])

def find_marking(x, y):
    """
    比较标记x与数据y的'code'字段的相似度。
    """
    text_match = SequenceMatcher(None, x, y.get('code')).ratio()
    if text_match == 1 or (0.98 <= text_match < 0.99):
        return y
    return None

def eliminate_marking_multiprocess(marking_list, json_list):
    """
    使用多进程并行处理标记列表,从json_list中查找匹配项。
    """
    manager = Manager()
    result_mark = manager.list() # 共享列表,用于存储匹配的标记
    result = manager.list()      # 共享列表,用于存储匹配的数据

    def __process_eliminate_chunk(sub_marking_list, data_scrap_copy, shared_result_mark, shared_result):
        """
        每个进程执行的函数,处理一部分标记列表。
        data_scrap_copy 是 json_list 的一个副本,进程对其的修改不会影响原始 json_list。
        """
        for marking_item in sub_marking_list:
            for data in data_scrap_copy: # 遍历json_list的副本
                result_data = find_marking(marking_item, data)
                if result_data:
                    # 将结果添加到共享列表中
                    shared_result_mark.append(marking_item)
                    shared_result.append(result_data)
                    # 注意:这里从data_scrap_copy中移除元素,只影响当前进程的副本,
                    # 且为了避免重复匹配,一旦找到一个匹配就跳出内层循环。
                    # 如果需要从原始json_list中“消除”,则需要更复杂的同步机制或在主进程中处理。
                    # data_scrap_copy.remove(data) # 如果需要确保每个标记只匹配一次,且从副本中移除
                    break # 找到匹配后,当前marking_item处理完毕,检查下一个marking_item

    processes = []
    # 根据CPU核心数或经验值设置chunk_size和num_processes
    # chunk_size决定了每个进程处理多少个marking
    chunk_size = max(1, len(marking_list) // (2 * (len(marking_list) // 1000 + 1))) # 动态调整chunk_size
    num_processes = math.ceil(len(marking_list) / chunk_size)

    print(f"Total markings: {len(marking_list)}, Chunk size: {chunk_size}, Number of processes: {num_processes}")

    for i in range(num_processes):
        start_idx = i * chunk_size
        end_idx = min((i + 1) * chunk_size, len(marking_list))
        sub_marking_list = marking_list[start_idx:end_idx]

        if not sub_marking_list:
            continue

        p = Process(
            target=__process_eliminate_chunk,
            # 传递json_list的副本给每个进程,避免进程间直接修改原始大列表的复杂同步问题
            args=(sub_marking_list, json_list[:], result_mark, result)
        )
        processes.append(p)
        p.start() # 启动进程

    for p in processes:
        p.join() # 等待所有进程完成

    manager.shutdown() # 关闭Manager,释放资源
    return list(result_mark), list(result) # 将Manager.list转换为普通Python列表

# 运行多进程版本
print("Starting multiprocessing elimination...")
start_time = time.time()
eliminated_markings, eliminated_data = eliminate_marking_multiprocess(marking_large, json_list_large)
end_time = time.time()
print(f"Multiprocessing finished in {end_time - start_time:.2f} seconds.")

print(f"Found {len(eliminated_markings)} matches.")
# print("Eliminated Markings:", eliminated_markings[:5]) # 打印前5个示例
# print("Eliminated Data:", eliminated_data[:5]) # 打印前5个示例
登录后复制

3.3 代码解析与注意事项

  1. multiprocessing.Manager:
    • Manager() 创建一个管理器对象,它允许你创建可在不同进程间共享的Python对象。
    • manager.list() 创建一个可以在多个进程中安全访问和修改的列表。这解决了传统列表在多进程环境下修改时可能出现的竞争条件和数据不一致问题。result_mark 和 result 就是通过这种方式创建的共享列表。
  2. 任务分块 (chunk_size):
    • marking_list 被分割成若干个子列表(sub_marking_list)。每个进程负责处理一个子列表。
    • 合理设置 chunk_size 很重要。过小的块可能导致进程创建和管理的开销过大;过大的块可能导致某些进程负载不均。可以根据实际CPU核心数和任务特性进行调整。
  3. 进程创建与执行:
    • Process(target=__process_eliminate_chunk, args=(...)) 创建一个新进程,并指定其执行的函数和传递的参数。
    • p.start() 启动进程。
    • p.join() 等待子进程完成。主进程会阻塞,直到所有子进程都执行完毕。
  4. json_list[:] 的作用:
    • 在 args=(sub_marking_list, json_list[:], ...) 中,json_list[:] 创建了 json_list 的一个浅拷贝。这意味着每个子进程都会收到 json_list 的一个独立副本。
    • 重要提示:如果子进程内部对 data_scrap_copy(即 json_list 的副本)进行 remove 操作,这只会影响该进程自身的副本,而不会修改原始的 json_list。如果目标是实际从原始 json_list 中移除匹配项,则需要更复杂的策略,例如让每个进程返回其匹配到的项的索引,然后在主进程中统一处理移除,或者使用 Manager().list() 来包装 json_list 并进行同步操作,但这会引入更多的复杂性和潜在的性能瓶颈。在当前示例中,我们主要关注的是收集匹配的标记和数据,而不是原地修改原始 json_list。
  5. manager.shutdown():
    • 在所有进程完成工作后,调用 manager.shutdown() 来关闭管理器并释放其资源。
  6. 结果转换:
    • Manager().list() 返回的对象是特殊的代理对象。为了在主进程中像普通列表一样操作它们,通常需要将其转换为标准的Python列表,例如 list(result_mark)。

4. 总结

当Python程序遇到CPU密集型任务,且多线程无法带来性能提升时,multiprocessing模块是更优的选择。通过创建独立的进程,multiprocessing能够绕过GIL的限制,实现真正的并行计算,从而显著缩短程序的执行时间。在使用multiprocessing时,需要注意进程间数据共享的机制(如Manager)以及任务分发策略,以确保程序的正确性和高效性。合理地应用多进程技术,可以有效提升Python在处理大规模数据和计算密集型任务时的性能表现。

以上就是提升Python数据处理性能:从多线程到多进程的优化实践的详细内容,更多请关注php中文网其它相关文章!

数码产品性能查询
数码产品性能查询

该软件包括了市面上所有手机CPU,手机跑分情况,电脑CPU,电脑产品信息等等,方便需要大家查阅数码产品最新情况,了解产品特性,能够进行对比选择最具性价比的商品。

下载
来源:php中文网
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
最新问题
开源免费商场系统广告
热门教程
更多>
最新下载
更多>
网站特效
网站源码
网站素材
前端模板
关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新 English
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送
PHP中文网APP
随时随地碎片化学习

Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号