
本文深入探讨了在pandas中高效处理和合并大量csv文件的方法。针对循环内部频繁使用`pd.concat`导致的性能瓶颈,文章提出了将数据收集到字典中并在循环结束后进行一次性合并的优化策略。此外,结合`pathlib`进行路径管理和利用多线程实现并发处理,进一步提升了数据处理效率和内存利用率,为大规模数据整合提供了专业的解决方案。
在数据处理工作中,我们经常需要从多个文件中读取数据并将其合并到一个大型Pandas DataFrame中。一个常见的直觉做法是在循环中逐个读取文件,然后使用pd.concat将每个文件的数据追加到主DataFrame。然而,当文件数量庞大(例如上千个)且每个文件的数据量不小(例如15MB,超过10,000行)时,这种做法会导致严重的性能问题。初始的几次循环可能很快完成,但随着主DataFrame的不断增大,每次concat操作所需的时间会呈指数级增长,最终使得整个过程变得异常缓慢,甚至可能耗尽系统内存。
考虑以下场景:我们有一个包含文件路径信息的DataFrame df,需要遍历其中的每一行,读取对应的CSV文件,对其进行转置和格式化,最终合并到一个名为 merged_data 的大型DataFrame中。
import pandas as pd
import os
# 假设 root_path 和 df 已经定义
# root_path = '/path/to/your/root'
# df = pd.DataFrame({'File ID': ['folderA', 'folderB'], 'File Name': ['file001.txt', 'file002.txt']})
merged_data = pd.DataFrame()
count = 0
for index, row in df.iterrows():
folder_name = row['File ID'].strip()
file_name = row['File Name'].strip()
file_path = os.path.join(root_path, folder_name, file_name)
# 读取、转置并格式化文件数据
file_data = pd.read_csv(file_path, names=['Case', f'{folder_name}_{file_name}'], sep='\t')
file_data_transposed = file_data.set_index('Case').T.reset_index(drop=True)
file_data_transposed.insert(loc=0, column='folder_file_id', value=str(folder_name+'_'+file_name))
# 每次循环都进行 concat
merged_data = pd.concat([merged_data, file_data_transposed], axis=0, ignore_index=True)
count = count + 1
print(count)上述代码的性能问题主要源于以下几点:
解决循环内concat性能问题的核心思想是:避免在循环中重复执行昂贵的操作,而是将所有中间结果收集起来,在循环结束后一次性执行合并。
我们将不再每次迭代都将数据追加到merged_data,而是将每个文件处理后的结果(通常是一个Pandas Series或DataFrame片段)存储在一个Python字典中。字典的键可以是文件的唯一标识符,值则是处理后的数据。循环结束后,我们再将这个字典传递给pd.concat,进行一次性高效合并。
import pathlib
import pandas as pd
# 假设 root_path 和 df 已经定义
root_path = pathlib.Path('root') # 使用 pathlib 代替 os.path
df = pd.DataFrame({
'File ID': ['folderA', 'folderB'],
'File Name': ['file001.txt', 'file002.txt']
})
data = {} # 用于收集所有处理后的数据
# 使用 enumerate 而非外部计数器,并直接迭代 df.iterrows()
for count, (_, row) in enumerate(df.iterrows(), 1):
folder_name = row['File ID'].strip()
file_name = row['File Name'].strip()
# 使用 pathlib 构建文件路径,更简洁安全
file_path = root_path / folder_name / file_name
folder_file_id = f'{folder_name}_{file_name}'
# 读取CSV文件,指定 header=None 因为文件没有表头
# memory_map=True 可以提高大文件读取效率
# low_memory=False 确保正确推断所有列的数据类型
file_data = pd.read_csv(file_path, header=None, sep='\t',
names=['Case', folder_file_id],
memory_map=True, low_memory=False)
# 设置 'Case' 列为索引,并使用 squeeze() 将单列DataFrame转换为Series
data[folder_file_id] = file_data.set_index('Case').squeeze()
print(count)
# 循环结束后,一次性进行 concat
merged_data = (pd.concat(data, names=['folder_file_id'])
.unstack('Case').reset_index())示例输入数据:
# df File ID File Name 0 folderA file001.txt 1 folderB file002.txt # root/folderA/file001.txt 0 1234 1 5678 2 9012 3 3456 4 7890 # root/folderB/file002.txt 0 4567 1 8901 2 2345 3 6789
示例输出:
>>> merged_data Case folder_file_id 0 1 2 3 4 0 folderA_file001.txt 1234.0 5678.0 9012.0 3456.0 7890.0 1 folderB_file002.txt 4567.0 8901.0 2345.0 6789.0 NaN
对于I/O密集型任务(如读取大量文件),即使优化了concat,文件读取本身仍可能成为瓶颈。在这种情况下,可以考虑使用多线程(或多进程)来并行化文件读取过程。Python的concurrent.futures模块提供了方便的接口来实现这一点。
多线程在Python中受GIL(全局解释器锁)的限制,对于CPU密集型任务效果不佳。但对于I/O密集型任务(如文件读写、网络请求),当一个线程等待I/O操作完成时,GIL会被释放,允许其他线程执行Python代码。因此,多线程可以显著提高I/O密集型任务的整体吞吐量。
from concurrent.futures import ThreadPoolExecutor
import pathlib
import pandas as pd
root_path = pathlib.Path('root')
df = pd.DataFrame({
'File ID': ['folderA', 'folderB'],
'File Name': ['file001.txt', 'file002.txt']
})
def read_and_process_csv(args):
"""
一个辅助函数,用于在单独的线程中读取和处理单个CSV文件。
"""
count, row_dict = args # 解包参数,row_dict 是 df.to_dict('records') 的一行
folder_name = row_dict['File ID'].strip()
file_name = row_dict['File Name'].strip()
file_path = root_path / folder_name / file_name
folder_file_id = f'{folder_name}_{file_name}'
file_data = pd.read_csv(file_path, header=None, sep='\t',
names=['Case', folder_file_id],
memory_map=True, low_memory=False)
print(f"Processing {count}: {folder_file_id}")
return folder_file_id, file_data.set_index('Case').squeeze()
# 创建一个线程池,max_workers 根据CPU核心数和I/O负载调整
with ThreadPoolExecutor(max_workers=4) as executor: # 示例使用4个工作线程
# 将 DataFrame 转换为字典列表,以便传递给线程池
# enumerate 用于添加计数器
batch_args = enumerate(df[['File ID', 'File Name']].to_dict('records'), 1)
# 使用 executor.map 并行执行 read_and_process_csv 函数
# data 将是一个迭代器,按提交顺序返回结果
data_iterator = executor.map(read_and_process_csv, batch_args)
# 将迭代器转换为字典,以便 pd.concat 处理
data_dict = dict(data_iterator)
# 循环结束后,一次性进行 concat
merged_data = (pd.concat(data_dict, names=['folder_file_id'])
.unstack('Case').reset_index())在Pandas中高效处理和合并大量文件是数据工程中的常见挑战。本文提供了两种关键的优化策略:
通过采纳这些最佳实践,您可以有效地处理数千甚至数万个文件,将原本耗时数小时甚至数天的任务缩短到可接受的时间范围内,极大地提升数据处理效率。在实际应用中,根据具体的数据量、文件大小和系统资源,可以灵活选择并组合这些优化方法。
以上就是Pandas批量文件处理性能优化:避免循环内concat与并发实践的详细内容,更多请关注php中文网其它相关文章!
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号