高效处理Pandas中大量CSV文件合并:避免循环内concat的性能陷阱

霞舞
发布: 2025-11-19 14:34:20
原创
234人浏览过

高效处理Pandas中大量CSV文件合并:避免循环内concat的性能陷阱

本文旨在解决在pandas中循环合并大量csv文件时遇到的性能瓶颈。通过分析循环中使用`pd.concat`的低效性,文章提出两种优化策略:一是将所有数据收集到字典中,最后进行一次性`pd.concat`;二是利用`concurrent.futures.threadpoolexecutor`实现文件读取的并行化。这些方法显著提升了处理效率,避免了随着文件数量增加而导致的性能急剧下降。

1. 问题背景与循环concat的性能瓶颈

在数据处理过程中,我们经常需要合并来自多个文件的信息。当面对大量(例如上千个)中等大小(例如每个15MB,10,000行)的CSV文件时,一种直观的做法是使用循环迭代读取每个文件,进行必要的转换,然后通过pd.concat将其追加到一个主DataFrame中。然而,这种方法存在严重的性能问题。

考虑以下原始代码示例:

import pandas as pd
import os

# 假设 df 是一个包含文件路径信息的DataFrame
# root_path 是根目录
# df 示例:
#    File ID    File Name
# 0  folderA  file001.txt
# 1  folderB  file002.txt

merged_data = pd.DataFrame()
count = 0
for index, row in df.iterrows():
    folder_name = row['File ID'].strip()
    file_name = row['File Name'].strip()
    file_path = os.path.join(root_path, folder_name, file_name)

    # 读取、转置并插入新列
    file_data = pd.read_csv(file_path, names=['Case', f'{folder_name}_{file_name}'], sep='\t')
    file_data_transposed = file_data.set_index('Case').T.reset_index(drop=True)
    file_data_transposed.insert(loc=0, column='folder_file_id', value=str(folder_name+'_'+file_name))

    # 在循环中进行拼接
    merged_data = pd.concat([merged_data, file_data_transposed], axis=0, ignore_index=True)
    count = count + 1
    print(count)
登录后复制

上述代码的性能问题在于,每次循环调用pd.concat时,Pandas都需要创建一个新的DataFrame来容纳旧数据和新数据。这意味着大量的内存重新分配和数据复制操作,随着merged_data的增大,这些操作的开销会呈指数级增长,导致处理速度越来越慢。对于上千个文件,这种方法是不可持续的。

2. 优化策略一:收集数据并进行单次pd.concat

解决循环中pd.concat低效问题的核心思想是:避免在每次迭代中都进行拼接操作。取而代之的是,将每个文件处理后的数据收集到一个数据结构(如列表或字典)中,然后在循环结束后,一次性地调用pd.concat来合并所有数据。

以下是优化后的代码示例:

import pathlib
import pandas as pd

# 假设 df 是一个包含文件路径信息的DataFrame
# root_path 是根目录,使用 pathlib 替换 os.path
root_path = pathlib.Path('root') 

data_parts = {} # 使用字典收集每个文件的处理结果

# 使用 enumerate 简化计数器,并迭代 df
for count, (_, row) in enumerate(df.iterrows(), 1):
    folder_name = row['File ID'].strip()
    file_name = row['File Name'].strip()
    file_path = root_path / folder_name / file_name # pathlib 的路径拼接
    folder_file_id = f'{folder_name}_{file_name}'

    # 读取CSV文件,并进行初步处理
    # header=None 因为文件没有表头
    # memory_map=True 可以提高大文件读取效率,low_memory=False 避免混合类型警告
    file_data = pd.read_csv(file_path, header=None, sep='\t',
                            names=['Case', folder_file_id],
                            memory_map=True, low_memory=False)

    # 将 'Case' 列设置为索引,并使用 squeeze() 将 DataFrame 转换为 Series
    # 这样可以方便后续的 unstack 操作
    data_parts[folder_file_id] = file_data.set_index('Case').squeeze()
    print(count)

# 循环结束后,一次性合并所有数据
# pd.concat 传入字典,字典的键将作为新的层级索引 (names=['folder_file_id'])
# unstack('Case') 将 'Case' 索引转换为列
# reset_index() 将多级索引扁平化,并重置索引
merged_data = (pd.concat(data_parts, names=['folder_file_id'])
                 .unstack('Case').reset_index())
登录后复制

代码改进点说明:

  1. pathlib替代os.path: pathlib模块提供了面向对象的路径操作,代码更简洁、可读性更强。
  2. 字典收集数据: data_parts = {}用于存储每个文件处理后的Series对象。
  3. enumerate: 代替手动维护count变量,使代码更简洁。
  4. pd.read_csv参数优化:
    • header=None: 明确指定CSV文件没有表头。
    • memory_map=True: 对于大文件,这可以提高读取效率,因为它将文件映射到内存,而不是一次性加载所有数据。
    • low_memory=False: 禁用Pandas的低内存解析器,这在处理混合数据类型的列时可以避免警告,并可能提高解析大型文件的速度。
  5. 数据转换:
    • file_data.set_index('Case').squeeze(): 将Case列设置为索引,然后使用squeeze()将单列DataFrame转换为Series。这样做是为了在最终pd.concat时,能够利用Pandas的特性,通过字典键自动创建folder_file_id的索引层级,并方便后续的unstack操作。
  6. 单次pd.concat: 循环结束后,通过pd.concat(data_parts, names=['folder_file_id'])一次性合并所有Series。
  7. unstack和reset_index: unstack('Case')将原始的Case索引(现在是Series的索引)转换为列,reset_index()将生成的MultiIndex扁平化,得到最终所需的DataFrame结构。

示例输入数据:

码哩写作
码哩写作

最懂作者的AI辅助创作工具

码哩写作 91
查看详情 码哩写作
>>> df
   File ID    File Name
0  folderA  file001.txt
1  folderB  file002.txt

>>> cat root/folderA/file001.txt
0   1234
1   5678
2   9012
3   3456
4   7890

>>> cat root/folderB/file002.txt
0   4567
1   8901
2   2345
3   6789
登录后复制

示例输出结果:

>>> merged_data
Case       folder_file_id       0       1       2       3       4
0     folderA_file001.txt  1234.0  5678.0  9012.0  3456.0  7890.0
1     folderB_file002.txt  4567.0  8901.0  2345.0  6789.0     NaN
登录后复制

3. 优化策略二:利用多线程并行处理文件读取

对于I/O密集型任务(如读取大量文件),即使避免了循环内concat,文件读取本身也可能成为瓶颈。在这种情况下,可以考虑使用多线程来并行化文件读取过程。Python的concurrent.futures模块提供了一个ThreadPoolExecutor,非常适合处理这类场景。

from concurrent.futures import ThreadPoolExecutor
import pathlib
import pandas as pd

root_path = pathlib.Path('root')

# 定义一个函数,用于处理单个文件的读取和转换逻辑
def read_csv_and_process(args):
    count, row_dict = args # 展开传入的参数
    folder_name = row_dict['File ID'].strip()
    file_name = row_dict['File Name'].strip()
    file_path = root_path / folder_name / file_name
    folder_file_id = f'{folder_name}_{file_name}'

    file_data = pd.read_csv(file_path, header=None, sep='\t',
                            names=['Case', folder_file_id],
                            memory_map=True, low_memory=False)
    print(f"Processing {count}: {folder_file_id}")
    return folder_file_id, file_data.set_index('Case').squeeze()

# 使用 ThreadPoolExecutor
# max_workers 参数控制并行执行的线程数量,通常根据CPU核心数或I/O特性调整
with ThreadPoolExecutor(max_workers=4) as executor: # 可以根据系统资源调整 max_workers
    # 将 df 转换为字典列表,以便每个字典作为参数传递给 read_csv_and_process
    # enumerate 用于在并行处理时也能追踪进度
    batch_args = enumerate(df[['File ID', 'File Name']].to_dict('records'), 1)

    # executor.map 会将 batch_args 中的每个元素应用到 read_csv_and_process 函数
    # 并以提交的顺序返回结果
    processed_results_iterator = executor.map(read_csv_and_process, batch_args)

    # 将迭代器转换为字典,以便进行最终的 concat
    data_parts_threaded = dict(processed_results_iterator)

# 最终合并步骤与单线程版本相同
merged_data_threaded = (pd.concat(data_parts_threaded, names=['folder_file_id'])
                          .unstack('Case').reset_index())
登录后复制

多线程代码改进点说明:

  1. read_csv_and_process函数: 将单个文件的处理逻辑封装成一个独立的函数,方便多线程调用。这个函数接收一个元组args,其中包含计数和行数据字典。
  2. ThreadPoolExecutor: 创建一个线程池。max_workers参数决定了同时运行的线程数量。对于I/O密集型任务,可以适当增加max_workers,但过多的线程也可能导致上下文切换开销增加。
  3. df.to_dict('records'): 将DataFrame的每一行转换为一个字典,这样可以方便地将行数据作为参数传递给并行函数。
  4. executor.map: 这是ThreadPoolExecutor的核心方法之一,它将一个函数和一个可迭代对象作为输入,并行地对可迭代对象中的每个元素应用该函数,并返回一个迭代器,按提交顺序提供结果。
  5. 结果收集: dict(processed_results_iterator)将并行处理的结果(键值对元组)收集成一个字典。

注意事项:

  • 多线程主要适用于I/O密集型任务(如文件读写、网络请求)。对于CPU密集型任务,由于Python的全局解释器锁(GIL),多线程并不能真正实现并行计算,此时应考虑使用多进程(ProcessPoolExecutor)。
  • 在实际应用中,max_workers的设置需要根据系统资源和任务特性进行调优。

4. 总结与最佳实践

处理大量文件合并的场景时,避免在循环中频繁调用pd.concat是提升性能的关键。

  • 核心原则:将数据收集起来,然后进行一次性的大规模合并操作。这减少了内存重新分配和数据复制的开销。
  • 优化方案一(收集数据后单次concat):适用于大多数情况,是处理大量文件合并的首选优化。它简单有效,并且对内存管理更友好。
  • 优化方案二(多线程并行处理):当文件读取本身成为瓶颈时,可以进一步引入多线程来加速文件I/O。这在文件分散在不同物理磁盘或网络存储上时尤其有效。

在实际开发中,应首先采用第一种优化方案,如果性能仍不满足要求,再考虑引入多线程或多进程等并行化技术。同时,合理利用pd.read_csv等函数的参数(如memory_map、low_memory、chunksize等)也能进一步提升数据加载效率。

以上就是高效处理Pandas中大量CSV文件合并:避免循环内concat的性能陷阱的详细内容,更多请关注php中文网其它相关文章!

数码产品性能查询
数码产品性能查询

该软件包括了市面上所有手机CPU,手机跑分情况,电脑CPU,电脑产品信息等等,方便需要大家查阅数码产品最新情况,了解产品特性,能够进行对比选择最具性价比的商品。

下载
来源:php中文网
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
最新问题
开源免费商场系统广告
热门教程
更多>
最新下载
更多>
网站特效
网站源码
网站素材
前端模板
关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新 English
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送
PHP中文网APP
随时随地碎片化学习

Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号