
本文旨在解决Airflow中S3Hook的`download_file`函数在下载S3文件时,目标路径意外生成`airflow_tmp_`临时子目录导致`FileNotFoundError`的问题。我们将深入探讨`download_file`的默认行为,并提供使用`preserve_file_name`和`use_autogenerated_subdir`参数来精确控制文件下载路径和命名的方法,确保文件按预期存储。
在Apache Airflow中,S3Hook提供了一个便捷的方式与Amazon S3服务进行交互。其中,download_file函数用于将S3存储桶中的文件下载到本地文件系统。然而,开发者在使用此函数时常会遇到一个非预期的行为:即使指定了明确的本地目标路径,下载的文件有时会被放置在一个自动生成的airflow_tmp_开头的临时子目录中,这可能导致后续文件操作失败并抛出FileNotFoundError。
这种默认行为的出现,是由于S3Hook在设计上为了某些内部处理或确保原子性,可能会在目标路径下创建临时目录来存放下载的文件。当用户期望文件直接位于指定路径时,这种行为就会造成困扰。
考虑以下一个典型的使用场景,尝试从S3下载文件并读取其内容:
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.operators.python import PythonOperator
from airflow.models.dag import DAG
from datetime import datetime
import os
def s3_extract(key: str, bucket_name: str, local_path: str) -> str:
"""
从S3下载文件并读取其内容。
"""
source_s3_key = key
source_s3_bucket = bucket_name
dest_file_path = local_path # 期望的本地目标目录
# 确保本地目标目录存在
if not os.path.exists(dest_file_path):
os.makedirs(dest_file_path)
print(f"Created directory: {dest_file_path}")
source_s3 = S3Hook(aws_conn_id="aws_conn_str") # 假设已配置名为"aws_conn_str"的AWS连接
# 尝试下载文件,期望其位于 dest_file_path/filename.txt
# 注意:这里直接拼接了文件名,但 S3Hook 可能会在 dest_file_path 下创建子目录
target_local_file = os.path.join(dest_file_path, os.path.basename(key))
# 原始问题中的调用方式:
# source_s3.download_file(source_s3_key, source_s3_bucket, f"{dest_file_path}/filename.txt")
# 这种方式可能导致文件被下载到 f"{dest_file_path}/filename.txt/airflow_tmp_..."
# 更准确的原始问题模拟,直接指定目标文件路径,但S3Hook可能在其父目录创建临时文件夹
source_s3.download_file(
key=source_s3_key,
bucket_name=source_s3_bucket,
local_path=target_local_file # 期望的完整本地文件路径
)
# 尝试打开文件
try:
with open(target_local_file, "r") as file:
text = file.read()
print(f"File content: {text[:100]}...") # 打印前100个字符
return text
except FileNotFoundError as e:
print(f"Error: File not found at {target_local_file}. Details: {e}")
# 在这里,如果S3Hook创建了临时子目录,这个错误就会发生
raise # 重新抛出异常以便Airflow捕获
with DAG(
dag_id='s3_download_tutorial_dag',
start_date=datetime(2023, 1, 1),
schedule_interval=None,
catchup=False,
tags=['s3', 'tutorial'],
) as dag:
download_job = PythonOperator(
task_id="s3_download_task",
python_callable=s3_extract,
op_kwargs={
'key': 'airflow/docs/filename.txt',
'bucket_name': 's3-dev-data-001', # 替换为你的S3桶名
'local_path': '/tmp/airflow_data' # 替换为你的本地路径,确保Airflow worker有写入权限
}
)当上述代码执行时,如果S3Hook的默认行为触发,可能会观察到类似以下FileNotFoundError:
FileNotFoundError: [Errno 2] no such file or directory: '/tmp/airflow_data/filename.txt/airflow_tmp_90_6ogw5'
这表明S3Hook并没有将文件直接下载到/tmp/airflow_data/filename.txt,而是在其下创建了一个名为airflow_tmp_90_6ogw5的子目录,并将文件放置其中。
为了解决这个问题,S3Hook.download_file函数提供了两个关键参数,允许我们精确控制文件的下载位置和命名:
通过将这两个参数设置为 False 和 True,我们可以强制S3Hook将文件直接下载到我们指定的完整本地文件路径。
以下是修改后的 s3_extract 函数:
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.operators.python import PythonOperator
from airflow.models.dag import DAG
from datetime import datetime
import os
def s3_extract_corrected(key: str, bucket_name: str, local_path: str) -> str:
"""
从S3下载文件并读取其内容,使用参数控制文件下载路径。
"""
source_s3_key = key
source_s3_bucket = bucket_name
dest_dir = local_path # 期望的本地目标目录
# 确保本地目标目录存在
if not os.path.exists(dest_dir):
os.makedirs(dest_dir)
print(f"Created directory: {dest_dir}")
source_s3 = S3Hook(aws_conn_id="aws_conn_str")
# 构建完整的本地文件路径
# os.path.basename(key) 从S3 key中提取文件名
target_local_file_path = os.path.join(dest_dir, os.path.basename(key))
print(f"Attempting to download S3://{source_s3_bucket}/{source_s3_key} to {target_local_file_path}")
# 使用 preserve_file_name=True 和 use_autogenerated_subdir=False
# 将文件直接下载到 target_local_file_path
source_s3.download_file(
key=source_s3_key,
bucket_name=source_s3_bucket,
local_path=target_local_file_path,
preserve_file_name=True, # 确保文件名与S3对象名一致
use_autogenerated_subdir=False # 禁用自动生成临时子目录
)
# 尝试打开文件
try:
with open(target_local_file_path, "r") as file:
text = file.read()
print(f"Successfully downloaded and read file from {target_local_file_path}. Content snippet: {text[:100]}...")
return text
except FileNotFoundError as e:
print(f"Error: File not found at {target_local_file_path}. Details: {e}")
raise
except Exception as e:
print(f"An unexpected error occurred while reading the file: {e}")
raise
with DAG(
dag_id='s3_download_tutorial_dag_corrected',
start_date=datetime(2023, 1, 1),
schedule_interval=None,
catchup=False,
tags=['s3', 'tutorial', 'fix'],
) as dag_corrected:
download_job_corrected = PythonOperator(
task_id="s3_download_task_corrected",
python_callable=s3_extract_corrected,
op_kwargs={
'key': 'airflow/docs/filename.txt',
'bucket_name': 's3-dev-data-001', # 替换为你的S3桶名
'local_path': '/tmp/airflow_data' # 替换为你的本地路径,确保Airflow worker有写入权限
}
)S3Hook.download_file函数在Airflow中是一个强大的工具,但其默认的临时文件处理行为可能会导致意外的FileNotFoundError。通过理解并正确使用preserve_file_name=True和use_autogenerated_subdir=False这两个关键参数,开发者可以完全控制文件的下载路径和命名,确保数据管道的稳定性和可预测性。在构建Airflow任务时,始终建议查阅相关Hook的官方文档,以充分了解其参数和行为,从而避免常见陷阱。
以上就是Airflow S3Hook download_file 路径管理与临时文件控制的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号