
本文旨在指导开发者如何在streamlit应用中集成并运行kedro数据管道,重点解决如何动态创建并传递自定义`datacatalog`以处理streamlit加载的数据。文章将阐明常见的错误尝试及其原因,并提供一种健壮的方法,通过`kedrosession.run()`的`data_catalog`参数正确地将运行时数据注入kedro管道,从而实现数据处理的无缝衔接。
在构建交互式数据应用时,将强大的数据管道框架(如Kedro)与灵活的Web应用框架(如Streamlit)结合是一种常见的需求。特别是当数据源是动态的,例如用户通过Streamlit界面上传文件时,我们需要一种机制来将这些运行时加载的数据作为输入,传递给Kedro管道进行处理。本文将详细介绍如何实现这一目标,并纠正在此过程中可能遇到的常见误区。
Kedro的核心概念之一是DataCatalog,它定义了数据加载和保存的方式。通常,DataCatalog在conf/base/catalog.yml中静态定义。然而,在Streamlit应用中,数据通常在运行时由用户上传,这意味着我们需要一个动态的DataCatalog来封装这些内存中的DataFrame。
目标是将Streamlit中加载的DataFrame包装成MemoryDataSet,然后构建一个临时的DataCatalog,并将其传递给Kedro管道执行。
在尝试将自定义DataCatalog传递给Kedro管道时,开发者可能会遇到一些AttributeError。理解这些错误的原因对于正确实现集成至关重要。
# 错误的代码示例
from kedro.framework.session import KedroSession
from kedro.io import DataCatalog, MemoryDataSet
import pandas as pd
import streamlit as st
# 假设 df1, df2, ... 是在Streamlit中加载的DataFrame
df1 = pd.DataFrame({'col1': [1, 2]})
df2 = pd.DataFrame({'col2': [3, 4]})
if st.button('Processar Dados de Entrada'):
with KedroSession.create(project_path="./my_kedro_project") as session:
context = session.load_context()
# 尝试直接设置context.catalog,这将导致AttributeError
# context.catalog = DataCatalog({"my_data": MemoryDataSet(df1)}) # AttributeError: can't set attribute 'catalog'
# 尝试直接设置session.catalog,同样会导致AttributeError
# session.catalog = DataCatalog({"my_data": MemoryDataSet(df1)}) # AttributeError: can't set attribute 'catalog'
# ...后续管道运行代码原因分析:AttributeError: can't set attribute 'catalog'
KedroSession和KedroContext的catalog属性在Kedro内部被设计为只读。这意味着您不能在会话或上下文创建之后,通过直接赋值的方式来修改它们引用的DataCatalog对象。DataCatalog在KedroSession初始化时被加载并冻结,以确保管道执行的一致性和可预测性。尝试直接修改它会违反这一设计原则,从而引发AttributeError。
# 错误的代码示例
from kedro.framework.session import KedroSession
from kedro.runner import SequentialRunner
import streamlit as st
# ... (数据加载和catalog创建) ...
if st.button('Processar Dados de Entrada'):
with KedroSession.create(project_path="./my_kedro_project") as session:
context = session.load_context()
runner = SequentialRunner()
# 尝试通过context.pipeline_registry获取管道,这将导致AttributeError
# runner.run(pipeline=context.pipeline_registry.get("tag_web_app"), catalog=custom_catalog) # AttributeError: 'KedroContext' object has no attribute 'pipeline_registry'原因分析:AttributeError: 'KedroContext' object has no attribute 'pipeline_registry'
KedroContext对象并不直接拥有pipeline_registry属性。管道注册是KedroSession负责管理的一部分。当您通过KedroSession.run()方法执行管道时,会话会自动处理管道的查找和注册。直接从KedroContext中访问pipeline_registry是不符合Kedro设计模式的。
Kedro提供了一种简洁且推荐的方式来在运行时注入自定义DataCatalog,即使用KedroSession.run()方法的data_catalog(或旧版本中的catalog)参数。
首先,在Streamlit应用中,您需要使用文件上传器或其他方式加载数据,并将其转换为Pandas DataFrame。然后,将这些DataFrame包装成Kedro的MemoryDataSet对象。MemoryDataSet是Kedro提供的一种数据集类型,用于处理内存中的数据,非常适合这种动态场景。
import streamlit as st
import pandas as pd
from kedro.io import DataCatalog, MemoryDataSet
from kedro.framework.session import KedroSession
from pathlib import Path
# 假设你的Kedro项目路径
project_path = Path(__file__).parent / "my_kedro_project" # 根据实际项目结构调整
st.title("Kedro与Streamlit数据处理应用")
uploaded_file1 = st.file_uploader("上传 Reagentes CSV", type=["csv"])
uploaded_file2 = st.file_uploader("上传 Balanço de Massas CSV", type=["csv"])
# ... 可以根据需要添加更多文件上传器
df1, df2, df3, df4, df5, df6 = None, None, None, None, None, None
if uploaded_file1:
df1 = pd.read_csv(uploaded_file1)
st.write("Reagentes 数据加载成功:")
st.dataframe(df1.head())
if uploaded_file2:
df2 = pd.read_csv(uploaded_file2)
st.write("Balanço de Massas 数据加载成功:")
st.dataframe(df2.head())
# 假设还有其他文件加载,这里简化
# df3 = pd.DataFrame(...)
# df4 = pd.DataFrame(...)
# df5 = pd.DataFrame(...)
# df6 = pd.DataFrame(...)
当所有必需的DataFrame都加载完毕后,您可以创建一个新的DataCatalog实例,并将这些MemoryDataSet对象作为键值对添加到其中。键名应与您的Kedro管道中期望的数据集名称相匹配。
# ... (承接上一步的代码) ...
if st.button('Processar Dados de Entrada'):
if df1 is not None and df2 is not None: # 确保所有必要数据都已加载
# 创建自定义DataCatalog
custom_catalog = DataCatalog({
"reagentes_raw": MemoryDataSet(df1),
"balanco_de_massas_raw": MemoryDataSet(df2),
# 根据需要添加更多数据集
# "laboratorio_raw": MemoryDataSet(df3),
# "laboratorio_raiox_raw": MemoryDataSet(df4),
# "carta_controle_pims_raw": MemoryDataSet(df5),
# "blend_raw": MemoryDataSet(df6)
})
st.info("正在执行Kedro管道...")
try:
# 步骤三:通过KedroSession.run()传递自定义DataCatalog
with KedroSession.create(project_path=project_path) as session:
session.run(data_catalog=custom_catalog, pipeline_name="tag_web_app")
st.success('数据处理成功!')
# 步骤四:从自定义DataCatalog中加载处理后的结果
# 假设管道输出一个名为 "merged_raw_data_process" 的数据集
if "merged_raw_data_process" in custom_catalog.list():
merged_data = custom_catalog.load("merged_raw_data_process")
st.header('结果数据预览')
st.dataframe(merged_data.head())
# 假设结果数据中有一个时间戳列
if 'timestamp_column' in merged_data.columns: # 请替换为实际的时间戳列名
last_update = merged_data['timestamp_column'].max()
st.write(f"最新数据时间: {last_update.strftime('%d/%m/%Y %H:%M')}")
else:
st.warning("管道未生成预期的 'merged_raw_data_process' 数据集。")
except Exception as e:
st.error(f"Kedro管道执行失败: {e}")
else:
st.warning("请上传所有必要的数据文件。")
这是解决问题的关键步骤。KedroSession.run()方法接受一个data_catalog(或旧版本中的catalog)参数,允许您传入一个临时的DataCatalog实例。这个传入的DataCatalog会与项目默认的DataCatalog合并,或者在某些情况下完全覆盖默认的同名数据集定义,从而将您的内存数据注入到管道执行中。
# ... (代码片段已包含在步骤二中) ...
with KedroSession.create(project_path=project_path) as session:
session.run(data_catalog=custom_catalog, pipeline_name="tag_web_app")请注意,pipeline_name参数用于指定要运行的特定管道。如果您的Kedro项目只有一个默认管道,可以省略此参数。
管道执行完成后,如果您的管道配置为将结果保存到MemoryDataSet中(例如,通过在catalog.yml中将输出数据集定义为MemoryDataSet,或者在运行时通过custom_catalog覆盖),您可以直接从传入的custom_catalog中加载这些结果。
# ... (代码片段已包含在步骤二中) ...
merged_data = custom_catalog.load("merged_raw_data_process")通过在KedroSession.run()方法中利用data_catalog参数,我们可以优雅地将Streamlit中加载的动态数据注入到Kedro管道中进行处理。这种方法避免了直接修改Kedro内部只读属性的错误,提供了一种符合Kedro设计哲学且易于维护的集成方案。遵循本文介绍的步骤和最佳实践,您将能够构建出功能强大、交互性强的数据处理应用。
以上就是Kedro与Streamlit集成:动态数据目录下的管道运行实践的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号