
本文旨在解决在pandas中循环合并大量csv文件时遇到的性能瓶颈。通过分析循环中使用`pd.concat`的低效性,文章提出两种优化策略:一是将所有数据收集到字典中,最后进行一次性`pd.concat`;二是利用`concurrent.futures.threadpoolexecutor`实现文件读取的并行化。这些方法显著提升了处理效率,避免了随着文件数量增加而导致的性能急剧下降。
在数据处理过程中,我们经常需要合并来自多个文件的信息。当面对大量(例如上千个)中等大小(例如每个15MB,10,000行)的CSV文件时,一种直观的做法是使用循环迭代读取每个文件,进行必要的转换,然后通过pd.concat将其追加到一个主DataFrame中。然而,这种方法存在严重的性能问题。
考虑以下原始代码示例:
import pandas as pd
import os
# 假设 df 是一个包含文件路径信息的DataFrame
# root_path 是根目录
# df 示例:
# File ID File Name
# 0 folderA file001.txt
# 1 folderB file002.txt
merged_data = pd.DataFrame()
count = 0
for index, row in df.iterrows():
folder_name = row['File ID'].strip()
file_name = row['File Name'].strip()
file_path = os.path.join(root_path, folder_name, file_name)
# 读取、转置并插入新列
file_data = pd.read_csv(file_path, names=['Case', f'{folder_name}_{file_name}'], sep='\t')
file_data_transposed = file_data.set_index('Case').T.reset_index(drop=True)
file_data_transposed.insert(loc=0, column='folder_file_id', value=str(folder_name+'_'+file_name))
# 在循环中进行拼接
merged_data = pd.concat([merged_data, file_data_transposed], axis=0, ignore_index=True)
count = count + 1
print(count)上述代码的性能问题在于,每次循环调用pd.concat时,Pandas都需要创建一个新的DataFrame来容纳旧数据和新数据。这意味着大量的内存重新分配和数据复制操作,随着merged_data的增大,这些操作的开销会呈指数级增长,导致处理速度越来越慢。对于上千个文件,这种方法是不可持续的。
解决循环中pd.concat低效问题的核心思想是:避免在每次迭代中都进行拼接操作。取而代之的是,将每个文件处理后的数据收集到一个数据结构(如列表或字典)中,然后在循环结束后,一次性地调用pd.concat来合并所有数据。
以下是优化后的代码示例:
import pathlib
import pandas as pd
# 假设 df 是一个包含文件路径信息的DataFrame
# root_path 是根目录,使用 pathlib 替换 os.path
root_path = pathlib.Path('root')
data_parts = {} # 使用字典收集每个文件的处理结果
# 使用 enumerate 简化计数器,并迭代 df
for count, (_, row) in enumerate(df.iterrows(), 1):
folder_name = row['File ID'].strip()
file_name = row['File Name'].strip()
file_path = root_path / folder_name / file_name # pathlib 的路径拼接
folder_file_id = f'{folder_name}_{file_name}'
# 读取CSV文件,并进行初步处理
# header=None 因为文件没有表头
# memory_map=True 可以提高大文件读取效率,low_memory=False 避免混合类型警告
file_data = pd.read_csv(file_path, header=None, sep='\t',
names=['Case', folder_file_id],
memory_map=True, low_memory=False)
# 将 'Case' 列设置为索引,并使用 squeeze() 将 DataFrame 转换为 Series
# 这样可以方便后续的 unstack 操作
data_parts[folder_file_id] = file_data.set_index('Case').squeeze()
print(count)
# 循环结束后,一次性合并所有数据
# pd.concat 传入字典,字典的键将作为新的层级索引 (names=['folder_file_id'])
# unstack('Case') 将 'Case' 索引转换为列
# reset_index() 将多级索引扁平化,并重置索引
merged_data = (pd.concat(data_parts, names=['folder_file_id'])
.unstack('Case').reset_index())代码改进点说明:
示例输入数据:
>>> df File ID File Name 0 folderA file001.txt 1 folderB file002.txt >>> cat root/folderA/file001.txt 0 1234 1 5678 2 9012 3 3456 4 7890 >>> cat root/folderB/file002.txt 0 4567 1 8901 2 2345 3 6789
示例输出结果:
>>> merged_data Case folder_file_id 0 1 2 3 4 0 folderA_file001.txt 1234.0 5678.0 9012.0 3456.0 7890.0 1 folderB_file002.txt 4567.0 8901.0 2345.0 6789.0 NaN
对于I/O密集型任务(如读取大量文件),即使避免了循环内concat,文件读取本身也可能成为瓶颈。在这种情况下,可以考虑使用多线程来并行化文件读取过程。Python的concurrent.futures模块提供了一个ThreadPoolExecutor,非常适合处理这类场景。
from concurrent.futures import ThreadPoolExecutor
import pathlib
import pandas as pd
root_path = pathlib.Path('root')
# 定义一个函数,用于处理单个文件的读取和转换逻辑
def read_csv_and_process(args):
count, row_dict = args # 展开传入的参数
folder_name = row_dict['File ID'].strip()
file_name = row_dict['File Name'].strip()
file_path = root_path / folder_name / file_name
folder_file_id = f'{folder_name}_{file_name}'
file_data = pd.read_csv(file_path, header=None, sep='\t',
names=['Case', folder_file_id],
memory_map=True, low_memory=False)
print(f"Processing {count}: {folder_file_id}")
return folder_file_id, file_data.set_index('Case').squeeze()
# 使用 ThreadPoolExecutor
# max_workers 参数控制并行执行的线程数量,通常根据CPU核心数或I/O特性调整
with ThreadPoolExecutor(max_workers=4) as executor: # 可以根据系统资源调整 max_workers
# 将 df 转换为字典列表,以便每个字典作为参数传递给 read_csv_and_process
# enumerate 用于在并行处理时也能追踪进度
batch_args = enumerate(df[['File ID', 'File Name']].to_dict('records'), 1)
# executor.map 会将 batch_args 中的每个元素应用到 read_csv_and_process 函数
# 并以提交的顺序返回结果
processed_results_iterator = executor.map(read_csv_and_process, batch_args)
# 将迭代器转换为字典,以便进行最终的 concat
data_parts_threaded = dict(processed_results_iterator)
# 最终合并步骤与单线程版本相同
merged_data_threaded = (pd.concat(data_parts_threaded, names=['folder_file_id'])
.unstack('Case').reset_index())多线程代码改进点说明:
注意事项:
处理大量文件合并的场景时,避免在循环中频繁调用pd.concat是提升性能的关键。
在实际开发中,应首先采用第一种优化方案,如果性能仍不满足要求,再考虑引入多线程或多进程等并行化技术。同时,合理利用pd.read_csv等函数的参数(如memory_map、low_memory、chunksize等)也能进一步提升数据加载效率。
以上就是高效处理Pandas中大量CSV文件合并:避免循环内concat的性能陷阱的详细内容,更多请关注php中文网其它相关文章!
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号