
在数据分析和处理的场景中,我们经常会遇到需要处理大量结构相同但存储在不同文件中的数据。例如,一系列按产品或日期划分的csv文件,如 data_product_1.csv、data_product_2.csv 等。通常,我们希望将这些文件的数据合并到一个统一的dataframe中,并且在此过程中,能够为每条记录添加一个标识其来源文件(或从中提取的产品代码)的额外列。虽然polars提供了方便的通配符加载功能 pl.read_csv("data_*.csv") 来合并文件,但它不直接支持在加载时自动添加文件名作为列。本文将介绍如何利用polars的惰性api来实现这一高级功能。
Polars的惰性(Lazy)API是处理大规模数据的强大工具,它允许我们构建计算图,延迟实际的数据加载和计算,直到 collect() 方法被调用。这使得Polars能够进行查询优化,并在可能的情况下并行处理任务,从而显著提高性能。
为了实现批量加载CSV文件并添加文件名作为新列,我们将结合使用 polars.scan_csv、Python的 pathlib 模块和 polars.concat。
文件准备: 首先,确保您的工作目录下有如下结构的CSV文件。例如,创建三个文件:data_product_1.csv, data_product_2.csv, data_product_3.csv。
data_product_1.csv:
data,value 2000-01-01,1 2000-01-02,2
data_product_2.csv:
data,value 2000-01-01,3 2000-01-02,4
data_product_3.csv:
data,value 2000-01-01,4 2000-01-02,5
导入必要的库: 我们需要 polars 进行数据操作,以及 pathlib 来方便地查找文件。
import polars as pl from pathlib import Path
构建惰性DataFrame列表: 遍历所有符合模式的CSV文件,对每个文件执行以下操作:
# 获取当前目录下所有匹配 "data_*.csv" 模式的文件路径
csv_files = Path().glob("data_*.csv")
# 为每个文件创建一个LazyFrame,并添加文件名作为新列
lazy_frames = [
pl.scan_csv(f).with_columns(product_code=pl.lit(f.name))
for f in csv_files
]合并惰性DataFrame并执行计算: 使用 pl.concat() 将所有惰性DataFrame合并成一个单一的惰性DataFrame。默认情况下,pl.concat 会并行处理这些惰性DataFrame,从而提高效率。最后,调用 .collect() 来触发实际的数据加载和计算,将惰性DataFrame转换为一个急切(Eager)的DataFrame。
# 合并所有LazyFrame,并在collect()时并行读取和处理 df = pl.concat(lazy_frames).collect() # 打印结果 print(df)
import polars as pl
from pathlib import Path
import os
# --- 准备测试文件 (如果您的环境没有这些文件,请运行此段代码) ---
# 创建一个临时目录来存放CSV文件
temp_dir = "temp_csv_data"
os.makedirs(temp_dir, exist_ok=True)
# 写入测试CSV文件
file_contents = {
"data_product_1.csv": "data,value\n2000-01-01,1\n2000-01-02,2",
"data_product_2.csv": "data,value\n2000-01-01,3\n2000-01-02,4",
"data_product_3.csv": "data,value\n2000-01-01,4\n2000-01-02,5"
}
for filename, content in file_contents.items():
with open(Path(temp_dir) / filename, "w") as f:
f.write(content)
# --- 测试文件准备结束 ---
# 切换到临时目录以查找文件
original_cwd = Path.cwd()
os.chdir(temp_dir)
try:
# 获取当前目录下所有匹配 "data_*.csv" 模式的文件路径
csv_files = Path().glob("data_*.csv")
# 为每个文件创建一个LazyFrame,并添加文件名作为新列
lazy_frames = [
pl.scan_csv(f).with_columns(product_code=pl.lit(f.name))
for f in csv_files
]
# 合并所有LazyFrame,并在collect()时并行读取和处理
# 如果没有文件,lazy_frames可能为空,需要处理
if lazy_frames:
df = pl.concat(lazy_frames).collect()
# 打印结果
print(df)
else:
print("未找到匹配的CSV文件。")
finally:
# 切换回原始工作目录并清理临时文件
os.chdir(original_cwd)
import shutil
shutil.rmtree(temp_dir)执行上述代码后,您将得到一个合并后的DataFrame,其中包含原始数据以及一个名为 product_code 的新列,该列存储了每条记录对应的源文件名:
shape: (6, 3) ┌────────────┬───────┬────────────────────┐ │ data ┆ value ┆ product_code │ │ --- ┆ --- ┆ --- │ │ str ┆ i64 ┆ str │ ╞════════════╪═══════╪════════════════════╡ │ 2000-01-01 ┆ 1 ┆ data_product_1.csv │ │ 2000-01-02 ┆ 2 ┆ data_product_1.csv │ │ 2000-01-01 ┆ 3 ┆ data_product_2.csv │ │ 2000-01-02 ┆ 4 ┆ data_product_2.csv │ │ 2000-01-01 ┆ 4 ┆ data_product_3.csv │ │ 2000-01-02 ┆ 5 ┆ data_product_3.csv │ └────────────┴───────┴────────────────────┘
如果您需要从 product_code 列中提取更精简的产品名称(例如,将 data_product_1.csv 转换为 product_1),可以在 with_columns 之后或 collect() 之后进一步使用字符串操作,例如 df.with_columns(pl.col("product_code").str.extract(r"product_(\d+).csv").alias("product_id"))。
惰性评估 (LazyFrame): pl.scan_csv() 返回的是一个 LazyFrame 对象,而不是立即加载数据的 DataFrame。这意味着Polars不会立即读取文件内容或执行任何计算。它只是构建一个表示未来计算步骤的计划。这种惰性特性对于处理大型数据集至关重要,因为它允许Polars优化整个查询,例如只读取所需列,或在内存不足时进行批处理。
并行处理: 当使用 pl.concat() 合并多个 LazyFrame 并最终调用 collect() 时,Polars会尝试并行地读取和处理这些文件。这充分利用了多核CPU的优势,显著加快了数据加载和初始转换的速度,特别是在文件数量众多或单个文件较大时。
pathlib 模块: pathlib 提供了面向对象的路径操作方式,使得文件路径的查找、遍历和处理变得更加简洁和Pythonic。Path().glob("data_*.csv") 能够方便地获取所有符合通配符模式的文件路径对象。
通过结合使用 polars.scan_csv、pathlib 和 polars.concat,我们能够优雅且高效地解决批量加载多个CSV文件并动态添加文件名信息的需求。这种方法充分利用了Polars的惰性评估和并行处理能力,不仅代码简洁,而且在处理大规模数据时表现出卓越的性能。掌握这一模式,将极大地提升您在Polars中进行数据预处理和特征工程的效率。
以上就是Polars教程:高效加载多文件并动态添加文件名信息列的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号