
本文旨在解决使用pandas和多进程读取数千个大型csv文件时遇到的内存溢出问题。我们将探讨两种核心策略:一是利用xgboost的外部内存dmatrix功能,避免一次性加载全部数据进行模型训练;二是通过优化pandas的并行读取流程,包括合理选择线程池而非进程池,并避免在循环中频繁拼接dataframe,从而提高效率并减少内存消耗。
在处理海量数据集时,例如将数千个CSV文件合并成一个大型DataFrame以供机器学习模型训练,内存溢出是一个常见且棘手的问题。尤其当数据集规模达到数十GB甚至更大时,即使拥有数十GB内存的计算实例也可能难以应对。传统的做法是使用pandas.read_csv结合Python的multiprocessing模块并行读取文件,然后通过pd.concat将结果合并。然而,这种方法在以下两个方面容易导致内存瓶颈:
如果最终目标是将处理后的数据用于XGBoost模型训练,那么最直接且高效的解决方案是利用XGBoost自身提供的外部内存(External Memory)功能。XGBoost从1.5版本开始支持自定义迭代器,允许用户以分块(chunk)的方式加载数据进行训练,从而避免将整个数据集一次性载入内存。
这种方法的核心是使用XGBoost.DMatrix,并配置其外部内存特性。这意味着您不需要预先将所有CSV文件合并成一个巨大的Pandas DataFrame。相反,您可以创建一个数据迭代器,XGBoost在训练过程中会按需从该迭代器中获取数据块。这对于处理远超机器物理内存限制的数据集尤为关键。
核心优势:
使用示例(概念性):
虽然具体实现需要根据XGBoost的官方文档构建自定义数据迭代器,但其基本思想是提供一个可迭代对象,每次迭代返回一个数据块。
import xgboost as xgb
import pandas as pd
from typing import List
# 假设您有一个自定义的迭代器类,用于分块读取CSV文件
class CSVBatchIterator(xgb.DataIter):
def __init__(self, file_paths: List[str], batch_size: int):
super().__init__()
self.file_paths = file_paths
self.batch_size = batch_size
self.it = iter(self._read_chunks())
self.data = None
self.labels = None
def _read_chunks(self):
# 这是一个简化示例,实际可能需要更复杂的逻辑来处理跨文件分块
for path in self.file_paths:
df = pd.read_csv(path)
# 假设最后一列是标签,其余是特征
features = df.iloc[:, :-1]
labels = df.iloc[:, -1]
# 进一步将单个文件的DataFrame分块
for i in range(0, len(features), self.batch_size):
yield features.iloc[i:i+self.batch_size], labels.iloc[i:i+self.batch_size]
def next(self, input_data):
try:
features, labels = next(self.it)
self.data = features
self.labels = labels
input_data(data=self.data, label=self.labels)
return 1 # Return 1 for successful iteration
except StopIteration:
return 0 # Return 0 for end of iteration
# 示例使用
# file_paths = ["path/to/file1.csv", "path/to/file2.csv", ...]
# batch_iterator = CSVBatchIterator(file_paths, batch_size=100000)
# dtrain = xgb.DMatrix(batch_iterator)
#
# # 训练模型
# # params = {'objective': 'binary:logistic', 'eval_metric': 'logloss'}
# # bst = xgb.train(params, dtrain)注意事项: 实际的外部内存DMatrix实现会比上述示例更复杂,需要仔细遵循XGBoost官方文档中关于自定义数据迭代器的指导。核心思想是避免在Python层面将所有数据加载到内存,而是让XGBoost在底层直接处理数据流。
如果您的场景不直接使用XGBoost的外部内存功能,或者需要先将数据整合为Pandas DataFrame进行其他预处理,那么优化Pandas的并行读取策略是关键。
对于文件I/O密集型操作(如pd.read_csv),concurrent.futures.ThreadPoolExecutor通常是比ProcessPoolExecutor更优的选择。
如前所述,在循环中反复使用pd.concat是导致内存和性能问题的主要原因之一。正确的做法是收集所有子DataFrame到一个列表中,然后在所有读取任务完成后,执行一次性的大规模拼接。
优化后的代码示例:
import pandas as pd
import multiprocessing as mp
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_EXCEPTION
from typing import List
import logging
logger = logging.getLogger(__name__)
def _read_training_data(training_data_path: str) -> pd.DataFrame:
"""
单个CSV文件读取函数。
"""
df = pd.read_csv(training_data_path)
return df
def read_training_data_optimized(
paths: List[str]
) -> pd.DataFrame:
"""
优化后的并行读取训练数据函数。以上就是优化大规模CSV文件读取:解决Pandas与XGBoost内存问题的策略的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号